~inkscape.dev/inkscape-devlibs64/trunk

« back to all changes in this revision

Viewing changes to python/Lib/multiprocessing/pool.py

  • Committer: Eduard Braun
  • Date: 2016-10-22 16:51:19 UTC
  • Revision ID: eduard.braun2@gmx.de-20161022165119-9eosgy6lp8j1kzli
Update Python to version 2.7.12

Included modules:
  coverage 4.2
  lxml 3.6.4
  numpy 1.11.2
  scour 0.35
  six 1.10.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#
2
 
# Module providing the `Pool` class for managing a process pool
3
 
#
4
 
# multiprocessing/pool.py
5
 
#
6
 
# Copyright (c) 2006-2008, R Oudkerk
7
 
# All rights reserved.
8
 
#
9
 
# Redistribution and use in source and binary forms, with or without
10
 
# modification, are permitted provided that the following conditions
11
 
# are met:
12
 
#
13
 
# 1. Redistributions of source code must retain the above copyright
14
 
#    notice, this list of conditions and the following disclaimer.
15
 
# 2. Redistributions in binary form must reproduce the above copyright
16
 
#    notice, this list of conditions and the following disclaimer in the
17
 
#    documentation and/or other materials provided with the distribution.
18
 
# 3. Neither the name of author nor the names of any contributors may be
19
 
#    used to endorse or promote products derived from this software
20
 
#    without specific prior written permission.
21
 
#
22
 
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23
 
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24
 
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25
 
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26
 
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27
 
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28
 
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29
 
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31
 
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32
 
# SUCH DAMAGE.
33
 
#
34
 
 
35
 
__all__ = ['Pool']
36
 
 
37
 
#
38
 
# Imports
39
 
#
40
 
 
41
 
import threading
42
 
import Queue
43
 
import itertools
44
 
import collections
45
 
import time
46
 
 
47
 
from multiprocessing import Process, cpu_count, TimeoutError
48
 
from multiprocessing.util import Finalize, debug
49
 
 
50
 
#
51
 
# Constants representing the state of a pool
52
 
#
53
 
 
54
 
RUN = 0
55
 
CLOSE = 1
56
 
TERMINATE = 2
57
 
 
58
 
#
59
 
# Miscellaneous
60
 
#
61
 
 
62
 
job_counter = itertools.count()
63
 
 
64
 
def mapstar(args):
65
 
    return map(*args)
66
 
 
67
 
#
68
 
# Code run by worker processes
69
 
#
70
 
 
71
 
class MaybeEncodingError(Exception):
72
 
    """Wraps possible unpickleable errors, so they can be
73
 
    safely sent through the socket."""
74
 
 
75
 
    def __init__(self, exc, value):
76
 
        self.exc = repr(exc)
77
 
        self.value = repr(value)
78
 
        super(MaybeEncodingError, self).__init__(self.exc, self.value)
79
 
 
80
 
    def __str__(self):
81
 
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
82
 
                                                             self.exc)
83
 
 
84
 
    def __repr__(self):
85
 
        return "<MaybeEncodingError: %s>" % str(self)
86
 
 
87
 
 
88
 
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
89
 
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
90
 
    put = outqueue.put
91
 
    get = inqueue.get
92
 
    if hasattr(inqueue, '_writer'):
93
 
        inqueue._writer.close()
94
 
        outqueue._reader.close()
95
 
 
96
 
    if initializer is not None:
97
 
        initializer(*initargs)
98
 
 
99
 
    completed = 0
100
 
    while maxtasks is None or (maxtasks and completed < maxtasks):
101
 
        try:
102
 
            task = get()
103
 
        except (EOFError, IOError):
104
 
            debug('worker got EOFError or IOError -- exiting')
105
 
            break
106
 
 
107
 
        if task is None:
108
 
            debug('worker got sentinel -- exiting')
109
 
            break
110
 
 
111
 
        job, i, func, args, kwds = task
112
 
        try:
113
 
            result = (True, func(*args, **kwds))
114
 
        except Exception, e:
115
 
            result = (False, e)
116
 
        try:
117
 
            put((job, i, result))
118
 
        except Exception as e:
119
 
            wrapped = MaybeEncodingError(e, result[1])
120
 
            debug("Possible encoding error while sending result: %s" % (
121
 
                wrapped))
122
 
            put((job, i, (False, wrapped)))
123
 
        completed += 1
124
 
    debug('worker exiting after %d tasks' % completed)
125
 
 
126
 
#
127
 
# Class representing a process pool
128
 
#
129
 
 
130
 
class Pool(object):
131
 
    '''
132
 
    Class which supports an async version of the `apply()` builtin
133
 
    '''
134
 
    Process = Process
135
 
 
136
 
    def __init__(self, processes=None, initializer=None, initargs=(),
137
 
                 maxtasksperchild=None):
138
 
        self._setup_queues()
139
 
        self._taskqueue = Queue.Queue()
140
 
        self._cache = {}
141
 
        self._state = RUN
142
 
        self._maxtasksperchild = maxtasksperchild
143
 
        self._initializer = initializer
144
 
        self._initargs = initargs
145
 
 
146
 
        if processes is None:
147
 
            try:
148
 
                processes = cpu_count()
149
 
            except NotImplementedError:
150
 
                processes = 1
151
 
        if processes < 1:
152
 
            raise ValueError("Number of processes must be at least 1")
153
 
 
154
 
        if initializer is not None and not hasattr(initializer, '__call__'):
155
 
            raise TypeError('initializer must be a callable')
156
 
 
157
 
        self._processes = processes
158
 
        self._pool = []
159
 
        self._repopulate_pool()
160
 
 
161
 
        self._worker_handler = threading.Thread(
162
 
            target=Pool._handle_workers,
163
 
            args=(self, )
164
 
            )
165
 
        self._worker_handler.daemon = True
166
 
        self._worker_handler._state = RUN
167
 
        self._worker_handler.start()
168
 
 
169
 
 
170
 
        self._task_handler = threading.Thread(
171
 
            target=Pool._handle_tasks,
172
 
            args=(self._taskqueue, self._quick_put, self._outqueue,
173
 
                  self._pool, self._cache)
174
 
            )
175
 
        self._task_handler.daemon = True
176
 
        self._task_handler._state = RUN
177
 
        self._task_handler.start()
178
 
 
179
 
        self._result_handler = threading.Thread(
180
 
            target=Pool._handle_results,
181
 
            args=(self._outqueue, self._quick_get, self._cache)
182
 
            )
183
 
        self._result_handler.daemon = True
184
 
        self._result_handler._state = RUN
