~malept/ubuntu/lucid/python2.6/dev-dependency-fix

« back to all changes in this revision

Viewing changes to Lib/threading.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-02-13 12:51:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090213125100-uufgcb9yeqzujpqw
Tags: upstream-2.6.1
ImportĀ upstreamĀ versionĀ 2.6.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Thread module emulating a subset of Java's threading model."""
 
2
 
 
3
import sys as _sys
 
4
 
 
5
try:
 
6
    import thread
 
7
except ImportError:
 
8
    del _sys.modules[__name__]
 
9
    raise
 
10
 
 
11
import warnings
 
12
 
 
13
from functools import wraps
 
14
from time import time as _time, sleep as _sleep
 
15
from traceback import format_exc as _format_exc
 
16
from collections import deque
 
17
 
 
18
# Note regarding PEP 8 compliant aliases
 
19
#  This threading model was originally inspired by Java, and inherited
 
20
# the convention of camelCase function and method names from that
 
21
# language. While those names are not in any imminent danger of being
 
22
# deprecated, starting with Python 2.6, the module now provides a
 
23
# PEP 8 compliant alias for any such method name.
 
24
# Using the new PEP 8 compliant names also facilitates substitution
 
25
# with the multiprocessing module, which doesn't provide the old
 
26
# Java inspired names.
 
27
 
 
28
 
 
29
# Rename some stuff so "from threading import *" is safe
 
30
__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
 
31
           'current_thread', 'enumerate', 'Event',
 
32
           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
 
33
           'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
 
34
 
 
35
_start_new_thread = thread.start_new_thread
 
36
_allocate_lock = thread.allocate_lock
 
37
_get_ident = thread.get_ident
 
38
ThreadError = thread.error
 
39
del thread
 
40
 
 
41
 
 
42
# sys.exc_clear is used to work around the fact that except blocks
 
43
# don't fully clear the exception until 3.0.
 
44
warnings.filterwarnings('ignore', category=DeprecationWarning,
 
45
                        module='threading', message='sys.exc_clear')
 
46
 
 
47
# Debug support (adapted from ihooks.py).
 
48
# All the major classes here derive from _Verbose.  We force that to
 
49
# be a new-style class so that all the major classes here are new-style.
 
50
# This helps debugging (type(instance) is more revealing for instances
 
51
# of new-style classes).
 
52
 
 
53
_VERBOSE = False
 
54
 
 
55
if __debug__:
 
56
 
 
57
    class _Verbose(object):
 
58
 
 
59
        def __init__(self, verbose=None):
 
60
            if verbose is None:
 
61
                verbose = _VERBOSE
 
62
            self.__verbose = verbose
 
63
 
 
64
        def _note(self, format, *args):
 
65
            if self.__verbose:
 
66
                format = format % args
 
67
                format = "%s: %s\n" % (
 
68
                    current_thread().name, format)
 
69
                _sys.stderr.write(format)
 
70
 
 
71
else:
 
72
    # Disable this when using "python -O"
 
73
    class _Verbose(object):
 
74
        def __init__(self, verbose=None):
 
75
            pass
 
76
        def _note(self, *args):
 
77
            pass
 
78
 
 
79
# Support for profile and trace hooks
 
80
 
 
81
_profile_hook = None
 
82
_trace_hook = None
 
83
 
 
84
def setprofile(func):
 
85
    global _profile_hook
 
86
    _profile_hook = func
 
87
 
 
88
def settrace(func):
 
89
    global _trace_hook
 
90
    _trace_hook = func
 
91
 
 
92
# Synchronization classes
 
93
 
 
94
Lock = _allocate_lock
 
95
 
 
96
def RLock(*args, **kwargs):
 
97
    return _RLock(*args, **kwargs)
 
98
 
 
99
class _RLock(_Verbose):
 
100
 
 
101
    def __init__(self, verbose=None):
 
102
        _Verbose.__init__(self, verbose)
 
103
        self.__block = _allocate_lock()
 
104
        self.__owner = None
 
105
        self.__count = 0
 
106
 
 
107
    def __repr__(self):
 
108
        owner = self.__owner
 
109
        return "<%s(%s, %d)>" % (
 
110
                self.__class__.__name__,
 
111
                owner and owner.name,
 
112
                self.__count)
 
113
 
 
114
    def acquire(self, blocking=1):
 
115
        me = current_thread()
 
116
        if self.__owner is me:
 
117
            self.__count = self.__count + 1
 
118
            if __debug__:
 
119
                self._note("%s.acquire(%s): recursive success", self, blocking)
 
