~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/defer.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_defer,twisted.test.test_defgen,twisted.internet.test.test_inlinecb -*-
 
2
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Support for results that aren't immediately available.
 
7
 
 
8
Maintainer: Glyph Lefkowitz
 
9
"""
 
10
 
 
11
import traceback
 
12
import warnings
 
13
from sys import exc_info
 
14
 
 
15
# Twisted imports
 
16
from twisted.python import log, failure, lockfile
 
17
from twisted.python.util import unsignedID, mergeFunctionMetadata
 
18
 
 
19
 
 
20
 
 
21
class AlreadyCalledError(Exception):
 
22
    pass
 
23
 
 
24
 
 
25
 
 
26
class TimeoutError(Exception):
 
27
    pass
 
28
 
 
29
 
 
30
 
 
31
def logError(err):
 
32
    log.err(err)
 
33
    return err
 
34
 
 
35
 
 
36
 
 
37
def succeed(result):
 
38
    """
 
39
    Return a L{Deferred} that has already had C{.callback(result)} called.
 
40
 
 
41
    This is useful when you're writing synchronous code to an
 
42
    asynchronous interface: i.e., some code is calling you expecting a
 
43
    L{Deferred} result, but you don't actually need to do anything
 
44
    asynchronous. Just return C{defer.succeed(theResult)}.
 
45
 
 
46
    See L{fail} for a version of this function that uses a failing
 
47
    L{Deferred} rather than a successful one.
 
48
 
 
49
    @param result: The result to give to the Deferred's 'callback'
 
50
           method.
 
51
 
 
52
    @rtype: L{Deferred}
 
53
    """
 
54
    d = Deferred()
 
55
    d.callback(result)
 
56
    return d
 
57
 
 
58
 
 
59
 
 
60
def fail(result=None):
 
61
    """
 
62
    Return a L{Deferred} that has already had C{.errback(result)} called.
 
63
 
 
64
    See L{succeed}'s docstring for rationale.
 
65
 
 
66
    @param result: The same argument that L{Deferred.errback} takes.
 
67
 
 
68
    @raise NoCurrentExceptionError: If C{result} is C{None} but there is no
 
69
        current exception state.
 
70
 
 
71
    @rtype: L{Deferred}
 
72
    """
 
73
    d = Deferred()
 
74
    d.errback(result)
 
75
    return d
 
76
 
 
77
 
 
78
 
 
79
def execute(callable, *args, **kw):
 
80
    """
 
81
    Create a L{Deferred} from a callable and arguments.
 
82
 
 
83
    Call the given function with the given arguments.  Return a L{Deferred}
 
84
    which has been fired with its callback as the result of that invocation
 
85
    or its C{errback} with a L{Failure} for the exception thrown.
 
86
    """
 
87
    try:
 
88
        result = callable(*args, **kw)
 
89
    except:
 
90
        return fail()
 
91
    else:
 
92
        return succeed(result)
 
93
 
 
94
 
 
95
 
 
96
def maybeDeferred(f, *args, **kw):
 
97
    """
 
98
    Invoke a function that may or may not return a L{Deferred}.
 
99
 
 
100
    Call the given function with the given arguments.  If the returned
 
101
    object is a L{Deferred}, return it.  If the returned object is a L{Failure},
 
102
    wrap it with L{fail} and return it.  Otherwise, wrap it in L{succeed} and
 
103
    return it.  If an exception is raised, convert it to a L{Failure}, wrap it
 
104
    in L{fail}, and then return it.
 
105
 
 
106
    @type f: Any callable
 
107
    @param f: The callable to invoke
 
108
 
 
109
    @param args: The arguments to pass to C{f}
 
110
    @param kw: The keyword arguments to pass to C{f}
 
111
 
 
112
    @rtype: L{Deferred}
 
113
    @return: The result of the function call, wrapped in a L{Deferred} if
 
114
    necessary.
 
115
    """
 
116
    try:
 
117
        result = f(*args, **kw)
 
118
    except:
 
119
        return fail(failure.Failure())
 
120
 
 
121
    if isinstance(result, Deferred):
 
122
        return result
 
123
    elif isinstance(result, failure.Failure):
 
124
        return fail(result)
 
125
    else:
 
126
        return succeed(result)
 
127
 
 
128
 
 
129
 
 
130
def timeout(deferred):
 
131
    deferred.errback(failure.Failure(TimeoutError("Callback timed out")))
 
132
 
 
133
 
 
134
 
 
135
def passthru(arg):
 
136
    return arg
 
137
 
 
138
 
 
139
 
 
140
def setDebugging(on):
 
141
    """
 
142
    Enable or disable L{Deferred} debugging.
 
143
 
 
144
    When debugging is on, the call stacks from creation and invocation are
 
145
    recorded, and added to any L{AlreadyCalledErrors} we raise.
 
146
    """
 
147
    Deferred.debug=bool(on)
 
148
 
 
149
 
 
150
 
 
151
def getDebugging():
 
152
    """
 
153
    Determine whether L{Deferred} debugging is enabled.
 
154
    """
 
155
    return Deferred.debug
 
156
 
 
157
 
 
158
 
 
159
class Deferred:
 
160
    """
 
161
    This is a callback which will be put off until later.
 
162
 
 
163
    Why do we want this? Well, in cases where a function in a threaded
 
164
    program would block until it gets a result, for Twisted it should
 
165
    not block. Instead, it should return a L{Deferred}.
 
166
 
 
167
    This can be implemented for protocols that run over the network by
 
168
    writing an asynchronous protocol for L{twisted.internet}. For methods
 
169
    that come from outside packages that are not under our control, we use
 
170
    threads (see for example L{twisted.enterprise.adbapi}).
 
171
 
 
172
    For more information about Deferreds, see doc/howto/defer.html or
 