185
 
        self._result_handler.start()
186
 
 
187
 
        self._terminate = Finalize(
188
 
            self, self._terminate_pool,
189
 
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
190
 
                  self._worker_handler, self._task_handler,
191
 
                  self._result_handler, self._cache),
192
 
            exitpriority=15
193
 
            )
194
 
 
195
 
    def _join_exited_workers(self):
196
 
        """Cleanup after any worker processes which have exited due to reaching
197
 
        their specified lifetime.  Returns True if any workers were cleaned up.
198
 
        """
199
 
        cleaned = False
200
 
        for i in reversed(range(len(self._pool))):
201
 
            worker = self._pool[i]
202
 
            if worker.exitcode is not None:
203
 
                # worker exited
204
 
                debug('cleaning up worker %d' % i)
205
 
                worker.join()
206
 
                cleaned = True
207
 
                del self._pool[i]
208
 
        return cleaned
209
 
 
210
 
    def _repopulate_pool(self):
211
 
        """Bring the number of pool processes up to the specified number,
212
 
        for use after reaping workers which have exited.
213
 
        """
214
 
        for i in range(self._processes - len(self._pool)):
215
 
            w = self.Process(target=worker,
216
 
                             args=(self._inqueue, self._outqueue,
217
 
                                   self._initializer,
218
 
                                   self._initargs, self._maxtasksperchild)
219
 
                            )
220
 
            self._pool.append(w)
221
 
            w.name = w.name.replace('Process', 'PoolWorker')
222
 
            w.daemon = True
223
 
            w.start()
224
 
            debug('added worker')
225
 
 
226
 
    def _maintain_pool(self):
227
 
        """Clean up any exited workers and start replacements for them.
228
 
        """
229
 
        if self._join_exited_workers():
230
 
            self._repopulate_pool()
231
 
 
232
 
    def _setup_queues(self):
233
 
        from .queues import SimpleQueue
234
 
        self._inqueue = SimpleQueue()
235
 
        self._outqueue = SimpleQueue()
236
 
        self._quick_put = self._inqueue._writer.send
237
 
        self._quick_get = self._outqueue._reader.recv
238
 
 
239
 
    def apply(self, func, args=(), kwds={}):
240
 
        '''
241
 
        Equivalent of `apply()` builtin
242
 
        '''
243
 
        assert self._state == RUN
244
 
        return self.apply_async(func, args, kwds).get()
245
 
 
246
 
    def map(self, func, iterable, chunksize=None):
247
 
        '''
248
 
        Equivalent of `map()` builtin
249
 
        '''
250
 
        assert self._state == RUN
251
 
        return self.map_async(func, iterable, chunksize).get()
252
 
 
253
 
    def imap(self, func, iterable, chunksize=1):
254
 
        '''
255
 
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
256
 
        '''
257
 
        assert self._state == RUN
258
 
        if chunksize == 1:
259
 
            result = IMapIterator(self._cache)
260
 
            self._taskqueue.put((((result._job, i, func, (x,), {})
261
 
                         for i, x in enumerate(iterable)), result._set_length))
262
 
            return result
263
 
        else:
264
 
            assert chunksize > 1
265
 
            task_batches = Pool._get_tasks(func, iterable, chunksize)
266
 
            result = IMapIterator(self._cache)
267
 
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
268
 
                     for i, x in enumerate(task_batches)), result._set_length))
269
 
            return (item for chunk in result for item in chunk)
270
 
 
271
 
    def imap_unordered(self, func, iterable, chunksize=1):
272
 
        '''
273
 
        Like `imap()` method but ordering of results is arbitrary
274
 
        '''
275
 
        assert self._state == RUN
276
 
        if chunksize == 1:
277
 
            result = IMapUnorderedIterator(self._cache)
278
 
            self._taskqueue.put((((result._job, i, func, (x,), {})
279
 
                         for i, x in enumerate(iterable)), result._set_length))
280
 
            return result
281
 
        else:
282
 
            assert chunksize > 1
283
 
            task_batches = Pool._get_tasks(func, iterable, chunksize)
284
 
            result = IMapUnorderedIterator(self._cache)
285
 
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
286
 
                     for i, x in enumerate(task_batches)), result._set_length))
287
 
            return (item for chunk in result for item in chunk)
288
 
 
289
 
    def apply_async(self, func, args=(), kwds={}, callback=None):
290
 
        '''
291
 
        Asynchronous equivalent of `apply()` builtin
292
 
        '''
293
 
        assert self._state == RUN
294
 
        result = ApplyResult(self._cache, callback)
295
 
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
296
 
        return result
297
 
 
298
 
    def map_async(self, func, iterable, chunksize=None, callback=None):
299
 
        '''
300
 
        Asynchronous equivalent of `map()` builtin
301
 
        '''
302
 
        assert self._state == RUN
303
 
        if not hasattr(iterable, '__len__'):
304
 
            iterable = list(iterable)
305
 
 
306
 
        if chunksize is None:
307
 
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
308
 
            if extra:
309
 
                chunksize += 1
310
 
        if len(iterable) == 0:
311
 
            chunksize = 0
312
 
 
313
 
        task_batches = Pool._get_tasks(func, iterable, chunksize)
314
 
        result = MapResult(self._cache, chunksize, len(iterable), callback)
315
 
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
316
 
                              for i, x in enumerate(task_batches)), None))
317
 
        return result
318
 
 
319
 
    @staticmethod
320
 
    def _handle_workers(pool):
321
 
        thread = threading.current_thread()
322
 
 
323
 
        # Keep maintaining workers until the cache gets drained, unless the pool
324
 
        # is terminated.
325
 
        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
326
 
            pool._maintain_pool()
327
 
            time.sleep(0.1)
328
 
        # send sentinel to stop workers
329
 
        pool._taskqueue.put(None)
330
 
        debug('worker handler exiting')
331
 
 
332
 
    @staticmethod
333
 
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
334
 
        thread = threading.current_thread()
335
 
 
336
 
        for taskseq, set_length in iter(taskqueue.get, None):
337
 
            task = None
338
 
            i = -1
339
 
            try:
340
 
                for i, task in enumerate(taskseq):
341
 
                    if thread._state:
342
 
                        debug('task handler found thread._state != RUN')
343
 
                        break
344
 
                    try:
345
 
                        put(task)
346
 
                    except Exception as e:
347
 
                        job, ind = task[:2]
348
 
                        try:
349
 
                            cache[job]._set(ind, (False, e))
350
 
                        except KeyError:
351
 
                            pass
352
 
                else:
353
 
                    if set_length:
354
 
                        debug('doing set_length()')
355
 
                        set_length(i+1)
356
 
                    continue
357
 
                break
358
 
            except Exception as ex:
359
 
                job, ind = task[:2] if task else (0, 0)
360
 
                if job in cache:
361
 
                    cache[job]._set(ind + 1, (False, ex))
362
 
                if set_length:
363
 
                    debug('doing set_length()')
364
 
                    set_length(i+1)
365
 
        else:
366
 
            debug('task handler got sentinel')
367
 
 
368
 
 
369
 
        try:
370
 
            # tell result handler to finish when cache is empty
371
 
            debug('task handler sending sentinel to result handler')
372
 
            outqueue.put(None)
373
 
 
374
 
            # tell workers there is no more work
375
 
            debug('task handler sending sentinel to workers')
376
 
            for p in pool:
377
 
                put(None)
378
 
        except IOError:
379
 
            debug('task handler got IOError when sending sentinels')
380
 
 
381
 
        debug('task handler exiting')
382
 
 
383
 
    @staticmethod
384
 
    def _handle_results(outqueue, get, cache):
385
 
        thread = threading.current_thread()
386
 
 
387
 
        while 1:
388
 
            try:
389
 
                task = get()
390
 
            except (IOError, EOFError):
391
 
                debug('result handler got EOFError/IOError -- exiting')
392
 
                return
393
 
 
394
 
            if thread._state:
395
 
                assert thread._state == TERMINATE
396
 
                debug('result handler found thread._state=TERMINATE')
397
 
                break
398
 
 
399
 
            if task is None:
400
 
                debug('result handler got sentinel')
401
 
                break
402
 
 
403
 
            job, i, obj = task
404
 
            try:
405
 
                cache[job]._set(i, obj)
406
 
            except KeyError:
407
 
                pass
408
 
 
409
 
        while cache and thread._state != TERMINATE:
410
 
            try:
411
 
                task = get()
412
 
            except (IOError, EOFError):
413
 
                debug('result handler got EOFError/IOError -- exiting')
414
 
                return
415
 
 
416
 
            if task is None:
417
 
                debug('result handler ignoring extra sentinel')
418
 
                continue
419
 
            job, i, obj = task
420
 
            try:
421
 
                cache[job]._set(i, obj)
422
 
            except KeyError:
423
 
                pass
424
 
 
425
 
        if hasattr(outqueue, '_reader'):
426
 
            debug('ensuring that outqueue is not full')
427
 
            # If we don't make room available in outqueue then
428
 
            # attempts to add the sentinel (None) to outqueue may
429
 
            # block.  There is guaranteed to be no more than 2 sentinels.
430
 
            try:
431
 
                for i in range(10):
432
 
                    if not outqueue._reader.poll():
433
 
                        break
434
 
                    get()
435
 
            except (IOError, EOFError):
436
 
                pass
437
 
 
438
 
        debug('result handler exiting: len(cache)=%s, thread._state=%s',
439
 
              len(cache), thread._state)
440
 
 
441
 
    @staticmethod
442
 
    def _get_tasks(func, it, size):
443
 
        it = iter(it)
444
 
        while 1:
445
 
            x = tuple(itertools.islice(it, size))
446
 
            if not x:
447
 
                return
448
 
            yield (func, x)
449
 
 
450
 
    def __reduce__(self):
451
 
        raise NotImplementedError(
452
 
              'pool objects cannot be passed between processes or pickled'
453
 
              )
454
 
 
455
 
    def close(self):
456
 
        debug('closing pool')
457
 
        if self._state == RUN:
458
 
            self._state = CLOSE
459
 
            self._worker_handler._state = CLOSE
460
 
 
461
 
    def terminate(self):
462
 
        debug('terminating pool')
463
 
        self._state = TERMINATE
464
 
        self._worker_handler._state = TERMINATE
465
 
        self._terminate()
466
 
 
467
 
    def join(self):
468
 
        debug('joining pool')
469
 
        assert self._state in (CLOSE, TERMINATE)
470
 
        self._worker_handler.join()
471
 
        self._task_handler.join()
472
 
        self._result_handler.join()
473
 
        for p in self._pool:
474
 
            p.join()
475
 
 
476
 
    @staticmethod
477
 
    def _help_stuff_finish(inqueue, task_handler, size):
478
 
        # task_handler may be blocked trying to put items on inqueue
479
 
        debug('removing tasks from inqueue until task handler finished')
480
 
        inqueue._rlock.acquire()
481
 
        while task_handler.is_alive() and inqueue._reader.poll():
482
 
            inqueue._reader.recv()
483
 
            time.sleep(0)
484
 
 
485
 
    @classmethod
486
 
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
487
 
                        worker_handler, task_handler, result_handler, cache):
488
 
        # this is guaranteed to only be called once
489
 
        debug('finalizing pool')
490
 
 
491
 
        worker_handler._state = TERMINATE
492
 
        task_handler._state = TERMINATE
493
 
 
494
 
        debug('helping task handler/workers to finish')
495
 
        cls._help_stuff_finish(inqueue, task_handler, len(pool))
496
 
 
497
 
        assert result_handler.is_alive() or len(cache) == 0
498
 
 
499
 
        result_handler._state = TERMINATE
500
 
        outqueue.put(None)                  # sentinel
501
 
 
502
 
        # We must wait for the worker handler to exit before terminating
503
 
        # workers because we don't want workers to be restarted behind our back.
504
 
        debug('joining worker handler')
505
 
        if threading.current_thread() is not worker_handler:
506
 
            worker_handler.join(1e100)
507
 
 
508
 
        # Terminate workers which haven't already finished.
509
 
        if pool and hasattr(pool[0], 'terminate'):
510
 
            debug('terminating workers')
511
 
            for p in pool:
512
 
                if p.exitcode is None:
513
 
                    p.terminate()
514
 
 
515
 
        debug('joining task handler')
516
 
        if threading.current_thread() is not task_handler:
517
 
            task_handler.join(1e100)
518
 
 
519
 
        debug('joining result handler')
520
 
        if threading.current_thread() is not result_handler:
521
 
            result_handler.join(1e100)
522
 
 
523
 
        if pool and hasattr(pool[0], 'terminate'):
524
 
            debug('joining pool workers')
525
 
            for p in pool:
526
 
                if p.is_alive():
527
 
                    # worker has not yet exited
528
 
                    debug('cleaning up worker %d' % p.pid)
529
 
                    p.join()
530
 
 
531
 
#
532
 
# Class whose instances are returned by `Pool.apply_async()`
533
 
#
534
 
 
535
 