120
            return 1
 
121
        rc = self.__block.acquire(blocking)
 
122
        if rc:
 
123
            self.__owner = me
 
124
            self.__count = 1
 
125
            if __debug__:
 
126
                self._note("%s.acquire(%s): initial success", self, blocking)
 
127
        else:
 
128
            if __debug__:
 
129
                self._note("%s.acquire(%s): failure", self, blocking)
 
130
        return rc
 
131
 
 
132
    __enter__ = acquire
 
133
 
 
134
    def release(self):
 
135
        if self.__owner is not current_thread():
 
136
            raise RuntimeError("cannot release un-aquired lock")
 
137
        self.__count = count = self.__count - 1
 
138
        if not count:
 
139
            self.__owner = None
 
140
            self.__block.release()
 
141
            if __debug__:
 
142
                self._note("%s.release(): final release", self)
 
143
        else:
 
144
            if __debug__:
 
145
                self._note("%s.release(): non-final release", self)
 
146
 
 
147
    def __exit__(self, t, v, tb):
 
148
        self.release()
 
149
 
 
150
    # Internal methods used by condition variables
 
151
 
 
152
    def _acquire_restore(self, count_owner):
 
153
        count, owner = count_owner
 
154
        self.__block.acquire()
 
155
        self.__count = count
 
156
        self.__owner = owner
 
157
        if __debug__:
 
158
            self._note("%s._acquire_restore()", self)
 
159
 
 
160
    def _release_save(self):
 
161
        if __debug__:
 
162
            self._note("%s._release_save()", self)
 
163
        count = self.__count
 
164
        self.__count = 0
 
165
        owner = self.__owner
 
166
        self.__owner = None
 
167
        self.__block.release()
 
168
        return (count, owner)
 
169
 
 
170
    def _is_owned(self):
 
171
        return self.__owner is current_thread()
 
172
 
 
173
 
 
174
def Condition(*args, **kwargs):
 
175
    return _Condition(*args, **kwargs)
 
176
 
 
177
class _Condition(_Verbose):
 
178
 
 
179
    def __init__(self, lock=None, verbose=None):
 
180
        _Verbose.__init__(self, verbose)
 
181
        if lock is None:
 
182
            lock = RLock()
 
183
        self.__lock = lock
 
184
        # Export the lock's acquire() and release() methods
 
185
        self.acquire = lock.acquire
 
186
        self.release = lock.release
 
187
        # If the lock defines _release_save() and/or _acquire_restore(),
 
188
        # these override the default implementations (which just call
 
189
        # release() and acquire() on the lock).  Ditto for _is_owned().
 
190
        try:
 
191
            self._release_save = lock._release_save
 
192
        except AttributeError:
 
193
            pass
 
194
        try:
 
195
            self._acquire_restore = lock._acquire_restore
 
196
        except AttributeError:
 
197
            pass
 
198
        try:
 
199
            self._is_owned = lock._is_owned
 
200
        except AttributeError:
 
201
            pass
 
202
        self.__waiters = []
 
203
 
 
204
    def __enter__(self):
 
205
        return self.__lock.__enter__()
 
206
 
 
207
    def __exit__(self, *args):
 
208
        return self.__lock.__exit__(*args)
 
209
 
 
210
    def __repr__(self):
 
211
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
 
212
 
 
213
    def _release_save(self):
 
214
        self.__lock.release()           # No state to save
 
215
 
 
216
    def _acquire_restore(self, x):
 
217
        self.__lock.acquire()           # Ignore saved state
 
218
 
 
219
    def _is_owned(self):
 
220
        # Return True if lock is owned by current_thread.
 
221
        # This method is called only if __lock doesn't have _is_owned().
 
222
        if self.__lock.acquire(0):
 
223
            self.__lock.release()
 
224
            return False
 
225
        else:
 
226
            return True
 
227
 
 
228
    def wait(self, timeout=None):
 
229
        if not self._is_owned():
 
230
            raise RuntimeError("cannot wait on un-aquired lock")
 
231
        waiter = _allocate_lock()
 
232
        waiter.acquire()
 
233
        self.__waiters.append(waiter)
 
234
        saved_state = self._release_save()
 
235
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
 
236
            if timeout is None:
 
237
                waiter.acquire()
 
238
                if __debug__:
 
239
                    self._note("%s.wait(): got it", self)
 
240
            else:
 
241
                # Balancing act:  We can't afford a pure busy loop, so we
 
242
                # have to sleep; but if we sleep the whole timeout time,
 
243
                # we'll be unresponsive.  The scheme here sleeps very
 