173
    U{http://twistedmatrix.com/projects/core/documentation/howto/defer.html}
 
174
    """
 
175
 
 
176
    called = 0
 
177
    paused = 0
 
178
    timeoutCall = None
 
179
    _debugInfo = None
 
180
 
 
181
    # Are we currently running a user-installed callback?  Meant to prevent
 
182
    # recursive running of callbacks when a reentrant call to add a callback is
 
183
    # used.
 
184
    _runningCallbacks = False
 
185
 
 
186
    # Keep this class attribute for now, for compatibility with code that
 
187
    # sets it directly.
 
188
    debug = False
 
189
 
 
190
 
 
191
    def __init__(self):
 
192
        self.callbacks = []
 
193
        if self.debug:
 
194
            self._debugInfo = DebugInfo()
 
195
            self._debugInfo.creator = traceback.format_stack()[:-1]
 
196
 
 
197
 
 
198
    def addCallbacks(self, callback, errback=None,
 
199
                     callbackArgs=None, callbackKeywords=None,
 
200
                     errbackArgs=None, errbackKeywords=None):
 
201
        """
 
202
        Add a pair of callbacks (success and error) to this L{Deferred}.
 
203
 
 
204
        These will be executed when the 'master' callback is run.
 
205
        """
 
206
        assert callable(callback)
 
207
        assert errback == None or callable(errback)
 
208
        cbs = ((callback, callbackArgs, callbackKeywords),
 
209
               (errback or (passthru), errbackArgs, errbackKeywords))
 
210
        self.callbacks.append(cbs)
 
211
 
 
212
        if self.called:
 
213
            self._runCallbacks()
 
214
        return self
 
215
 
 
216
 
 
217
    def addCallback(self, callback, *args, **kw):
 
218
        """
 
219
        Convenience method for adding just a callback.
 
220
 
 
221
        See L{addCallbacks}.
 
222
        """
 
223
        return self.addCallbacks(callback, callbackArgs=args,
 
224
                                 callbackKeywords=kw)
 
225
 
 
226
 
 
227
    def addErrback(self, errback, *args, **kw):
 
228
        """
 
229
        Convenience method for adding just an errback.
 
230
 
 
231
        See L{addCallbacks}.
 
232
        """
 
233
        return self.addCallbacks(passthru, errback,
 
234
                                 errbackArgs=args,
 
235
                                 errbackKeywords=kw)
 
236
 
 
237
 
 
238
    def addBoth(self, callback, *args, **kw):
 
239
        """
 
240
        Convenience method for adding a single callable as both a callback
 
241
        and an errback.
 
242
 
 
243
        See L{addCallbacks}.
 
244
        """
 
245
        return self.addCallbacks(callback, callback,
 
246
                                 callbackArgs=args, errbackArgs=args,
 
247
                                 callbackKeywords=kw, errbackKeywords=kw)
 
248
 
 
249
 
 
250
    def chainDeferred(self, d):
 
251
        """
 
252
        Chain another L{Deferred} to this L{Deferred}.
 
253
 
 
254
        This method adds callbacks to this L{Deferred} to call C{d}'s callback
 
255
        or errback, as appropriate. It is merely a shorthand way of performing
 
256
        the following::
 
257
 
 
258
            self.addCallbacks(d.callback, d.errback)
 
259
 
 
260
        When you chain a deferred d2 to another deferred d1 with
 
261
        d1.chainDeferred(d2), you are making d2 participate in the callback
 
262
        chain of d1. Thus any event that fires d1 will also fire d2.
 
263
        However, the converse is B{not} true; if d2 is fired d1 will not be
 
264
        affected.
 
265
        """
 
266
        return self.addCallbacks(d.callback, d.errback)
 
267
 
 
268
 
 
269
    def callback(self, result):
 
270
        """
 
271
        Run all success callbacks that have been added to this L{Deferred}.
 
272
 
 
273
        Each callback will have its result passed as the first
 
274
        argument to the next; this way, the callbacks act as a
 
275
        'processing chain'. Also, if the success-callback returns a L{Failure}
 
276
        or raises an L{Exception}, processing will continue on the *error*-
 
277
        callback chain.
 
278
        """
 
279
        assert not isinstance(result, Deferred)
 
280
        self._startRunCallbacks(result)
 
281
 
 
282
 
 
283
    def errback(self, fail=None):
 
284
        """
 
285
        Run all error callbacks that have been added to this L{Deferred}.
 
286
 
 
287
        Each callback will have its result passed as the first
 
288
        argument to the next; this way, the callbacks act as a
 
289
        'processing chain'. Also, if the error-callback returns a non-Failure
 
290
        or doesn't raise an L{Exception}, processing will continue on the
 
291
        *success*-callback chain.
 
292
 
 
293
        If the argument that's passed to me is not a L{failure.Failure} instance,
 
294
        it will be embedded in one. If no argument is passed, a
 
295
        L{failure.Failure} instance will be created based on the current
 
296
        traceback stack.
 
297
 
 
298
        Passing a string as `fail' is deprecated, and will be punished with
 
299
        a warning message.
 
300
 
 
301
        @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
 
302
            no current exception state.
 
303
        """
 
304
        if not isinstance(fail, failure.Failure):
 
305
            fail = failure.Failure(fail)
 
306
 
 
307
        self._startRunCallbacks(fail)
 
308
 
 
309
 
 
310
    def pause(self):
 
311
        """
 
312
        Stop processing on a L{Deferred} until L{unpause}() is called.
 
313
        """
 
314
        self.paused = self.paused + 1
 
315
 
 
316
 
 
317
    def unpause(self):
 
318
        """
 
319
        Process all callbacks made since L{pause}() was called.
 
320
        """
 
321
        self.paused = self.paused - 1
 
322
        if self.paused:
 
323
            return
 
324
        if self.called:
 
325
            self._runCallbacks()
 
326
 
 
327
 
 
328
    def _continue(self, result):
 
329
        self.result = result
 
330
        self.unpause()
 
331
 
 
332
 
 
333
    def _startRunCallbacks(self, result):
 
334
        if self.called:
 
335
            if self.debug:
 
336
                if self._debugInfo is None:
 
337
                    self._debugInfo = DebugInfo()
 
338
                extra = "\n" + self._debugInfo._getDebugTracebacks()
 
339
                raise AlreadyCalledError(extra)
 
340
            raise AlreadyCalledError
 
341
        if self.debug:
 
342
            if self._debugInfo is None:
 
343
                self._debugInfo = DebugInfo()
 
344
            self._debugInfo.invoker = traceback.format_stack()[:-2]
 
345
        self.called = True
 
346
        self.result = result
 
347
        if self.timeoutCall:
 
348
            try:
 
349
                self.timeoutCall.cancel()
 
350
            except:
 
351
                pass
 
352
 
 
353
            del self.timeoutCall
 
354
        self._runCallbacks()
 
355
 
 
356
 
 
357
    def _runCallbacks(self):
 
358
        if self._runningCallbacks:
 
359
            # Don't recursively run callbacks
 
360
            return
 
361
        if not self.paused:
 
362
            while self.callbacks:
 
363
                item = self.callbacks.pop(0)
 
364
                callback, args, kw = item[
 
365
                    isinstance(self.result, failure.Failure)]
 
366
                args = args or ()
 
367
                kw = kw or {}
 
368
                try:
 
369
                    self._runningCallbacks = True
 
370
                    try:
 
371
                        self.result = callback(self.result, *args, **kw)
 
372
                    finally:
 
373
                        self._runningCallbacks = False
 
374
                    if isinstance(self.result, Deferred):
 
375
                        # note: this will cause _runCallbacks to be called
 
376
                        # recursively if self.result already has a result.
 
377
                        # This shouldn't cause any problems, since there is no
 
378
                        # relevant state in this stack frame at this point.
 
379
                        # The recursive call will continue to process
 
380
                        # self.callbacks until it is empty, then return here,
 
381
                        # where there is no more work to be done, so this call
 
382
                        # will return as well.
 
383
                        self.pause()
 
384
                        self.result.addBoth(self._continue)
 
385
                        break
 
386
                except:
 
387
                    self.result = failure.Failure()
 
388
 
 
389
        if isinstance(self.result, failure.Failure):
 
390
            self.result.cleanFailure()
 
391
            if self._debugInfo is None:
 
392
                self._debugInfo = DebugInfo()
 
393
            self._debugInfo.failResult = self.result
 
394
        else:
 
395
            if self._debugInfo is not None:
 
396
                self._debugInfo.failResult = None
 
397
 
 
398
 
 
399
    def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw):
 
400
        """
 
401
        Set a timeout function to be triggered if I am not called.
 
402
 
 
403
        @param seconds: How long to wait (from now) before firing the
 
404
        C{timeoutFunc}.
 
405
 
 
406
        @param timeoutFunc: will receive the L{Deferred} and *args, **kw as its
 
407
        arguments.  The default C{timeoutFunc} will call the errback with a
 
408
        L{TimeoutError}.
 
409
        """
 
410
        warnings.warn(
 
411
            "Deferred.setTimeout is deprecated.  Look for timeout "
 
412
            "support specific to the API you are using instead.",
 
413
            DeprecationWarning, stacklevel=2)
 
414
 
 
415
        if self.called:
 
416
            return
 
417
        assert not self.timeoutCall, "Don't call setTimeout twice on the same Deferred."
 
418
 
 
419
        from twisted.internet import reactor
 
420
        self.timeoutCall = reactor.callLater(
 
421
            seconds,
 
422
            lambda: self.called or timeoutFunc(self, *args, **kw))
 
423
        return self.timeoutCall
 
424
 
 
425
    def __str__(self):
 
426
        cname = self.__class__.__name__
 
427
        if hasattr(self, 'result'):
 
428
            return "<%s at %s  current result: %r>" % (cname, hex(unsignedID(self)),
 
429
                                                       self.result)
 
430
        return "<%s at %s>" % (cname, hex(unsignedID(self)))
 
431
    __repr__ = __str__
 
432
 
 
433
 
 
434
 
 
435
class DebugInfo:
 
436
    """
 
437
    Deferred debug helper.
 
438
    """
 
439
 
 
440
    failResult = None
 
441
 
 
442
 
 
443
    def _getDebugTracebacks(self):
 
444
        info = ''
 
445
        if hasattr(self, "creator"):
 
446
            info += " C: Deferred was created:\n C:"
 
447
            info += "".join(self.creator).rstrip().replace("\n","\n C:")
 
448
            info += "\n"
 
449
        if hasattr(self, "invoker"):
 
450
            info += " I: First Invoker was:\n I:"
 
451
            info += "".join(self.invoker).rstrip().replace("\n","\n I:")
 
452
            info += "\n"
 
453
        return info
 
454
 
 
455
 
 
456
    def __del__(self):
 
457
        """
 
458
        Print tracebacks and die.
 
459
 
 
460
        If the *last* (and I do mean *last*) callback leaves me in an error
 
461
        state, print a traceback (if said errback is a L{Failure}).
 
462
        """
 
463
        if self.failResult is not None:
 
464
            log.msg("Unhandled error in Deferred:", isError=True)
 
465
            debugInfo = self._getDebugTracebacks()
 
466
            if debugInfo != '':
 
467
                log.msg("(debug: " + debugInfo + ")", isError=True)
 
468
            log.err(self.failResult)
 
469
 
 
470
 
 
471
 
 
472
class FirstError(Exception):
 
473
    """
 
474
    First error to occur in a L{DeferredList} if C{fireOnOneErrback} is set.
 
475
 
 
476
    @ivar subFailure: The L{Failure} that occurred.
 
477
    @type subFailure: L{Failure}
 
478
 
 
479
    @ivar index: The index of the L{Deferred} in the L{DeferredList} where
 
480
        it happened.
 
481
    @type index: C{int}
 
482
    """
 
483
    def __init__(self, failure, index):
 
484
        Exception.__init__(self, failure, index)
 
485
        self.subFailure = failure
 
486
        self.index = index
 
487
 
 
488
 
 
489
    def __repr__(self):
 
490
        """
 
491
        The I{repr} of L{FirstError} instances includes the repr of the
 
492
        wrapped failure's exception and the index of the L{FirstError}.
 
493
        """
 
494
        return 'FirstError[#%d, %r]' % (self.index, self.subFailure.value)
 
495
 
 
496
 
 
497
    def __str__(self):
 
498
        """
 
499
        The I{str} of L{FirstError} instances includes the I{str} of the
 
500
        entire wrapped failure (including its traceback and exception) and
 
501
        the index of the L{FirstError}.
 
502
        """
 
503
        return 'FirstError[#%d, %s]' % (self.index, self.subFailure)
 
504
 
 
505
 
 
506
    def __cmp__(self, other):
 
507
        """
 
508
        Comparison between L{FirstError} and other L{FirstError} instances
 
509
        is defined as the comparison of the index and sub-failure of each
 
510
        instance.  L{FirstError} instances don't compare equal to anything
 
511
        that isn't a L{FirstError} instance.
 
512
 
 
513
        @since: 8.2
 
514
        """
 
515
        if isinstance(other, FirstError):
 
516
            return cmp(
 
517
                (self.index, self.subFailure),
 
518
                (other.index, other.subFailure))
 
519
        return -1
 
520
 
 
521
 
 
522
 
 
523
class DeferredList(Deferred):
 
524
    """
 
525
    I combine a group of deferreds into one callback.
 
526
 
 
527
    I track a list of L{Deferred}s for their callbacks, and make a single
 
528
    callback when they have all completed, a list of (success, result)
 
529
    tuples, 'success' being a boolean.
 
530
 
 
531
    Note that you can still use a L{Deferred} after putting it in a
 
532
    DeferredList.  For example, you can suppress 'Unhandled error in Deferred'
 
533
    messages by adding errbacks to the Deferreds *after* putting them in the
 
534
    DeferredList, as a DeferredList won't swallow the errors.  (Although a more
 
535
    convenient way to do this is simply to set the consumeErrors flag)
 
536
    """
 
537
 
 
538
    fireOnOneCallback = 0
 
539
    fireOnOneErrback = 0
 
540
 
 
541
 
 
542
    def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
 
543
                 consumeErrors=0):
 
544
        """
 
545
        Initialize a DeferredList.
 
546
 
 
547
        @type deferredList:  C{list} of L{Deferred}s
 
548
        @param deferredList: The list of deferreds to track.
 
549
        @param fireOnOneCallback: (keyword param) a flag indicating that
 
550
                             only one callback needs to be fired for me to call
 
551
                             my callback
 
552
        @param fireOnOneErrback: (keyword param) a flag indicating that
 
553
                            only one errback needs to be fired for me to call
 
554
                            my errback
 
555
        @param consumeErrors: (keyword param) a flag indicating that any errors
 
556
                            raised in the original deferreds should be
 
557
                            consumed by this DeferredList.  This is useful to
 
558
                            prevent spurious warnings being logged.
 
559
        """
 
560
        self.resultList = [None] * len(deferredList)
 
561
        Deferred.__init__(self)
 
562
        if len(deferredList) == 0 and not fireOnOneCallback:
 
563
            self.callback(self.resultList)
 
564
 
 
565
        # These flags need to be set *before* attaching callbacks to the
 
566
        # deferreds, because the callbacks use these flags, and will run
 
567
        # synchronously if any of the deferreds are already fired.
 
568
        self.fireOnOneCallback = fireOnOneCallback
 
569
        self.fireOnOneErrback = fireOnOneErrback
 
570
        self.consumeErrors = consumeErrors
 
571
        self.finishedCount = 0
 
572
 
 
573
        index = 0
 
574
        for deferred in deferredList:
 
575
            deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
 
576
                                  callbackArgs=(index,SUCCESS),
 
577
                                  errbackArgs=(index,FAILURE))
 
578
            index = index + 1
 
579
 
 
580
 
 
581
    def _cbDeferred(self, result, index, succeeded):
 
582
        """
 
583
        (internal) Callback for when one of my deferreds fires.
 
584
        """
 
585
        self.resultList[index] = (succeeded, result)
 
586
 
 
587
        self.finishedCount += 1
 
588
        if not self.called:
 
589
            if succeeded == SUCCESS and self.fireOnOneCallback:
 
590
                self.callback((result, index))
 
591
            elif succeeded == FAILURE and self.fireOnOneErrback:
 
592
                self.errback(failure.Failure(FirstError(result, index)))
 
593
            elif self.finishedCount == len(self.resultList):
 
594
                self.callback(self.resultList)
 
595
 
 
596
        if succeeded == FAILURE and self.consumeErrors:
 
597
            result = None
 
598
 
 
599
        return result
 
600
 
 
601
 
 
602
 
 
603
def _parseDListResult(l, fireOnOneErrback=0):
 
604
    if __debug__:
 
605
        for success, value in l:
 
606
            assert success
 
607
    return [x[1] for x in l]
 
608
 
 
609
 
 
610
 
 
611
def gatherResults(deferredList):
 
612
    """
 
613
    Returns list with result of given L{Deferred}s.
 
614
 
 
615
    This builds on L{DeferredList} but is useful since you don't
 
616
    need to parse the result for success/failure.
 
617
 
 
618
    @type deferredList:  C{list} of L{Deferred}s
 
619
    """
 
620
    d = DeferredList(deferredList, fireOnOneErrback=1)
 
621
    d.addCallback(_parseDListResult)
 
622
    return d
 
623
 
 
624
 
 
625
 
 
626
# Constants for use with DeferredList
 
627
 
 
628
SUCCESS = True
 
629
FAILURE = False
 
630
 
 
631
 
 
632
 
 
633
## deferredGenerator
 
634
 
 
635
class waitForDeferred:
 
636
    """
 
637
    See L{deferredGenerator}.
 
638
    """
 
639
 
 
640
    def __init__(self, d):
 
641
        if not isinstance(d, Deferred):
 
642
            raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))
 
643
        self.d = d
 
644
 
 
645
 
 
646
    def getResult(self):
 
647
        if isinstance(self.result, failure.Failure):
 
648
            self.result.raiseException()
 
649
        return self.result
 
650
 
 
651
 
 
652
 
 
653
def _deferGenerator(g, deferred):
 
654
    """
 
655
    See L{deferredGenerator}.
 
656
    """
 
657
    result = None
 
658
 
 
659
    # This function is complicated by the need to prevent unbounded recursion
 
660
    # arising from repeatedly yielding immediately ready deferreds.  This while
 
661
    # loop and the waiting variable solve that by manually unfolding the
 
662
    # recursion.
 
663
 
 
664
    waiting = [True, # defgen is waiting for result?
 
665
               None] # result
 
666
 
 
667
    while 1:
 
668
        try:
 
669
            result = g.next()
 
670
        except StopIteration:
 
671
            deferred.callback(result)
 
672
            return deferred
 
673
        except:
 
674
            deferred.errback()
 
675
            return deferred
 
676
 
 
677
        # Deferred.callback(Deferred) raises an error; we catch this case
 
678
        # early here and give a nicer error message to the user in case
 
679
        # they yield a Deferred.
 
680
        if isinstance(result, Deferred):
 
681
            return fail(TypeError("Yield waitForDeferred(d), not d!"))
 
682
 
 
683
        if isinstance(result, waitForDeferred):
 
684
            # a waitForDeferred was yielded, get the result.
 
685
            # Pass result in so it don't get changed going around the loop
 
686
            # This isn't a problem for waiting, as it's only reused if
 
687
            # gotResult has already been executed.
 
688
            def gotResult(r, result=result):
 
689
                result.result = r
 
690
                if waiting[0]:
 
691
                    waiting[0] = False
 
692
                    waiting[1] = r
 
693
                else:
 
694
                    _deferGenerator(g, deferred)
 
695
            result.d.addBoth(gotResult)
 
696
            if waiting[0]:
 
697
                # Haven't called back yet, set flag so that we get reinvoked
 
698
                # and return from the loop
 
699
                waiting[0] = False
 
700
                return deferred
 
701
            # Reset waiting to initial values for next loop
 
702
            waiting[0] = True
 
703
            waiting[1] = None
 
704
 
 
705
            result = None
 
706
 
 
707
 
 
708
 
 
709
def deferredGenerator(f):
 
710
    """
 
711
    deferredGenerator and waitForDeferred help you write L{Deferred}-using code
 
712
    that looks like a regular sequential function. If your code has a minimum
 
713
    requirement of Python 2.5, consider the use of L{inlineCallbacks} instead,
 
714
    which can accomplish the same thing in a more concise manner.
 
715
 
 
716
    There are two important functions involved: L{waitForDeferred}, and
 
717
    L{deferredGenerator}.  They are used together, like this::
 
718
 
 
719
        def thingummy():
 
720
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
 
721
            yield thing
 
722
            thing = thing.getResult()
 
723
            print thing #the result! hoorj!
 
724
        thingummy = deferredGenerator(thingummy)
 
725
 
 
726
    L{waitForDeferred} returns something that you should immediately yield; when
 
727
    your generator is resumed, calling C{thing.getResult()} will either give you
 
728
    the result of the L{Deferred} if it was a success, or raise an exception if it
 
729
    was a failure.  Calling C{getResult} is B{absolutely mandatory}.  If you do
 
730
    not call it, I{your program will not work}.
 
731
 
 
732
    L{deferredGenerator} takes one of these waitForDeferred-using generator
 
733
    functions and converts it into a function that returns a L{Deferred}. The
 
734
    result of the L{Deferred} will be the last value that your generator yielded
 
735
    unless the last value is a L{waitForDeferred} instance, in which case the
 
736
    result will be C{None}.  If the function raises an unhandled exception, the
 
737
    L{Deferred} will errback instead.  Remember that C{return result} won't work;
 
738
    use C{yield result; return} in place of that.
 
739
 
 
740
    Note that not yielding anything from your generator will make the L{Deferred}
 
741
    result in C{None}. Yielding a L{Deferred} from your generator is also an error
 
742
    condition; always yield C{waitForDeferred(d)} instead.
 
743
 
 
744
    The L{Deferred} returned from your deferred generator may also errback if your
 
745
    generator raised an exception.  For example::
 
746
 
 
747
        def thingummy():
 
748
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
 
749
            yield thing
 
750
            thing = thing.getResult()
 
751
            if thing == 'I love Twisted':
 
752
                # will become the result of the Deferred
 
753
                yield 'TWISTED IS GREAT!'
 
754
                return
 
755
            else:
 
756
                # will trigger an errback
 
757
                raise Exception('DESTROY ALL LIFE')
 
758
        thingummy = deferredGenerator(thingummy)
 
759
 
 
760
    Put succinctly, these functions connect deferred-using code with this 'fake
 
761
    blocking' style in both directions: L{waitForDeferred} converts from a
 
762
    L{Deferred} to the 'blocking' style, and L{deferredGenerator} converts from the
 
763
    'blocking' style to a L{Deferred}.
 
764
    """
 
765
 
 
766
    def unwindGenerator(*args, **kwargs):
 
767
        return _deferGenerator(f(*args, **kwargs), Deferred())
 
768
    return mergeFunctionMetadata(f, unwindGenerator)
 
769
 
 
770
 
 
771
## inlineCallbacks
 
772
 
 
773
# BaseException is only in Py 2.5.
 
774
try:
 
775
    BaseException
 
776
except NameError:
 
777
    BaseException=Exception
 
778
 
 
779
 
 
780
 
 
781
class _DefGen_Return(BaseException):
 
782
    def __init__(self, value):
 
783
        self.value = value
 
784
 
 
785
 
 
786
 
 
787
def returnValue(val):
 
788
    """
 
789
    Return val from a L{inlineCallbacks} generator.
 
790
 
 
791
    Note: this is currently implemented by raising an exception
 
792
    derived from L{BaseException}.  You might want to change any
 
793
    'except:' clauses to an 'except Exception:' clause so as not to
 
794
    catch this exception.
 
795
 
 
796
    Also: while this function currently will work when called from
 
797
    within arbitrary functions called from within the generator, do
 
798
    not rely upon this behavior.
 
799
    """
 
800
    raise _DefGen_Return(val)
 
801
 
 
802
 
 
803
 
 
804
def _inlineCallbacks(result, g, deferred):
 
805
    """
 
806
    See L{inlineCallbacks}.
 
807
    """
 
808
    # This function is complicated by the need to prevent unbounded recursion
 
809
    # arising from repeatedly yielding immediately ready deferreds.  This while
 
810
    # loop and the waiting variable solve that by manually unfolding the
 
811
    # recursion.
 
812
 
 
813
    waiting = [True, # waiting for result?
 
814
               None] # result
 
815
 
 
816
    while 1:
 
817
        try:
 
818
            # Send the last result back as the result of the yield expression.
 
819
            isFailure = isinstance(result, failure.Failure)
 
820
            if isFailure:
 
821
                result = result.throwExceptionIntoGenerator(g)
 
822
            else:
 
823
                result = g.send(result)
 
824
        except StopIteration:
 
825
            # fell off the end, or "return" statement
 
826
            deferred.callback(None)
 
827
            return deferred
 
828
        except _DefGen_Return, e:
 
829
            # returnValue() was called; time to give a result to the original
 
830
            # Deferred.  First though, let's try to identify the potentially
 
831
            # confusing situation which results when returnValue() is
 
832
            # accidentally invoked from a different function, one that wasn't
 
833
            # decorated with @inlineCallbacks.
 
834
 
 
835
            # The traceback starts in this frame (the one for
 
836
            # _inlineCallbacks); the next one down should be the application
 
837
            # code.
 
838
            appCodeTrace = exc_info()[2].tb_next
 
839
            if isFailure:
 
840
                # If we invoked this generator frame by throwing an exception
 
841
                # into it, then throwExceptionIntoGenerator will consume an
 
842
                # additional stack frame itself, so we need to skip that too.
 
843
                appCodeTrace = appCodeTrace.tb_next
 
844
            # Now that we've identified the frame being exited by the
 
845
            # exception, let's figure out if returnValue was called from it
 
846
            # directly.  returnValue itself consumes a stack frame, so the
 
847
            # application code will have a tb_next, but it will *not* have a
 
848
            # second tb_next.
 
849
            if appCodeTrace.tb_next.tb_next:
 
850
                # If returnValue was invoked non-local to the frame which it is
 
851
                # exiting, identify the frame that ultimately invoked
 
852
                # returnValue so that we can warn the user, as this behavior is
 
853
                # confusing.
 
854
                ultimateTrace = appCodeTrace
 
855
                while ultimateTrace.tb_next.tb_next:
 
856
                    ultimateTrace = ultimateTrace.tb_next
 
857
                filename = ultimateTrace.tb_frame.f_code.co_filename
 
858
                lineno = ultimateTrace.tb_lineno
 
859
                warnings.warn_explicit(
 
860
                    "returnValue() in %r causing %r to exit: "
 
861
                    "returnValue should only be invoked by functions decorated "
 
862
                    "with inlineCallbacks" % (
 
863
                        ultimateTrace.tb_frame.f_code.co_name,
 
864
                        appCodeTrace.tb_frame.f_code.co_name),
 
865
                    DeprecationWarning, filename, lineno)
 
866
            deferred.callback(e.value)
 
867
            return deferred
 
868
        except:
 
869
            deferred.errback()
 
870
            return deferred
 
871
 
 
872
        if isinstance(result, Deferred):
 
873
            # a deferred was yielded, get the result.
 
874
            def gotResult(r):
 
875
                if waiting[0]:
 
876
                    waiting[0] = False
 
877
                    waiting[1] = r
 
878
                else:
 
879
                    _inlineCallbacks(r, g, deferred)
 
880
 
 
881
            result.addBoth(gotResult)
 
882
            if waiting[0]:
 
883
                # Haven't called back yet, set flag so that we get reinvoked
 
884
                # and return from the loop
 
885
                waiting[0] = False
 
886
                return deferred
 
887
 
 
888
            result = waiting[1]
 
889
            # Reset waiting to initial values for next loop.  gotResult uses
 
890
            # waiting, but this isn't a problem because gotResult is only
 
891
            # executed once, and if it hasn't been executed yet, the return
 
892
            # branch above would have been taken.
 
893
 
 
894
 
 
895
            waiting[0] = True
 
896
            waiting[1] = None
 
897
 
 
898
 
 
899
    return deferred
 
900
 
 
901
 
 
902
 
 
903
def inlineCallbacks(f):
 
904
    """
 
905
    WARNING: this function will not work in Python 2.4 and earlier!
 
906
 
 
907
    inlineCallbacks helps you write Deferred-using code that looks like a
 
908
    regular sequential function. This function uses features of Python 2.5
 
909
    generators.  If you need to be compatible with Python 2.4 or before, use
 
910
    the L{deferredGenerator} function instead, which accomplishes the same
 
911
    thing, but with somewhat more boilerplate.  For example::
 
912
 
 
913
        def thingummy():
 
914
            thing = yield makeSomeRequestResultingInDeferred()
 
915
            print thing #the result! hoorj!
 
916
        thingummy = inlineCallbacks(thingummy)
 
917
 
 
918
    When you call anything that results in a L{Deferred}, you can simply yield it;
 
919
    your generator will automatically be resumed when the Deferred's result is
 
920
    available. The generator will be sent the result of the L{Deferred} with the
 
921
    'send' method on generators, or if the result was a failure, 'throw'.
 
922
 
 
923
    Your inlineCallbacks-enabled generator will return a L{Deferred} object, which
 
924
    will result in the return value of the generator (or will fail with a
 
925
    failure object if your generator raises an unhandled exception). Note that
 
926
    you can't use C{return result} to return a value; use C{returnValue(result)}
 
927
    instead. Falling off the end of the generator, or simply using C{return}
 
928
    will cause the L{Deferred} to have a result of C{None}.
 
929
 
 
930
    The L{Deferred} returned from your deferred generator may errback if your
 
931
    generator raised an exception::
 
932
 
 
933
        def thingummy():
 
934
            thing = yield makeSomeRequestResultingInDeferred()
 
935
            if thing == 'I love Twisted':
 
936
                # will become the result of the Deferred
 
937
                returnValue('TWISTED IS GREAT!')
 
938
            else:
 
939
                # will trigger an errback
 
940
                raise Exception('DESTROY ALL LIFE')
 
941
        thingummy = inlineCallbacks(thingummy)
 
942
    """
 
943
    def unwindGenerator(*args, **kwargs):
 
944
        return _inlineCallbacks(None, f(*args, **kwargs), Deferred())
 
945
    return mergeFunctionMetadata(f, unwindGenerator)
 
946
 
 
947
 
 
948
## DeferredLock/DeferredQueue
 
949
 
 
950
class _ConcurrencyPrimitive(object):
 
951
    def __init__(self):
 
952
        self.waiting = []
 
953
 
 
954
 
 
955
    def _releaseAndReturn(self, r):
 
956
        self.release()
 
957
        return r
 
958
 
 
959
 
 
960
    def run(*args, **kwargs):
 
961
        """
 
962
        Acquire, run, release.
 
963
 
 
964
        This function takes a callable as its first argument and any
 
965
        number of other positional and keyword arguments.  When the
 
966
        lock or semaphore is acquired, the callable will be invoked
 
967
        with those arguments.
 
968
 
 
969
        The callable may return a L{Deferred}; if it does, the lock or
 
970
        semaphore won't be released until that L{Deferred} fires.
 
971
 
 
972
        @return: L{Deferred} of function result.
 
973
        """
 
974
        if len(args) < 2:
 
975
            if not args:
 
976
                raise TypeError("run() takes at least 2 arguments, none given.")
 
977
            raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
 
978
                args[0].__class__.__name__,))
 
979
        self, f = args[:2]
 
980
        args = args[2:]
 
981
 
 
982
        def execute(ignoredResult):
 
983
            d = maybeDeferred(f, *args, **kwargs)
 
984
            d.addBoth(self._releaseAndReturn)
 
985
            return d
 
986
 
 
987
        d = self.acquire()
 
988
        d.addCallback(execute)
 
989
        return d
 
990
 
 
991
 
 
992
 
 
993
class DeferredLock(_ConcurrencyPrimitive):
 
994
    """
 
995
    A lock for event driven systems.
 
996
 
 
997
    @ivar locked: C{True} when this Lock has been acquired, false at all
 
998
    other times.  Do not change this value, but it is useful to
 
999
    examine for the equivalent of a "non-blocking" acquisition.
 
1000
    """
 
1001
 
 
1002
    locked = 0
 
1003
 
 
1004
 
 
1005
    def acquire(self):
 
1006
        """
 
1007
        Attempt to acquire the lock.  Returns a L{Deferred} that fires on
 
1008
        lock acquisition with the L{DeferredLock} as the value.  If the lock
 
1009
        is locked, then the Deferred is placed at the end of a waiting list.
 
1010
 
 
1011
        @return: a L{Deferred} which fires on lock acquisition.
 
1012
        @rtype: a L{Deferred}
 
1013
        """
 
1014
        d = Deferred()
 
1015
        if self.locked:
 
1016
            self.waiting.append(d)
 
1017
        else:
 
1018
            self.locked = 1
 
1019
            d.callback(self)
 
1020
        return d
 
1021
 
 
1022
 
 
1023
    def release(self):
 
1024
        """
 
1025
        Release the lock.  If there is a waiting list, then the first
 
1026
        L{Deferred} in that waiting list will be called back.
 
1027
 
 
1028
        Should be called by whomever did the L{acquire}() when the shared
 
1029
        resource is free.
 
1030
        """
 
1031
        assert self.locked, "Tried to release an unlocked lock"
 
1032
        self.locked = 0
 
1033
        if self.waiting:
 
1034
            # someone is waiting to acquire lock
 
1035
            self.locked = 1
 
1036
            d = self.waiting.pop(0)
 
1037
            d.callback(self)
 
1038
 
 
1039
 
 
1040
 
 
1041
class DeferredSemaphore(_ConcurrencyPrimitive):
 
1042
    """
 
1043
    A semaphore for event driven systems.
 
1044
    """
 
1045
 
 
1046
 
 
1047
    def __init__(self, tokens):
 
1048
        _ConcurrencyPrimitive.__init__(self)
 
1049
        self.tokens = tokens
 
1050
        self.limit = tokens
 
1051
 
 
1052
 
 
1053
    def acquire(self):
 
1054
        """
 
1055
        Attempt to acquire the token.
 
1056
 
 
1057
        @return: a L{Deferred} which fires on token acquisition.
 
1058
        """
 
1059
        assert self.tokens >= 0, "Internal inconsistency??  tokens should never be negative"
 
1060
        d = Deferred()
 
1061
        if not self.tokens:
 
1062
            self.waiting.append(d)
 
1063
        else:
 
1064
            self.tokens = self.tokens - 1
 
1065
            d.callback(self)
 
1066
        return d
 
1067
 
 
1068
 
 
1069
    def release(self):
 
1070
        """
 
1071
        Release the token.
 
1072
 
 
1073
        Should be called by whoever did the L{acquire}() when the shared
 
1074
        resource is free.
 
1075
        """
 
1076
        assert self.tokens < self.limit, "Someone released me too many times: too many tokens!"
 
1077
        self.tokens = self.tokens + 1
 
1078
        if self.waiting:
 
1079
            # someone is waiting to acquire token
 
1080
            self.tokens = self.tokens - 1
 
1081
            d = self.waiting.pop(0)
 
1082
            d.callback(self)
 
1083
 
 
1084
 
 
1085
 
 
1086
class QueueOverflow(Exception):
 
1087
    pass
 
1088
 
 
1089
 
 
1090
 
 
1091
class QueueUnderflow(Exception):
 
1092
    pass
 
1093
 
 
1094
 
 
1095
 
 
1096
class DeferredQueue(object):
 
1097
    """
 
1098
    An event driven queue.
 
1099
 
 
1100
    Objects may be added as usual to this queue.  When an attempt is
 
1101
    made to retrieve an object when the queue is empty, a L{Deferred} is
 
1102
    returned which will fire when an object becomes available.
 
1103
 
 
1104
    @ivar size: The maximum number of objects to allow into the queue
 
1105
    at a time.  When an attempt to add a new object would exceed this
 
1106
    limit, L{QueueOverflow} is raised synchronously.  C{None} for no limit.
 
1107
 
 
1108
    @ivar backlog: The maximum number of L{Deferred} gets to allow at
 
1109
    one time.  When an attempt is made to get an object which would
 
1110
    exceed this limit, L{QueueUnderflow} is raised synchronously.  C{None}
 
1111
    for no limit.
 
1112
    """
 
1113
 
 
1114
    def __init__(self, size=None, backlog=None):
 
1115
        self.waiting = []
 
1116
        self.pending = []
 
1117
        self.size = size
 
1118
        self.backlog = backlog
 
1119
 
 
1120
 
 
1121
    def put(self, obj):
 
1122
        """
 
1123
        Add an object to this queue.
 
1124
 
 
1125
        @raise QueueOverflow: Too many objects are in this queue.
 
1126
        """
 
1127
        if self.waiting:
 
1128
            self.waiting.pop(0).callback(obj)
 
1129
        elif self.size is None or len(self.pending) < self.size:
 
1130
            self.pending.append(obj)
 
1131
        else:
 
1132
            raise QueueOverflow()
 
1133
 
 
1134
 
 
1135
    def get(self):
 
1136
        """
 
1137
        Attempt to retrieve and remove an object from the queue.
 
1138
 
 
1139
        @return: a L{Deferred} which fires with the next object available in
 
1140
        the queue.
 
1141
 
 
1142
        @raise QueueUnderflow: Too many (more than C{backlog})
 
1143
        L{Deferred}s are already waiting for an object from this queue.
 
1144
        """
 
1145
        if self.pending:
 
1146
            return succeed(self.pending.pop(0))
 
1147
        elif self.backlog is None or len(self.waiting) < self.backlog:
 
1148
            d = Deferred()
 
1149
            self.waiting.append(d)
 
1150
            return d
 
1151
        else:
 
1152
            raise QueueUnderflow()
 
1153
 
 
1154
 
 
1155
 
 
1156
class AlreadyTryingToLockError(Exception):
 
1157
    """
 
1158
    Raised when L{DeferredFilesystemLock.deferUntilLocked} is called twice on a
 
1159
    single L{DeferredFilesystemLock}.
 
1160
    """
 
1161
 
 
1162
 
 
1163
 
 
1164
class DeferredFilesystemLock(lockfile.FilesystemLock):
 
1165
    """
 
1166
    A L{FilesystemLock} that allows for a L{Deferred} to be fired when the lock is
 
1167
    acquired.
 
1168
 
 
1169
    @ivar _scheduler: The object in charge of scheduling retries. In this
 
1170
        implementation this is parameterized for testing.
 
1171
 
 
1172
    @ivar _interval: The retry interval for an L{IReactorTime} based scheduler.
 
1173
 
 
1174
    @ivar _tryLockCall: A L{DelayedCall} based on C{_interval} that will manage
 
1175
        the next retry for aquiring the lock.
 
1176
 
 
1177
    @ivar _timeoutCall: A L{DelayedCall} based on C{deferUntilLocked}'s timeout
 
1178
        argument.  This is in charge of timing out our attempt to acquire the
 
1179
        lock.
 
1180
    """
 
1181
    _interval = 1
 
1182
    _tryLockCall = None
 
1183
    _timeoutCall = None
 
1184
 
 
1185
 
 
1186
    def __init__(self, name, scheduler=None):
 
1187
        """
 
1188
        @param name: The name of the lock to acquire
 
1189
        @param scheduler: An object which provides L{IReactorTime}
 
1190
        """
 
1191
        lockfile.FilesystemLock.__init__(self, name)
 
1192
 
 
1193
        if scheduler is None:
 
1194
            from twisted.internet import reactor
 
1195
            scheduler = reactor
 
1196
 
 
1197
        self._scheduler = scheduler
 
1198
 
 
1199
 
 
1200
    def deferUntilLocked(self, timeout=None):
 
1201
        """
 
1202
        Wait until we acquire this lock.  This method is not safe for
 
1203
        concurrent use.
 
1204
 
 
1205
        @type timeout: C{float} or C{int}
 
1206
        @param timeout: the number of seconds after which to time out if the
 
1207
            lock has not been acquired.
 
1208
 
 
1209
        @return: a L{Deferred} which will callback when the lock is acquired, or
 
1210
            errback with a L{TimeoutError} after timing out or an
 
1211
            L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already
 
1212
            been called and not successfully locked the file.
 
1213
        """
 
1214
        if self._tryLockCall is not None:
 
1215
            return fail(
 
1216
                AlreadyTryingToLockError(
 
1217
                    "deferUntilLocked isn't safe for concurrent use."))
 
1218
 
 
1219
        d = Deferred()
 
1220
 
 
1221
        def _cancelLock():
 
1222
            self._tryLockCall.cancel()
 
1223
            self._tryLockCall = None
 
1224
            self._timeoutCall = None
 
1225
 
 
1226
            if self.lock():
 
1227
                d.callback(None)
 
1228
            else:
 
1229
                d.errback(failure.Failure(
 
1230
                        TimeoutError("Timed out aquiring lock: %s after %fs" % (
 
1231
                                self.name,
 
1232
                                timeout))))
 
1233
 
 
1234
        def _tryLock():
 
1235
            if self.lock():
 
1236
                if self._timeoutCall is not None:
 
1237
                    self._timeoutCall.cancel()
 
1238
                    self._timeoutCall = None
 
1239
 
 
1240
                self._tryLockCall = None
 
1241
 
 
1242
                d.callback(None)
 
1243
            else:
 
1244
                if timeout is not None and self._timeoutCall is None:
 
1245
                    self._timeoutCall = self._scheduler.callLater(
 
1246
                        timeout, _cancelLock)
 
1247
 
 
1248
                self._tryLockCall = self._scheduler.callLater(
 
1249
                    self._interval, _tryLock)
 
1250
 
 
1251
        _tryLock()
 
1252
 
 
1253
        return d
 
1254
 
 
1255
 
 
1256
 
 
1257
__all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
 
1258
           "AlreadyCalledError", "TimeoutError", "gatherResults",
 
1259
           "maybeDeferred",
 
1260
           "waitForDeferred", "deferredGenerator", "inlineCallbacks",
 
1261
           "returnValue",
 
1262
           "DeferredLock", "DeferredSemaphore", "DeferredQueue",
 
1263
           "DeferredFilesystemLock", "AlreadyTryingToLockError",
 
1264
          ]