class ApplyResult(object):
536
 
 
537
 
    def __init__(self, cache, callback):
538
 
        self._cond = threading.Condition(threading.Lock())
539
 
        self._job = job_counter.next()
540
 
        self._cache = cache
541
 
        self._ready = False
542
 
        self._callback = callback
543
 
        cache[self._job] = self
544
 
 
545
 
    def ready(self):
546
 
        return self._ready
547
 
 
548
 
    def successful(self):
549
 
        assert self._ready
550
 
        return self._success
551
 
 
552
 
    def wait(self, timeout=None):
553
 
        self._cond.acquire()
554
 
        try:
555
 
            if not self._ready:
556
 
                self._cond.wait(timeout)
557
 
        finally:
558
 
            self._cond.release()
559
 
 
560
 
    def get(self, timeout=None):
561
 
        self.wait(timeout)
562
 
        if not self._ready:
563
 
            raise TimeoutError
564
 
        if self._success:
565
 
            return self._value
566
 
        else:
567
 
            raise self._value
568
 
 
569
 
    def _set(self, i, obj):
570
 
        self._success, self._value = obj
571
 
        if self._callback and self._success:
572
 
            self._callback(self._value)
573
 
        self._cond.acquire()
574
 
        try:
575
 
            self._ready = True
576
 
            self._cond.notify()
577
 
        finally:
578
 
            self._cond.release()
579
 
        del self._cache[self._job]
580
 
 
581
 
AsyncResult = ApplyResult       # create alias -- see #17805
582
 
 
583
 
#
584
 
# Class whose instances are returned by `Pool.map_async()`
585
 
#
586
 
 
587
 
class MapResult(ApplyResult):
588
 
 
589
 
    def __init__(self, cache, chunksize, length, callback):
590
 
        ApplyResult.__init__(self, cache, callback)
591
 
        self._success = True
592
 
        self._value = [None] * length
593
 
        self._chunksize = chunksize
594
 
        if chunksize <= 0:
595
 
            self._number_left = 0
596
 
            self._ready = True
597
 
            del cache[self._job]
598
 
        else:
599
 
            self._number_left = length//chunksize + bool(length % chunksize)
600
 
 
601
 
    def _set(self, i, success_result):
602
 
        success, result = success_result
603
 
        if success:
604
 
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
605
 
            self._number_left -= 1
606
 
            if self._number_left == 0:
607
 
                if self._callback:
608
 
                    self._callback(self._value)
609
 
                del self._cache[self._job]
610
 
                self._cond.acquire()
611
 
                try:
612
 
                    self._ready = True
613
 
                    self._cond.notify()
614
 
                finally:
615
 
                    self._cond.release()
616
 
 
617
 
        else:
618
 
            self._success = False
619
 
            self._value = result
620
 
            del self._cache[self._job]
621
 
            self._cond.acquire()
622
 
            try:
623
 
                self._ready = True
624
 
                self._cond.notify()
625
 
            finally:
626
 
                self._cond.release()
627
 
 
628
 
#
629
 
# Class whose instances are returned by `Pool.imap()`
630
 
#
631
 
 
632
 
class IMapIterator(object):
633
 
 
634
 
    def __init__(self, cache):
635
 
        self._cond = threading.Condition(threading.Lock())
636
 
        self._job = job_counter.next()
637
 
        self._cache = cache
638
 
        self._items = collections.deque()
639
 
        self._index = 0
640
 
        self._length = None
641
 
        self._unsorted = {}
642
 
        cache[self._job] = self
643
 
 
644
 
    def __iter__(self):
645
 
        return self
646
 
 
647
 
    def next(self, timeout=None):
648
 
        self._cond.acquire()
649
 
        try:
650
 
            try:
651
 
                item = self._items.popleft()
652
 
            except IndexError:
653
 
                if self._index == self._length:
654
 
                    raise StopIteration
655
 
                self._cond.wait(timeout)
656
 
                try:
657
 
                    item = self._items.popleft()
658
 
                except IndexError:
659
 
                    if self._index == self._length:
660
 
                        raise StopIteration
661
 
                    raise TimeoutError
662
 
        finally:
663
 
            self._cond.release()
664
 
 
665
 
        success, value = item
666
 
        if success:
667
 
            return value
668
 
        raise value
669
 
 
670
 
    __next__ = next                    # XXX
671
 
 
672
 
    def _set(self, i, obj):
673
 
        self._cond.acquire()
674
 
        try:
675
 
            if self._index == i:
676
 
                self._items.append(obj)
677
 
                self._index += 1
678
 
                while self._index in self._unsorted:
679
 
                    obj = self._unsorted.pop(self._index)
680
 
                    self._items.append(obj)
681
 
                    self._index += 1
682
 
                self._cond.notify()
683
 
            else:
684
 
                self._unsorted[i] = obj
685
 
 
686
 
            if self._index == self._length:
687
 
                del self._cache[self._job]
688
 
        finally:
689
 
            self._cond.release()
690
 
 
691
 
    def _set_length(self, length):
692
 
        self._cond.acquire()
693
 
        try:
694
 
            self._length = length
695
 
            if self._index == self._length:
696
 
                self._cond.notify()
697
 
                del self._cache[self._job]
698
 
        finally:
699
 
            self._cond.release()
700
 
 
701
 
#
702
 
# Class whose instances are returned by `Pool.imap_unordered()`
703
 
#
704
 
 
705
 
class IMapUnorderedIterator(IMapIterator):
706
 
 
707
 
    def _set(self, i, obj):
708
 
        self._cond.acquire()
709
 
        try:
710
 
            self._items.append(obj)
711
 
            self._index += 1
712
 
            self._cond.notify()
713
 
            if self._index == self._length:
714
 
                del self._cache[self._job]
715
 
        finally:
716
 
            self._cond.release()
717
 
 
718
 
#
719
 
#
720
 
#
721
 
 
722
 
class ThreadPool(Pool):
723
 
 
724
 
    from .dummy import Process
725
 
 
726
 
    def __init__(self, processes=None, initializer=None, initargs=()):
727
 
        Pool.__init__(self, processes, initializer, initargs)
728
 
 
729
 
    def _setup_queues(self):
730
 
        self._inqueue = Queue.Queue()
731
 
        self._outqueue = Queue.Queue()
732
 
        self._quick_put = self._inqueue.put
733
 
        self._quick_get = self._outqueue.get
734
 
 
735
 
    @staticmethod
736
 
    def _help_stuff_finish(inqueue, task_handler, size):
737
 
        # put sentinels at head of inqueue to make workers finish
738
 
        inqueue.not_empty.acquire()