244
                # little at first, longer as time goes on, but never longer
 
245
                # than 20 times per second (or the timeout time remaining).
 
246
                endtime = _time() + timeout
 
247
                delay = 0.0005 # 500 us -> initial delay of 1 ms
 
248
                while True:
 
249
                    gotit = waiter.acquire(0)
 
250
                    if gotit:
 
251
                        break
 
252
                    remaining = endtime - _time()
 
253
                    if remaining <= 0:
 
254
                        break
 
255
                    delay = min(delay * 2, remaining, .05)
 
256
                    _sleep(delay)
 
257
                if not gotit:
 
258
                    if __debug__:
 
259
                        self._note("%s.wait(%s): timed out", self, timeout)
 
260
                    try:
 
261
                        self.__waiters.remove(waiter)
 
262
                    except ValueError:
 
263
                        pass
 
264
                else:
 
265
                    if __debug__:
 
266
                        self._note("%s.wait(%s): got it", self, timeout)
 
267
        finally:
 
268
            self._acquire_restore(saved_state)
 
269
 
 
270
    def notify(self, n=1):
 
271
        if not self._is_owned():
 
272
            raise RuntimeError("cannot notify on un-aquired lock")
 
273
        __waiters = self.__waiters
 
274
        waiters = __waiters[:n]
 
275
        if not waiters:
 
276
            if __debug__:
 
277
                self._note("%s.notify(): no waiters", self)
 
278
            return
 
279
        self._note("%s.notify(): notifying %d waiter%s", self, n,
 
280
                   n!=1 and "s" or "")
 
281
        for waiter in waiters:
 
282
            waiter.release()
 
283
            try:
 
284
                __waiters.remove(waiter)
 
285
            except ValueError:
 
286
                pass
 
287
 
 
288
    def notifyAll(self):
 
289
        self.notify(len(self.__waiters))
 
290
 
 
291
    notify_all = notifyAll
 
292
 
 
293
 
 
294
def Semaphore(*args, **kwargs):
 
295
    return _Semaphore(*args, **kwargs)
 
296
 
 
297
class _Semaphore(_Verbose):
 
298
 
 
299
    # After Tim Peters' semaphore class, but not quite the same (no maximum)
 
300
 
 
301
    def __init__(self, value=1, verbose=None):
 
302
        if value < 0:
 
303
            raise ValueError("semaphore initial value must be >= 0")
 
304
        _Verbose.__init__(self, verbose)
 
305
        self.__cond = Condition(Lock())
 
306
        self.__value = value
 
307
 
 
308
    def acquire(self, blocking=1):
 
309
        rc = False
 
310
        self.__cond.acquire()
 
311
        while self.__value == 0:
 
312
            if not blocking:
 
313
                break
 
314
            if __debug__:
 
315
                self._note("%s.acquire(%s): blocked waiting, value=%s",
 
316
                           self, blocking, self.__value)
 
317
            self.__cond.wait()
 
318
        else:
 
319
            self.__value = self.__value - 1
 
320
            if __debug__:
 
321
                self._note("%s.acquire: success, value=%s",
 
322
                           self, self.__value)
 
323
            rc = True
 
324
        self.__cond.release()
 
325
        return rc
 
326
 
 
327
    __enter__ = acquire
 
328
 
 
329
    def release(self):
 
330
        self.__cond.acquire()
 
331
        self.__value = self.__value + 1
 
332
        if __debug__:
 
333
            self._note("%s.release: success, value=%s",
 
334
                       self, self.__value)
 
335
        self.__cond.notify()
 
336
        self.__cond.release()
 
337
 
 
338
    def __exit__(self, t, v, tb):
 
339
        self.release()
 
340
 
 
341
 
 
342
def BoundedSemaphore(*args, **kwargs):
 
343
    return _BoundedSemaphore(*args, **kwargs)
 
344
 
 
345
class _BoundedSemaphore(_Semaphore):
 
346
    """Semaphore that checks that # releases is <= # acquires"""
 
347
    def __init__(self, value=1, verbose=None):
 
348
        _Semaphore.__init__(self, value, verbose)
 
349
        self._initial_value = value
 
350
 
 
351
    def release(self):
 
352
        if self._Semaphore__value >= self._initial_value:
 
353
            raise ValueError, "Semaphore released too many times"
 
354
        return _Semaphore.release(self)
 
355
 
 
356
 
 
357
def Event(*args, **kwargs):
 
358
    return _Event(*args, **kwargs)
 
359
 
 
360
class _Event(_Verbose):
 
