~ubuntu-branches/ubuntu/wily/python-concurrent.futures/wily-proposed

« back to all changes in this revision

Viewing changes to concurrent/futures/thread.py

  • Committer: Package Import Robot
  • Author(s): Thomas Goirand
  • Date: 2015-07-15 12:41:55 UTC
  • mfrom: (1.1.2) (2.1.5 sid)
  • Revision ID: package-import@ubuntu.com-20150715124155-0rnlixg5ddx38z41
Tags: 3.0.3-1
* New upstream release.
* Fixed (build-)depends for this release: dropping version for python-all,
  adding dh-python, etc.
* Standards-Version is now 3.9.6.

Show diffs side-by-side

added added

removed removed

Lines of Context:
3
3
 
4
4
"""Implements ThreadPoolExecutor."""
5
5
 
6
 
from __future__ import with_statement
7
6
import atexit
 
7
from concurrent.futures import _base
 
8
import Queue as queue
8
9
import threading
9
10
import weakref
10
11
import sys
11
12
 
12
 
from concurrent.futures import _base
13
 
 
14
 
try:
15
 
    import queue
16
 
except ImportError:
17
 
    import Queue as queue
18
 
 
19
13
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
20
14
 
21
15
# Workers are created as daemon threads. This is done to allow the interpreter
32
26
# workers to exit when their work queues are empty and then waits until the
33
27
# threads finish.
34
28
 
35
 
_thread_references = set()
 
29
_threads_queues = weakref.WeakKeyDictionary()
36
30
_shutdown = False
37
31
 
38
32
def _python_exit():
39
33
    global _shutdown
40
34
    _shutdown = True
41
 
    for thread_reference in _thread_references:
42
 
        thread = thread_reference()
43
 
        if thread is not None:
44
 
            thread.join()
45
 
 
46
 
def _remove_dead_thread_references():
47
 
    """Remove inactive threads from _thread_references.
48
 
 
49
 
    Should be called periodically to prevent memory leaks in scenarios such as:
50
 
    >>> while True:
51
 
    ...    t = ThreadPoolExecutor(max_workers=5)
52
 
    ...    t.map(int, ['1', '2', '3', '4', '5'])
53
 
    """
54
 
    for thread_reference in set(_thread_references):
55
 
        if thread_reference() is None:
56
 
            _thread_references.discard(thread_reference)
 
35
    items = list(_threads_queues.items()) if _threads_queues else ()
 
36
    for t, q in items:
 
37
        q.put(None)
 
38
    for t, q in items:
 
39
        t.join()
57
40
 
58
41
atexit.register(_python_exit)
59
42
 
71
54
        try:
72
55
            result = self.fn(*self.args, **self.kwargs)
73
56
        except BaseException:
74
 
            e = sys.exc_info()[1]
75
 
            self.future.set_exception(e)
 
57
            e, tb = sys.exc_info()[1:]
 
58
            self.future.set_exception_info(e, tb)
76
59
        else:
77
60
            self.future.set_result(result)
78
61
 
79
62
def _worker(executor_reference, work_queue):
80
63
    try:
81
64
        while True:
82
 
            try:
83
 
                work_item = work_queue.get(block=True, timeout=0.1)
84
 
            except queue.Empty:
85
 
                executor = executor_reference()
86
 
                # Exit if:
87
 
                #   - The interpreter is shutting down OR
88
 
                #   - The executor that owns the worker has been collected OR
89
 
                #   - The executor that owns the worker has been shutdown.
90
 
                if _shutdown or executor is None or executor._shutdown:
91
 
                    return
92
 
                del executor
93
 
            else:
 
65
            work_item = work_queue.get(block=True)
 
66
            if work_item is not None:
94
67
                work_item.run()
 
68
                # Delete references to object. See issue16284
 
69
                del work_item
 
70
                continue
 
71
            executor = executor_reference()
 
72
            # Exit if:
 
73
            #   - The interpreter is shutting down OR
 
74
            #   - The executor that owns the worker has been collected OR
 
75
            #   - The executor that owns the worker has been shutdown.
 
76
            if _shutdown or executor is None or executor._shutdown:
 
77
                # Notice other workers
 
78
                work_queue.put(None)
 
79
                return
 
80
            del executor
95
81
    except BaseException:
96
82
        _base.LOGGER.critical('Exception in worker', exc_info=True)
97
83
 
103
89
            max_workers: The maximum number of threads that can be used to
104
90
                execute the given calls.
105
91
        """
106
 
        _remove_dead_thread_references()
107
 
 
108
92
        self._max_workers = max_workers
109
93
        self._work_queue = queue.Queue()
110
94
        self._threads = set()
125
109
    submit.__doc__ = _base.Executor.submit.__doc__
126
110
 
127
111
    def _adjust_thread_count(self):
 
112
        # When the executor gets lost, the weakref callback will wake up
 
113
        # the worker threads.
 
114
        def weakref_cb(_, q=self._work_queue):
 
115
            q.put(None)
128
116
        # TODO(bquinlan): Should avoid creating new threads if there are more
129
117
        # idle threads than items in the work queue.
130
118
        if len(self._threads) < self._max_workers:
131
119
            t = threading.Thread(target=_worker,
132
 
                                 args=(weakref.ref(self), self._work_queue))
 
120
                                 args=(weakref.ref(self, weakref_cb),
 
121
                                       self._work_queue))
133
122
            t.daemon = True
134
123
            t.start()
135
124
            self._threads.add(t)
136
 
            _thread_references.add(weakref.ref(t))
 
125
            _threads_queues[t] = self._work_queue
137
126
 
138
127
    def shutdown(self, wait=True):
139
128
        with self._shutdown_lock:
140
129
            self._shutdown = True
 
130
            self._work_queue.put(None)
141
131
        if wait:
142
132
            for t in self._threads:
143
133
                t.join()