739
 
        try:
740
 
            inqueue.queue.clear()
741
 
            inqueue.queue.extend([None] * size)
742
 
            inqueue.not_empty.notify_all()
743
 
        finally:
744
 
            inqueue.not_empty.release()
 
1
#
 
2
# Module providing the `Pool` class for managing a process pool
 
3
#
 
4
# multiprocessing/pool.py
 
5
#
 
6
# Copyright (c) 2006-2008, R Oudkerk
 
7
# All rights reserved.
 
8
#
 
9
# Redistribution and use in source and binary forms, with or without
 
10
# modification, are permitted provided that the following conditions
 
11
# are met:
 
12
#
 
13
# 1. Redistributions of source code must retain the above copyright
 
14
#    notice, this list of conditions and the following disclaimer.
 
15
# 2. Redistributions in binary form must reproduce the above copyright
 
16
#    notice, this list of conditions and the following disclaimer in the
 
17
#    documentation and/or other materials provided with the distribution.
 
18
# 3. Neither the name of author nor the names of any contributors may be
 
19
#    used to endorse or promote products derived from this software
 
20
#    without specific prior written permission.
 
21
#
 
22
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
 
23
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 
24
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 
25
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 
26
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 
27
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 
28
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 
29
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
30
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 
31
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 
32
# SUCH DAMAGE.
 
33
#
 
34
 
 
35
__all__ = ['Pool']
 
36
 
 
37
#
 
38
# Imports
 
39
#
 
40
 
 
41
import threading
 
42
import Queue
 
43
import itertools
 
44
import collections
 
45
import time
 
46
 
 
47
from multiprocessing import Process, cpu_count, TimeoutError
 
48
from multiprocessing.util import Finalize, debug
 
49
 
 
50
#
 
51
# Constants representing the state of a pool
 
52
#
 
53
 
 
54
RUN = 0
 
55
CLOSE = 1
 
56
TERMINATE = 2
 
57
 
 
58
#
 
59
# Miscellaneous
 
60
#
 
61
 
 
62
job_counter = itertools.count()
 
63
 
 
64
def mapstar(args):
 
65
    return map(*args)
 
66
 
 
67
#
 
68
# Code run by worker processes
 
69
#
 
70
 
 
71
class MaybeEncodingError(Exception):
 
72
    """Wraps possible unpickleable errors, so they can be
 
73
    safely sent through the socket."""
 
74
 
 
75
    def __init__(self, exc, value):
 
76
        self.exc = repr(exc)
 
77
        self.value = repr(value)
 
78
        super(MaybeEncodingError, self).__init__(self.exc, self.value)
 
79
 
 
80
    def __str__(self):
 
81
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
 
82
                                                             self.exc)
 
83
 
 
84
    def __repr__(self):
 
85
        return "<MaybeEncodingError: %s>" % str(self)
 
86
 
 
87
 
 
88
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
 
89
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
 
90
    put = outqueue.put
 
91
    get = inqueue.get
 
92
    if hasattr(inqueue, '_writer'):
 
93
        inqueue._writer.close()
 
94
        outqueue._reader.close()
 
95
 
 
96
    if initializer is not None:
 
97
        initializer(*initargs)
 
98
 
 
99
    completed = 0
 
100
    while maxtasks is None or (maxtasks and completed < maxtasks):
 
101
        try:
 
102
            task = get()
 
103
        except (EOFError, IOError):
 
104
            debug('worker got EOFError or IOError -- exiting')
 
105
            break
 
106
 
 
107
        if task is None:
 
108
            debug('worker got sentinel -- exiting')
 
109
            break
 
110
 
 
111
        job, i, func, args, kwds = task
 
112
        try:
 
113
            result = (True, func(*args, **kwds))
 
114
        except Exception, e:
 
115
            result = (False, e)
 
116
        try:
 
117
            put((job, i, result))
 
118
        except Exception as e:
 
119
            wrapped = MaybeEncodingError(e, result[1])
 
120
            debug("Possible encoding error while sending result: %s" % (
 
121
                wrapped))
 
122
            put((job, i, (False, wrapped)))
 
123
        completed += 1
 
124
    debug('worker exiting after %d tasks' % completed)
 
125
 
 
126
#
 
127
# Class representing a process pool
 
128
#
 
129
 
 
130
class Pool(object):
 
131
    '''
 
132
    Class which supports an async version of the `apply()` builtin
 
133
    '''
 
134
    Process = Process
 
135
 
 
136
    def __init__(self, processes=None, initializer=None, initargs=(),
 
137
                 maxtasksperchild=None):
 
138
        self._setup_queues()
 
139
        self._taskqueue = Queue.Queue()
 
140
        self._cache = {}
 
141
        self._state = RUN
 
142
        self._maxtasksperchild = maxtasksperchild
 
143
        self._initializer = initializer
 
144
        self._initargs = initargs
 
145
 
 
146
        if processes is None:
 
147
            try:
 
148
                processes = cpu_count()
 
149
            except NotImplementedError:
 
150
                processes = 1
 
151
        if processes < 1:
 
152
            raise ValueError("Number of processes must be at least 1")
 
153
 
 
154
        if initializer is not None and not hasattr(initializer, '__call__'):
 
155
            raise TypeError('initializer must be a callable')
 
156
 
 
157
        self._processes = processes
 
158
        self._pool = []
 
159
        self._repopulate_pool()
 
160
 
 
161
        self._worker_handler = threading.Thread(
 
162
            target=Pool._handle_workers,
 
163
            args=(self, )
 
164
            )
 
165
        self._worker_handler.daemon = True
 
166
        self._worker_handler._state = RUN
 
167
        self._worker_handler.start()
 
168
 
 
169
 
 
170
        self._task_handler = threading.Thread(
 
171
            target=Pool._handle_tasks,
 
172
            args=(self._taskqueue, self._quick_put, self._outqueue,
 
173
                  self._pool, self._cache)
 
174
            )
 
175
        self._task_handler.daemon = True
 
176
        self._task_handler._state = RUN
 
177
        self._task_handler.start()
 
178
 
 
179
        self._result_handler = threading.Thread(
 
180
            target=Pool._handle_results,
 
181
            args=(self._outqueue, self._quick_get, self._cache)
 
182
            )
 
183
        self._result_handler.daemon = True
 
184
        self._result_handler._state = RUN
 
185
        self._result_handler.start()
 
186
 
 
187
        self._terminate = Finalize(
 
188
            self, self._terminate_pool,
 
189
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
 
190
                  self._worker_handler, self._task_handler,
 
191
                  self._result_handler, self._cache),
 