361
 
 
362
    # After Tim Peters' event class (without is_posted())
 
363
 
 
364
    def __init__(self, verbose=None):
 
365
        _Verbose.__init__(self, verbose)
 
366
        self.__cond = Condition(Lock())
 
367
        self.__flag = False
 
368
 
 
369
    def isSet(self):
 
370
        return self.__flag
 
371
 
 
372
    is_set = isSet
 
373
 
 
374
    def set(self):
 
375
        self.__cond.acquire()
 
376
        try:
 
377
            self.__flag = True
 
378
            self.__cond.notify_all()
 
379
        finally:
 
380
            self.__cond.release()
 
381
 
 
382
    def clear(self):
 
383
        self.__cond.acquire()
 
384
        try:
 
385
            self.__flag = False
 
386
        finally:
 
387
            self.__cond.release()
 
388
 
 
389
    def wait(self, timeout=None):
 
390
        self.__cond.acquire()
 
391
        try:
 
392
            if not self.__flag:
 
393
                self.__cond.wait(timeout)
 
394
        finally:
 
395
            self.__cond.release()
 
396
 
 
397
# Helper to generate new thread names
 
398
_counter = 0
 
399
def _newname(template="Thread-%d"):
 
400
    global _counter
 
401
    _counter = _counter + 1
 
402
    return template % _counter
 
403
 
 
404
# Active thread administration
 
405
_active_limbo_lock = _allocate_lock()
 
406
_active = {}    # maps thread id to Thread object
 
407
_limbo = {}
 
408
 
 
409
 
 
410
# Main class for threads
 
411
 
 
412
class Thread(_Verbose):
 
413
 
 
414
    __initialized = False
 
415
    # Need to store a reference to sys.exc_info for printing
 
416
    # out exceptions when a thread tries to use a global var. during interp.
 
417
    # shutdown and thus raises an exception about trying to perform some
 
418
    # operation on/with a NoneType
 
419
    __exc_info = _sys.exc_info
 
420
    # Keep sys.exc_clear too to clear the exception just before
 
421
    # allowing .join() to return.
 
422
    __exc_clear = _sys.exc_clear
 
423
 
 
424
    def __init__(self, group=None, target=None, name=None,
 
425
                 args=(), kwargs=None, verbose=None):
 
426
        assert group is None, "group argument must be None for now"
 
427
        _Verbose.__init__(self, verbose)
 
428
        if kwargs is None:
 
429
            kwargs = {}
 
430
        self.__target = target
 
431
        self.__name = str(name or _newname())
 
432
        self.__args = args
 
433
        self.__kwargs = kwargs
 
434
        self.__daemonic = self._set_daemon()
 
435
        self.__ident = None
 
436
        self.__started = Event()
 
437
        self.__stopped = False
 
438
        self.__block = Condition(Lock())
 
439
        self.__initialized = True
 
440
        # sys.stderr is not stored in the class like
 
441
        # sys.exc_info since it can be changed between instances
 
442
        self.__stderr = _sys.stderr
 
443
 
 
444
    def _set_daemon(self):
 
445
        # Overridden in _MainThread and _DummyThread
 
446
        return current_thread().daemon
 
447
 
 
448
    def __repr__(self):
 
449
        assert self.__initialized, "Thread.__init__() was not called"
 
450
        status = "initial"
 
451
        if self.__started.is_set():
 
452
            status = "started"
 
453
        if self.__stopped:
 
454
            status = "stopped"
 
455
        if self.__daemonic:
 
456
            status += " daemon"
 
457
        if self.__ident is not None:
 
458
            status += " %s" % self.__ident
 
459
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
 
460
 
 
461
    def start(self):
 
462
        if not self.__initialized:
 
463
            raise RuntimeError("thread.__init__() not called")
 
464
        if self.__started.is_set():
 
465
            raise RuntimeError("thread already started")
 
466
        if __debug__:
 
467
            self._note("%s.start(): starting thread", self)
 
468
        _active_limbo_lock.acquire()
 
469
        _limbo[self] = self
 
470
        _active_limbo_lock.release()
 
471
        _start_new_thread(self.__bootstrap, ())
 
472
        self.__started.wait()
 
473
 
 
474
    def run(self):
 
475
        try:
 
476
            if self.__target:
 
477
                self.__target(*self.__args, **self.__kwargs)
 
478
        finally:
 
479
            # Avoid a refcycle if the thread is running a function with
 
480
            # an argument that has a member that points to the thread.
 
481
            del self.__target, self.__args, self.__kwargs
 
482
 
 
483
    def __bootstrap(self):
 
484
        # Wrapper around the real bootstrap code that ignores
 
485
        # exceptions during interpreter cleanup.  Those typically
 
486
        # happen when a daemon thread wakes up at an unfortunate
 
487
        # moment, finds the world around it destroyed, and raises some
 
488
        # random exception *** while trying to report the exception in
 
489
        # __bootstrap_inner() below ***.  Those random exceptions
 
490
        # don't help anybody, and they confuse users, so we suppress
 
491
        # them.  We suppress them only when it appears that the world
 
492
        # indeed has already been destroyed, so that exceptions in
 
493
        # __bootstrap_inner() during normal business hours are properly
 
494
        # reported.  Also, we only suppress them for daemonic threads;
 
495
        # if a non-daemonic encounters this, something else is wrong.
 
496
        try:
 
497
            self.__bootstrap_inner()
 
498
        except:
 
499
            if self.__daemonic and _sys is None:
 
500
                return
 
501
            raise
 
502
 
 
503
    def __bootstrap_inner(self):
 
504
        try:
 
505
            self.__ident = _get_ident()
 
506
            self.__started.set()
 
507
            _active_limbo_lock.acquire()
 
508
            _active[self.__ident] = self
 
509
            del _limbo[self]
 
510
            _active_limbo_lock.release()
 
511
            if __debug__:
 
512
                self._note("%s.__bootstrap(): thread started", self)
 
513
 
 
514
            if _trace_hook:
 
515
                self._note("%s.__bootstrap(): registering trace hook", self)
 
516
                _sys.settrace(_trace_hook)
 
517
            if _profile_hook:
 
518
                self._note("%s.__bootstrap(): registering profile hook", self)
 
519
                _sys.setprofile(_profile_hook)
 
520
 
 
521
            try:
 
522
                self.run()
 
523
            except SystemExit:
 
524
                if __debug__:
 
525
                    self._note("%s.__bootstrap(): raised SystemExit", self)
 
526
            except:
 
527
                if __debug__:
 
528
                    self._note("%s.__bootstrap(): unhandled exception", self)
 
529
                # If sys.stderr is no more (most likely from interpreter
 
530
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
 
531
                # _sys) in case sys.stderr was redefined since the creation of
 
532
                # self.
 
533
                if _sys:
 
534
                    _sys.stderr.write("Exception in thread %s:\n%s\n" %
 
535
                                      (self.name, _format_exc()))
 
536
                else:
 
537
                    # Do the best job possible w/o a huge amt. of code to
 
538
                    # approximate a traceback (code ideas from
 
539
                    # Lib/traceback.py)
 
540
                    exc_type, exc_value, exc_tb = self.__exc_info()
 
541
                    try:
 
542
                        print>>self.__stderr, (
 
543
                            "Exception in thread " + self.name +
 
544
                            " (most likely raised during interpreter shutdown):")
 
545
                        print>>self.__stderr, (
 
546
                            "Traceback (most recent call last):")
 
547
                        while exc_tb:
 
548
                            print>>self.__stderr, (
 
549
                                '  File "%s", line %s, in %s' %
 
550
                                (exc_tb.tb_frame.f_code.co_filename,
 
551
                                    exc_tb.tb_lineno,
 
552
                                    exc_tb.tb_frame.f_code.co_name))
 
553
                            exc_tb = exc_tb.tb_next
 
554
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
 
555
                    # Make sure that exc_tb gets deleted since it is a memory
 
556
                    # hog; deleting everything else is just for thoroughness
 
557
                    finally:
 
558
                        del exc_type, exc_value, exc_tb
 
559
            else:
 
560
                if __debug__:
 
561
                    self._note("%s.__bootstrap(): normal return", self)
 
562
            finally:
 
563
                # Prevent a race in
 
564
                # test_threading.test_no_refcycle_through_target when
 
565
                # the exception keeps the target alive past when we
 
566
                # assert that it's dead.
 
567
                self.__exc_clear()
 
568
        finally:
 
569
            with _active_limbo_lock:
 
570
                self.__stop()
 
571
                try:
 
572
                    # We don't call self.__delete() because it also
 
573
                    # grabs _active_limbo_lock.
 
574
                    del _active[_get_ident()]
 
575
                except:
 
576
                    pass
 
577
 
 
578
    def __stop(self):
 
579
        self.__block.acquire()
 
580
        self.__stopped = True
 
581
        self.__block.notify_all()
 
582
        self.__block.release()
 
583
 
 
584
    def __delete(self):
 
585
        "Remove current thread from the dict of currently running threads."
 