192
            exitpriority=15
 
193
            )
 
194
 
 
195
    def _join_exited_workers(self):
 
196
        """Cleanup after any worker processes which have exited due to reaching
 
197
        their specified lifetime.  Returns True if any workers were cleaned up.
 
198
        """
 
199
        cleaned = False
 
200
        for i in reversed(range(len(self._pool))):
 
201
            worker = self._pool[i]
 
202
            if worker.exitcode is not None:
 
203
                # worker exited
 
204
                debug('cleaning up worker %d' % i)
 
205
                worker.join()
 
206
                cleaned = True
 
207
                del self._pool[i]
 
208
        return cleaned
 
209
 
 
210
    def _repopulate_pool(self):
 
211
        """Bring the number of pool processes up to the specified number,
 
212
        for use after reaping workers which have exited.
 
213
        """
 
214
        for i in range(self._processes - len(self._pool)):
 
215
            w = self.Process(target=worker,
 
216
                             args=(self._inqueue, self._outqueue,
 
217
                                   self._initializer,
 
218
                                   self._initargs, self._maxtasksperchild)
 
219
                            )
 
220
            self._pool.append(w)
 
221
            w.name = w.name.replace('Process', 'PoolWorker')
 
222
            w.daemon = True
 
223
            w.start()
 
224
            debug('added worker')
 
225
 
 
226
    def _maintain_pool(self):
 
227
        """Clean up any exited workers and start replacements for them.
 
228
        """
 
229
        if self._join_exited_workers():
 
230
            self._repopulate_pool()
 
231
 
 
232
    def _setup_queues(self):
 
233
        from .queues import SimpleQueue
 
234
        self._inqueue = SimpleQueue()
 
235
        self._outqueue = SimpleQueue()
 
236
        self._quick_put = self._inqueue._writer.send
 
237
        self._quick_get = self._outqueue._reader.recv
 
238
 
 
239
    def apply(self, func, args=(), kwds={}):
 
240
        '''
 
241
        Equivalent of `apply()` builtin
 
242
        '''
 
243
        assert self._state == RUN
 
244
        return self.apply_async(func, args, kwds).get()
 
245
 
 
246
    def map(self, func, iterable, chunksize=None):
 
247
        '''
 
248
        Equivalent of `map()` builtin
 
249
        '''
 
250
        assert self._state == RUN
 
251
        return self.map_async(func, iterable, chunksize).get()
 
252
 
 
253
    def imap(self, func, iterable, chunksize=1):
 
254
        '''
 
255
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
 
256
        '''
 
257
        assert self._state == RUN
 
258
        if chunksize == 1:
 
259
            result = IMapIterator(self._cache)
 
260
            self._taskqueue.put((((result._job, i, func, (x,), {})
 
261
                         for i, x in enumerate(iterable)), result._set_length))
 
262
            return result
 
263
        else:
 
264
            assert chunksize > 1
 
265
            task_batches = Pool._get_tasks(func, iterable, chunksize)
 
266
            result = IMapIterator(self._cache)
 
267
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
 
268
                     for i, x in enumerate(task_batches)), result._set_length))
 
269
            return (item for chunk in result for item in chunk)
 
270
 
 
271
    def imap_unordered(self, func, iterable, chunksize=1):
 
272
        '''
 
273
        Like `imap()` method but ordering of results is arbitrary
 
274
        '''
 
275
        assert self._state == RUN
 
276
        if chunksize == 1:
 
277
            result = IMapUnorderedIterator(self._cache)
 
278
            self._taskqueue.put((((result._job, i, func, (x,), {})
 
279
                         for i, x in enumerate(iterable)), result._set_length))
 
280
            return result
 
281
        else:
 
282
            assert chunksize > 1
 
283
            task_batches = Pool._get_tasks(func, iterable, chunksize)
 
284
            result = IMapUnorderedIterator(self._cache)
 
285
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
 
286
                     for i, x in enumerate(task_batches)), result._set_length))
 
287
            return (item for chunk in result for item in chunk)
 
288
 
 
289
    def apply_async(self, func, args=(), kwds={}, callback=None):
 
290
        '''
 
291
        Asynchronous equivalent of `apply()` builtin
 
292
        '''
 
293
        assert self._state == RUN
 
294
        result = ApplyResult(self._cache, callback)
 
295
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
 
296
        return result
 
297
 
 
298
    def map_async(self, func, iterable, chunksize=None, callback=None):
 
299
        '''
 
300
        Asynchronous equivalent of `map()` builtin
 
301
        '''
 
302
        assert self._state == RUN
 
303
        if not hasattr(iterable, '__len__'):
 
304
            iterable = list(iterable)
 
305
 
 
306
        if chunksize is None:
 
307
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
 
308
            if extra:
 
309
                chunksize += 1
 
310
        if len(iterable) == 0:
 
311
            chunksize = 0
 
312
 
 
313
        task_batches = Pool._get_tasks(func, iterable, chunksize)
 
314
        result = MapResult(self._cache, chunksize, len(iterable), callback)
 
315
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
 
316
                              for i, x in enumerate(task_batches)), None))
 
317
        return result
 
318
 
 
319
    @staticmethod
 
320
    def _handle_workers(pool):
 
321
        thread = threading.current_thread()
 
322
 
 
323
        # Keep maintaining workers until the cache gets drained, unless the pool
 
324
        # is terminated.
 
325
        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
 
326
            pool._maintain_pool()
 
327
            time.sleep(0.1)
 
328
        # send sentinel to stop workers
 
329
        pool._taskqueue.put(None)
 
330
        debug('worker handler exiting')
 
331
 
 
332
    @staticmethod
 
333
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
 
334
        thread = threading.current_thread()
 
335
 
 
336
        for taskseq, set_length in iter(taskqueue.get, None):
 
337
            task = None
 
338
            i = -1
 
339
            try:
 
340
                for i, task in enumerate(taskseq):
 
341
                    if thread._state:
 
342
                        debug('task handler found thread._state != RUN')
 
343
                        break
 
344
                    try:
 
345
                        put(task)
 
346
                    except Exception as e:
 
347
                        job, ind = task[:2]
 
348
                        try:
 
349
                            cache[job]._set(ind, (False, e))
 
350
                        except KeyError:
 
351
                            pass
 
352
                else:
 
353
                    if set_length:
 
354
                        debug('doing set_length()')
 
355
                        set_length(i+1)
 
356
                    continue
 
357
                break
 
358
            except Exception as ex:
 
359
                job, ind = task[:2] if task else (0, 0)
 
360
                if job in cache:
 