586
 
 
587
        # Notes about running with dummy_thread:
 
588
        #
 
589
        # Must take care to not raise an exception if dummy_thread is being
 
590
        # used (and thus this module is being used as an instance of
 
591
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
 
592
        # there is only one thread if dummy_thread is being used.  Thus
 
593
        # len(_active) is always <= 1 here, and any Thread instance created
 
594
        # overwrites the (if any) thread currently registered in _active.
 
595
        #
 
596
        # An instance of _MainThread is always created by 'threading'.  This
 
597
        # gets overwritten the instant an instance of Thread is created; both
 
598
        # threads return -1 from dummy_thread.get_ident() and thus have the
 
599
        # same key in the dict.  So when the _MainThread instance created by
 
600
        # 'threading' tries to clean itself up when atexit calls this method
 
601
        # it gets a KeyError if another Thread instance was created.
 
602
        #
 
603
        # This all means that KeyError from trying to delete something from
 
604
        # _active if dummy_threading is being used is a red herring.  But
 
605
        # since it isn't if dummy_threading is *not* being used then don't
 
606
        # hide the exception.
 
607
 
 
608
        try:
 
609
            with _active_limbo_lock:
 
610
                del _active[_get_ident()]
 
611
                # There must not be any python code between the previous line
 
612
                # and after the lock is released.  Otherwise a tracing function
 
613
                # could try to acquire the lock again in the same thread, (in
 
614
                # current_thread()), and would block.
 
615
        except KeyError:
 
616
            if 'dummy_threading' not in _sys.modules:
 
617
                raise
 
618
 
 
619
    def join(self, timeout=None):
 
620
        if not self.__initialized:
 
621
            raise RuntimeError("Thread.__init__() not called")
 
622
        if not self.__started.is_set():
 
623
            raise RuntimeError("cannot join thread before it is started")
 
624
        if self is current_thread():
 
625
            raise RuntimeError("cannot join current thread")
 
626
 
 
627
        if __debug__:
 
628
            if not self.__stopped:
 
629
                self._note("%s.join(): waiting until thread stops", self)
 
630
        self.__block.acquire()
 
631
        try:
 
632
            if timeout is None:
 
633
                while not self.__stopped:
 
634
                    self.__block.wait()
 
635
                if __debug__:
 
636
                    self._note("%s.join(): thread stopped", self)
 
637
            else:
 
638
                deadline = _time() + timeout
 
639
                while not self.__stopped:
 
640
                    delay = deadline - _time()
 
641
                    if delay <= 0:
 
642
                        if __debug__:
 
643
                            self._note("%s.join(): timed out", self)
 
644
                        break
 
645
                    self.__block.wait(delay)
 
646
                else:
 
647
                    if __debug__:
 
648
                        self._note("%s.join(): thread stopped", self)
 
649
        finally:
 
650
            self.__block.release()
 
651
 
 
652
    @property
 
653
    def name(self):
 
654
        assert self.__initialized, "Thread.__init__() not called"
 
655
        return self.__name
 
656
 
 
657
    @name.setter
 
658
    def name(self, name):
 
659
        assert self.__initialized, "Thread.__init__() not called"
 
660
        self.__name = str(name)
 
661
 
 
662
    @property
 
663
    def ident(self):
 
664
        assert self.__initialized, "Thread.__init__() not called"
 
665
        return self.__ident
 
666
 
 
667
    def isAlive(self):
 
668
        assert self.__initialized, "Thread.__init__() not called"
 
669
        return self.__started.is_set() and not self.__stopped
 
670
 
 
671
    is_alive = isAlive
 
672
 
 
673
    @property
 
674
    def daemon(self):
 
675
        assert self.__initialized, "Thread.__init__() not called"
 
676
        return self.__daemonic
 
677
 
 
678
    @daemon.setter
 
679
    def daemon(self, daemonic):
 
680
        if not self.__initialized:
 
681
            raise RuntimeError("Thread.__init__() not called")
 
682
        if self.__started.is_set():
 
683
            raise RuntimeError("cannot set daemon status of active thread");
 
684
        self.__daemonic = daemonic
 
685
 
 
686
    def isDaemon(self):
 
687
        return self.daemon
 
688
 
 
689
    def setDaemon(self, daemonic):
 
690
        self.daemon = daemonic
 
691
 
 
692
    def getName(self):
 
693
        return self.name
 
694
 
 
695
    def setName(self, name):
 
696
        self.name = name
 
697
 
 
698
# The timer class was contributed by Itamar Shtull-Trauring
 
699
 
 
700
def Timer(*args, **kwargs):
 
701
    return _Timer(*args, **kwargs)
 
702
 
 
703
class _Timer(Thread):
 
704
    """Call a function after a specified number of seconds:
 
705
 
 
706
    t = Timer(30.0, f, args=[], kwargs={})
 
707
    t.start()
 
708
    t.cancel() # stop the timer's action if it's still waiting
 
709
    """
 
710
 
 
711
    def __init__(self, interval, function, args=[], kwargs={}):
 
712
        Thread.__init__(self)
 
713
        self.interval = interval
 
714
        self.function = function
 
715
        self.args = args
 
716
        self.kwargs = kwargs
 
717
        self.finished = Event()
 
718
 
 
719
    def cancel(self):
 
720
        """Stop the timer if it hasn't finished yet"""
 
721
        self.finished.set()
 
722
 
 
723
    def run(self):
 
724
        self.finished.wait(self.interval)
 
725
        if not self.finished.is_set():
 
726
            self.function(*self.args, **self.kwargs)
 
727
        self.finished.set()
 
728
 
 
729
# Special thread class to represent the main thread
 
730
# This is garbage collected through an exit handler
 
731
 
 
732
class _MainThread(Thread):
 
733
 
 
734
    def __init__(self):
 
735
        Thread.__init__(self, name="MainThread")
 
736
        self._Thread__started.set()
 
737
        _active_limbo_lock.acquire()
 
738
        _active[_get_ident()] = self
 
739
        _active_limbo_lock.release()
 
740
 
 
741
    def _set_daemon(self):
 
742
        return False
 
743
 
 
744
    def _exitfunc(self):
 
745
        self._Thread__stop()
 
746
        t = _pickSomeNonDaemonThread()
 
747
        if t:
 
748
            if __debug__:
 
749
                self._note("%s: waiting for other threads", self)
 
750
        while t:
 
751
            t.join()
 
752
            t = _pickSomeNonDaemonThread()
 
753
        if __debug__:
 
754
            self._note("%s: exiting", self)
 
755
        self._Thread__delete()
 
756
 
 
757
def _pickSomeNonDaemonThread():
 
758
    for t in enumerate():
 
759
        if not t.daemon and t.is_alive():
 
760
            return t
 
761
    return None
 
762
 
 
763
 
 
764
# Dummy thread class to represent threads not started here.
 
765
# These aren't garbage collected when they die, nor can they be waited for.
 
766
# If they invoke anything in threading.py that calls current_thread(), they
 
767
# leave an entry in the _active dict forever after.
 
768
# Their purpose is to return *something* from current_thread().
 
769
# They are marked as daemon threads so we won't wait for them
 
770
# when we exit (conform previous semantics).
 
771
 
 
772
class _DummyThread(Thread):
 
773
 
 
774
    def __init__(self):
 
775
        Thread.__init__(self, name=_newname("Dummy-%d"))
 
776
 
 
777
        # Thread.__block consumes an OS-level locking primitive, which
 
778
        # can never be used by a _DummyThread.  Since a _DummyThread
 
779
        # instance is immortal, that's bad, so release this resource.
 
780
        del self._Thread__block
 
781
 
 
782
        self._Thread__started.set()
 
783
        _active_limbo_lock.acquire()
 
784
        _active[_get_ident()] = self
 
785
        _active_limbo_lock.release()
 
786
 
 
787
    def _set_daemon(self):
 
788
        return True
 
789
 
 
790
    def join(self, timeout=None):
 
791
        assert False, "cannot join a dummy thread"
 
792
 
 
793
 
 
794
# Global API functions
 
795
 
 
796
def currentThread():
 
797
    try:
 
798
        return _active[_get_ident()]
 
799
    except KeyError:
 
800
        ##print "current_thread(): no current thread for", _get_ident()
 
801
        return _DummyThread()
 
802
 
 
803
current_thread = currentThread
 
804
 
 
805
def activeCount():
 
806
    _active_limbo_lock.acquire()
 
807
    count = len(_active) + len(_limbo)
 
808
    _active_limbo_lock.release()
 
809
    return count
 
810
 
 
811
active_count = activeCount
 
812
 
 
813
def enumerate():
 
814
    _active_limbo_lock.acquire()
 
815
    active = _active.values() + _limbo.values()
 
816
    _active_limbo_lock.release()
 
817
    return active
 
818
 
 
819
from thread import stack_size
 
820
 
 
821
# Create the main thread object,
 
822
# and make it available for the interpreter
 
823
# (Py_Main) as threading._shutdown.
 