361
                    cache[job]._set(ind + 1, (False, ex))
 
362
                if set_length:
 
363
                    debug('doing set_length()')
 
364
                    set_length(i+1)
 
365
        else:
 
366
            debug('task handler got sentinel')
 
367
 
 
368
 
 
369
        try:
 
370
            # tell result handler to finish when cache is empty
 
371
            debug('task handler sending sentinel to result handler')
 
372
            outqueue.put(None)
 
373
 
 
374
            # tell workers there is no more work
 
375
            debug('task handler sending sentinel to workers')
 
376
            for p in pool:
 
377
                put(None)
 
378
        except IOError:
 
379
            debug('task handler got IOError when sending sentinels')
 
380
 
 
381
        debug('task handler exiting')
 
382
 
 
383
    @staticmethod
 
384
    def _handle_results(outqueue, get, cache):
 
385
        thread = threading.current_thread()
 
386
 
 
387
        while 1:
 
388
            try:
 
389
                task = get()
 
390
            except (IOError, EOFError):
 
391
                debug('result handler got EOFError/IOError -- exiting')
 
392
                return
 
393
 
 
394
            if thread._state:
 
395
                assert thread._state == TERMINATE
 
396
                debug('result handler found thread._state=TERMINATE')
 
397
                break
 
398
 
 
399
            if task is None:
 
400
                debug('result handler got sentinel')
 
401
                break
 
402
 
 
403
            job, i, obj = task
 
404
            try:
 
405
                cache[job]._set(i, obj)
 
406
            except KeyError:
 
407
                pass
 
408
 
 
409
        while cache and thread._state != TERMINATE:
 
410
            try:
 
411
                task = get()
 
412
            except (IOError, EOFError):
 
413
                debug('result handler got EOFError/IOError -- exiting')
 
414
                return
 
415
 
 
416
            if task is None:
 
417
                debug('result handler ignoring extra sentinel')
 
418
                continue
 
419
            job, i, obj = task
 
420
            try:
 
421
                cache[job]._set(i, obj)
 
422
            except KeyError:
 
423
                pass
 
424
 
 
425
        if hasattr(outqueue, '_reader'):
 
426
            debug('ensuring that outqueue is not full')
 
427
            # If we don't make room available in outqueue then
 
428
            # attempts to add the sentinel (None) to outqueue may
 
429
            # block.  There is guaranteed to be no more than 2 sentinels.
 
430
            try:
 
431
                for i in range(10):
 
432
                    if not outqueue._reader.poll():
 
433
                        break
 
434
                    get()
 
435
            except (IOError, EOFError):
 
436
                pass
 
437
 
 
438
        debug('result handler exiting: len(cache)=%s, thread._state=%s',
 
439
              len(cache), thread._state)
 
440
 
 
441
    @staticmethod
 
442
    def _get_tasks(func, it, size):
 
443
        it = iter(it)
 
444
        while 1:
 
445
            x = tuple(itertools.islice(it, size))
 
446
            if not x:
 
447
                return
 
448
            yield (func, x)
 
449
 
 
450
    def __reduce__(self):
 
451
        raise NotImplementedError(
 
452
              'pool objects cannot be passed between processes or pickled'
 
453
              )
 
454
 
 
455
    def close(self):
 
456
        debug('closing pool')
 
457
        if self._state == RUN:
 
458
            self._state = CLOSE
 
459
            self._worker_handler._state = CLOSE
 
460
 
 
461
    def terminate(self):
 
462
        debug('terminating pool')
 
463
        self._state = TERMINATE
 
464
        self._worker_handler._state = TERMINATE
 
465
        self._terminate()
 
466
 
 
467
    def join(self):
 
468
        debug('joining pool')
 
469
        assert self._state in (CLOSE, TERMINATE)
 
470
        self._worker_handler.join()
 
471
        self._task_handler.join()
 
472
        self._result_handler.join()
 
473
        for p in self._pool:
 
474
            p.join()
 
475
 
 
476
    @staticmethod
 
477
    def _help_stuff_finish(inqueue, task_handler, size):
 
478
        # task_handler may be blocked trying to put items on inqueue
 
479
        debug('removing tasks from inqueue until task handler finished')
 
480
        inqueue._rlock.acquire()
 
481
        while task_handler.is_alive() and inqueue._reader.poll():
 
482
            inqueue._reader.recv()
 
483
            time.sleep(0)
 
484
 
 
485
    @classmethod
 
486
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
 
487
                        worker_handler, task_handler, result_handler, cache):
 
488
        # this is guaranteed to only be called once
 
489
        debug('finalizing pool')
 
490
 
 
491
        worker_handler._state = TERMINATE
 
492
        task_handler._state = TERMINATE
 
493
 
 
494
        debug('helping task handler/workers to finish')
 
495
        cls._help_stuff_finish(inqueue, task_handler, len(pool))
 
496
 
 
497
        assert result_handler.is_alive() or len(cache) == 0
 
498
 
 
499
        result_handler._state = TERMINATE
 
500
        outqueue.put(None)                  # sentinel
 
501
 
 
502
        # We must wait for the worker handler to exit before terminating
 
503
        # workers because we don't want workers to be restarted behind our back.
 
504
        debug('joining worker handler')
 
505
        if threading.current_thread() is not worker_handler:
 
506
            worker_handler.join(1e100)
 
507
 
 
508
        # Terminate workers which haven't already finished.
 
509
        if pool and hasattr(pool[0], 'terminate'):
 
510
            debug('terminating workers')
 
511
            for p in pool:
 
512
                if p.exitcode is None:
 
513
                    p.terminate()
 
514
 
 
515
        debug('joining task handler')
 
516
        if threading.current_thread() is not task_handler:
 
517
            task_handler.join(1e100)
 
518
 
 
519
        debug('joining result handler')
 
520
        if threading.current_thread() is not result_handler:
 
521
            result_handler.join(1e100)
 
522
 
 
523
        if pool and hasattr(pool[0], 'terminate'):
 
524
            debug('joining pool workers')
 
525
            for p in pool:
 
526
                if p.is_alive():
 
527
                    # worker has not yet exited
 
528
                    debug('cleaning up worker %d' % p.pid)
 
529
                    p.join()
 
530
 
 
531
#
 
532
# Class whose instances are returned by `Pool.apply_async()`
 
533
#
 
534
 
 
535
class ApplyResult(object):
 
536
 
 
537
    def __init__(self, cache, callback):
 
538
        self._cond = threading.Condition(threading.Lock())
 
539
        self._job = job_counter.next()
 
540
        self._cache = cache
 
541
        self._ready = False
 