824
 
 
825
_shutdown = _MainThread()._exitfunc
 
826
 
 
827
# get thread-local implementation, either from the thread
 
828
# module, or from the python fallback
 
829
 
 
830
try:
 
831
    from thread import _local as local
 
832
except ImportError:
 
833
    from _threading_local import local
 
834
 
 
835
 
 
836
def _after_fork():
 
837
    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
 
838
    # is called from PyOS_AfterFork.  Here we cleanup threading module state
 
839
    # that should not exist after a fork.
 
840
 
 
841
    # Reset _active_limbo_lock, in case we forked while the lock was held
 
842
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
 
843
    global _active_limbo_lock
 
844
    _active_limbo_lock = _allocate_lock()
 
845
 
 
846
    # fork() only copied the current thread; clear references to others.
 
847
    new_active = {}
 
848
    current = current_thread()
 
849
    with _active_limbo_lock:
 
850
        for thread in _active.itervalues():
 
851
            if thread is current:
 
852
                # There is only one active thread. We reset the ident to
 
853
                # its new value since it can have changed.
 
854
                ident = _get_ident()
 
855
                thread._Thread__ident = ident
 
856
                new_active[ident] = thread
 
857
            else:
 
858
                # All the others are already stopped.
 
859
                # We don't call _Thread__stop() because it tries to acquire
 
860
                # thread._Thread__block which could also have been held while
 
861
                # we forked.
 
862
                thread._Thread__stopped = True
 
863
 
 
864
        _limbo.clear()
 
865
        _active.clear()
 
866
        _active.update(new_active)
 
867
        assert len(_active) == 1
 
868
 
 
869
 
 
870
# Self-test code
 
871
 
 
872
def _test():
 
873
 
 
874
    class BoundedQueue(_Verbose):
 
875
 
 
876
        def __init__(self, limit):
 
877
            _Verbose.__init__(self)
 
878
            self.mon = RLock()
 
879
            self.rc = Condition(self.mon)
 
880
            self.wc = Condition(self.mon)
 
881
            self.limit = limit
 
882
            self.queue = deque()
 
883
 
 
884
        def put(self, item):
 
885
            self.mon.acquire()
 
886
            while len(self.queue) >= self.limit:
 
887
                self._note("put(%s): queue full", item)
 
888
                self.wc.wait()
 
889
            self.queue.append(item)
 
890
            self._note("put(%s): appended, length now %d",
 
891
                       item, len(self.queue))
 
892
            self.rc.notify()
 
893
            self.mon.release()
 
894
 
 
895
        def get(self):
 
896
            self.mon.acquire()
 
897
            while not self.queue:
 
898
                self._note("get(): queue empty")
 
899
                self.rc.wait()
 
900
            item = self.queue.popleft()
 
901
            self._note("get(): got %s, %d left", item, len(self.queue))
 
902
            self.wc.notify()
 
903
            self.mon.release()
 
904
            return item
 
905
 
 
906
    class ProducerThread(Thread):
 
907
 
 
908
        def __init__(self, queue, quota):
 
909
            Thread.__init__(self, name="Producer")
 
910
            self.queue = queue
 
911
            self.quota = quota
 
912
 
 
913
        def run(self):
 
914
            from random import random
 
915
            counter = 0
 
916
            while counter < self.quota:
 
917
                counter = counter + 1
 
918
                self.queue.put("%s.%d" % (self.name, counter))
 
919
                _sleep(random() * 0.00001)
 
920
 
 
921
 
 
922
    class ConsumerThread(Thread):
 
923
 
 
924
        def __init__(self, queue, count):
 
925
            Thread.__init__(self, name="Consumer")
 
926
            self.queue = queue
 
927
            self.count = count
 
928
 
 
929
        def run(self):
 
930
            while self.count > 0:
 
931
                item = self.queue.get()
 
932
                print item
 
933
                self.count = self.count - 1
 
934
 
 
935
    NP = 3
 
936
    QL = 4
 
937
    NI = 5
 
938
 
 
939
    Q = BoundedQueue(QL)
 
940
    P = []
 
941
    for i in range(NP):
 
942
        t = ProducerThread(Q, NI)
 
943
        t.name = ("Producer-%d" % (i+1))
 
944
        P.append(t)
 
945
    C = ConsumerThread(Q, NI*NP)
 
946
    for t in P:
 
947
        t.start()
 
948
        _sleep(0.000001)
 
949
    C.start()
 
950
    for t in P:
 
951
        t.join()
 
952
    C.join()
 
953
 
 
954
if __name__ == '__main__':
 
955
    _test()