542
        self._callback = callback
 
543
        cache[self._job] = self
 
544
 
 
545
    def ready(self):
 
546
        return self._ready
 
547
 
 
548
    def successful(self):
 
549
        assert self._ready
 
550
        return self._success
 
551
 
 
552
    def wait(self, timeout=None):
 
553
        self._cond.acquire()
 
554
        try:
 
555
            if not self._ready:
 
556
                self._cond.wait(timeout)
 
557
        finally:
 
558
            self._cond.release()
 
559
 
 
560
    def get(self, timeout=None):
 
561
        self.wait(timeout)
 
562
        if not self._ready:
 
563
            raise TimeoutError
 
564
        if self._success:
 
565
            return self._value
 
566
        else:
 
567
            raise self._value
 
568
 
 
569
    def _set(self, i, obj):
 
570
        self._success, self._value = obj
 
571
        if self._callback and self._success:
 
572
            self._callback(self._value)
 
573
        self._cond.acquire()
 
574
        try:
 
575
            self._ready = True
 
576
            self._cond.notify()
 
577
        finally:
 
578
            self._cond.release()
 
579
        del self._cache[self._job]
 
580
 
 
581
AsyncResult = ApplyResult       # create alias -- see #17805
 
582
 
 
583
#
 
584
# Class whose instances are returned by `Pool.map_async()`
 
585
#
 
586
 
 
587
class MapResult(ApplyResult):
 
588
 
 
589
    def __init__(self, cache, chunksize, length, callback):
 
590
        ApplyResult.__init__(self, cache, callback)
 
591
        self._success = True
 
592
        self._value = [None] * length
 
593
        self._chunksize = chunksize
 
594
        if chunksize <= 0:
 
595
            self._number_left = 0
 
596
            self._ready = True
 
597
            del cache[self._job]
 
598
        else:
 
599
            self._number_left = length//chunksize + bool(length % chunksize)
 
600
 
 
601
    def _set(self, i, success_result):
 
602
        success, result = success_result
 
603
        if success:
 
604
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
 
605
            self._number_left -= 1
 
606
            if self._number_left == 0:
 
607
                if self._callback:
 
608
                    self._callback(self._value)
 
609
                del self._cache[self._job]
 
610
                self._cond.acquire()
 
611
                try:
 
612
                    self._ready = True
 
613
                    self._cond.notify()
 
614
                finally:
 
615
                    self._cond.release()
 
616
 
 
617
        else:
 
618
            self._success = False
 
619
            self._value = result
 
620
            del self._cache[self._job]
 
621
            self._cond.acquire()
 
622
            try:
 
623
                self._ready = True
 
624
                self._cond.notify()
 
625
            finally:
 
626
                self._cond.release()
 
627
 
 
628
#
 
629
# Class whose instances are returned by `Pool.imap()`
 
630
#
 
631
 
 
632
class IMapIterator(object):
 
633
 
 
634
    def __init__(self, cache):
 
635
        self._cond = threading.Condition(threading.Lock())
 
636
        self._job = job_counter.next()
 
637
        self._cache = cache
 
638
        self._items = collections.deque()
 
639
        self._index = 0
 
640
        self._length = None
 
641
        self._unsorted = {}
 
642
        cache[self._job] = self
 
643
 
 
644
    def __iter__(self):
 
645
        return self
 
646
 
 
647
    def next(self, timeout=None):
 
648
        self._cond.acquire()
 
649
        try:
 
650
            try:
 
651
                item = self._items.popleft()
 
652
            except IndexError:
 
653
                if self._index == self._length:
 
654
                    raise StopIteration
 
655
                self._cond.wait(timeout)
 
656
                try:
 
657
                    item = self._items.popleft()
 
658
                except IndexError:
 
659
                    if self._index == self._length:
 
660
                        raise StopIteration
 
661
                    raise TimeoutError
 
662
        finally:
 
663
            self._cond.release()
 
664
 
 
665
        success, value = item
 
666
        if success:
 
667
            return value
 
668
        raise value
 
669
 
 
670
    __next__ = next                    # XXX
 
671
 
 
672
    def _set(self, i, obj):
 
673
        self._cond.acquire()
 
674
        try:
 
675
            if self._index == i:
 
676
                self._items.append(obj)
 
677
                self._index += 1
 
678
                while self._index in self._unsorted:
 
679
                    obj = self._unsorted.pop(self._index)
 
680
                    self._items.append(obj)
 
681
                    self._index += 1
 
682
                self._cond.notify()
 
683
            else:
 
684
                self._unsorted[i] = obj
 
685
 
 
686
            if self._index == self._length:
 
687
                del self._cache[self._job]
 
688
        finally:
 
689
            self._cond.release()
 
690
 
 
691
    def _set_length(self, length):
 
692
        self._cond.acquire()
 
693
        try:
 
694
            self._length = length
 
695
            if self._index == self._length:
 
696
                self._cond.notify()
 
697
                del self._cache[self._job]
 
698
        finally:
 
699
            self._cond.release()
 
700
 
 
701
#
 
702
# Class whose instances are returned by `Pool.imap_unordered()`
 
703
#
 
704
 
 
705
class IMapUnorderedIterator(IMapIterator):
 
706
 
 
707
    def _set(self, i, obj):
 
708
        self._cond.acquire()
 
709
        try:
 
710
            self._items.append(obj)
 
711
            self._index += 1
 
712
            self._cond.notify()
 
713
            if self._index == self._length:
 
714
                del self._cache[self._job]
 
715
        finally:
 
716
            self._cond.release()
 
717
 
 
718
#
 
719
#
 
720
#
 
721
 
 
722
class ThreadPool(Pool):
 
723
 
 
724
    from .dummy import Process
 
725
 
 
726
    def __init__(self, processes=None, initializer=None, initargs=()):
 
727
        Pool.__init__(self, processes, initializer, initargs)
 
728
 
 
729
    def _setup_queues(self):
 
730
        self._inqueue = Queue.Queue()
 
731
        self._outqueue = Queue.Queue()
 
732
        self._quick_put = self._inqueue.put
 
733
        self._quick_get = self._outqueue.get
 
734
 
 
735
    @staticmethod
 
736
    def _help_stuff_finish(inqueue, task_handler, size):
 
737
        # put sentinels at head of inqueue to make workers finish
 
738
        inqueue.not_empty.acquire()
 
739
        try:
 
740
            inqueue.queue.clear()
 
741
            inqueue.queue.extend([None] * size)
 
742
            inqueue.not_empty.notify_all()
 
743
        finally:
 
744
            inqueue.not_empty.release()