~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/internet/defer.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2004-06-21 22:01:11 UTC
  • mto: (2.2.3 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20040621220111-vkf909euqnyrp3nr
Tags: upstream-1.3.0
ImportĀ upstreamĀ versionĀ 1.3.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_defer -*-
 
2
#
 
3
# Twisted, the Framework of Your Internet
 
4
# Copyright (C) 2001 Matthew W. Lefkowitz
 
5
#
 
6
# This library is free software; you can redistribute it and/or
 
7
# modify it under the terms of version 2.1 of the GNU Lesser General Public
 
8
# License as published by the Free Software Foundation.
 
9
#
 
10
# This library is distributed in the hope that it will be useful,
 
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
13
# Lesser General Public License for more details.
 
14
#
 
15
# You should have received a copy of the GNU Lesser General Public
 
16
# License along with this library; if not, write to the Free Software
 
17
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
18
 
 
19
"""Support for results that aren't immediately available.
 
20
 
 
21
API Stability: stable
 
22
 
 
23
Maintainer: U{Glyph Lefkowitz<mailto:glyph@twistedmatrix.com>}
 
24
"""
 
25
 
 
26
from __future__ import nested_scopes
 
27
import traceback
 
28
 
 
29
# Twisted imports
 
30
from twisted.python import log, failure
 
31
 
 
32
 
 
33
class AlreadyCalledError(Exception):
 
34
    pass
 
35
 
 
36
class AlreadyArmedError(Exception):
 
37
    pass
 
38
 
 
39
class TimeoutError(Exception):
 
40
    pass
 
41
 
 
42
def logError(err):
 
43
    log.err(err)
 
44
    return err
 
45
 
 
46
def succeed(result):
 
47
    """
 
48
    Return a Deferred that has already had '.callback(result)' called.
 
49
 
 
50
    This is useful when you're writing synchronous code to an
 
51
    asynchronous interface: i.e., some code is calling you expecting a
 
52
    Deferred result, but you don't actually need to do anything
 
53
    asynchronous. Just return defer.succeed(theResult).
 
54
 
 
55
    See L{fail} for a version of this function that uses a failing
 
56
    Deferred rather than a successful one.
 
57
 
 
58
    @param result: The result to give to the Deferred's 'callback'
 
59
           method.
 
60
 
 
61
    @rtype: L{Deferred}
 
62
    """
 
63
    d = Deferred()
 
64
    d.callback(result)
 
65
    return d
 
66
 
 
67
class _nothing: pass
 
68
 
 
69
def fail(result=_nothing):
 
70
    """
 
71
    Return a Deferred that has already had '.errback(result)' called.
 
72
 
 
73
    See L{succeed}'s docstring for rationale.
 
74
 
 
75
    @param result: The same argument that L{Deferred.errback<twisted.internet.defer.Deferred.errback>} takes.
 
76
 
 
77
    @rtype: L{Deferred}
 
78
    """
 
79
    if result is _nothing:
 
80
        result = failure.Failure()
 
81
    d = Deferred()
 
82
    d.errback(result)
 
83
    return d
 
84
 
 
85
def execute(callable, *args, **kw):
 
86
    """Create a deferred from a callable and arguments.
 
87
 
 
88
    Call the given function with the given arguments.  Return a deferred which
 
89
    has been fired with its callback as the result of that invocation or its
 
90
    errback with a Failure for the exception thrown.
 
91
    """
 
92
    try:
 
93
        result = callable(*args, **kw)
 
94
    except:
 
95
        return fail()
 
96
    else:
 
97
        return succeed(result)
 
98
 
 
99
def maybeDeferred(f, *args, **kw):
 
100
    """Invoke a function that may or may not return a deferred.
 
101
 
 
102
    Call the given function with the given arguments.  If the returned
 
103
    object is a C{Deferred}, return it.  If the returned object is a C{Failure},
 
104
    wrap it with C{fail} and return it.  Otherwise, wrap it in C{succeed} and
 
105
    return it.  If an exception is raised, convert it to a C{Failure}, wrap it
 
106
    in C{fail}, and then return it.
 
107
 
 
108
    @type f: Any callable
 
109
    @param f: The callable to invoke
 
110
 
 
111
    @param args: The arguments to pass to C{f}
 
112
    @param kw: The keyword arguments to pass to C{f}
 
113
 
 
114
    @rtype: C{Deferred}
 
115
    @return: The result of the function call, wrapped in a C{Deferred} if
 
116
    necessary.
 
117
 
 
118
    API Stability: Unstable
 
119
    """
 
120
    deferred = None
 
121
    if isinstance(f, Deferred) or f is None:
 
122
        import warnings
 
123
        warnings.warn("First argument to maybeDeferred() should no longer be a Deferred or None.  Just pass the function and the arguments.", DeprecationWarning, stacklevel=2)
 
124
        deferred = f or Deferred()
 
125
        f = args[0]
 
126
        args = args[1:]
 
127
 
 
128
    try:
 
129
        result = f(*args, **kw)
 
130
    except:
 
131
        if deferred is None:
 
132
            return fail(failure.Failure())
 
133
        else:
 
134
            deferred.errback(failure.Failure())
 
135
    else:
 
136
        if isinstance(result, Deferred):
 
137
            if deferred is None:
 
138
                return result
 
139
            else:
 
140
                result.chainDeferred(deferred)
 
141
        elif isinstance(result, failure.Failure):
 
142
            if deferred is None:
 
143
                return fail(result)
 
144
            else:
 
145
                deferred.errback(result)
 
146
        else:
 
147
            if deferred is None:
 
148
                return succeed(result)
 
149
            else:
 
150
                deferred.callback(result)
 
151
    return deferred
 
152
 
 
153
def timeout(deferred):
 
154
    deferred.errback(failure.Failure(TimeoutError("Callback timed out")))
 
155
 
 
156
def passthru(arg):
 
157
    return arg
 
158
 
 
159
class Deferred:
 
160
    """This is a callback which will be put off until later.
 
161
 
 
162
    Why do we want this? Well, in cases where a function in a threaded
 
163
    program would block until it gets a result, for Twisted it should
 
164
    not block. Instead, it should return a Deferred.
 
165
 
 
166
    This can be implemented for protocols that run over the network by
 
167
    writing an asynchronous protocol for twisted.internet. For methods
 
168
    that come from outside packages that are not under our control, we use
 
169
    threads (see for example L{twisted.enterprise.adbapi}).
 
170
 
 
171
    For more information about Deferreds, see doc/howto/defer.html or
 
172
    U{http://www.twistedmatrix.com/documents/howto/defer}
 
173
    """
 
174
 
 
175
    called = 0
 
176
    default = 0
 
177
    paused = 0
 
178
    timeoutCall = None
 
179
    # enable .debug to record creation/first-invoker call stacks, and they
 
180
    # will be added to any AlreadyCalledErrors we raise
 
181
    debug = False
 
182
 
 
183
    def __init__(self):
 
184
        self.callbacks = []
 
185
        if self.debug:
 
186
            self.creator = traceback.format_stack()[:-1]
 
187
 
 
188
    def addCallbacks(self, callback, errback=None,
 
189
                     callbackArgs=None, callbackKeywords=None,
 
190
                     errbackArgs=None, errbackKeywords=None, asDefaults=0):
 
191
        """Add a pair of callbacks (success and error) to this Deferred.
 
192
 
 
193
        These will be executed when the 'master' callback is run.
 
194
        """
 
195
        assert callable(callback)
 
196
        assert errback == None or callable(errback)
 
197
        cbs = ((callback, callbackArgs, callbackKeywords),
 
198
               (errback or (passthru), errbackArgs, errbackKeywords))
 
199
        if self.default:
 
200
            self.callbacks[-1] = cbs
 
201
        else:
 
202
            self.callbacks.append(cbs)
 
203
        self.default = asDefaults
 
204
        if self.called:
 
205
            self._runCallbacks()
 
206
        return self
 
207
 
 
208
    def addCallback(self, callback, *args, **kw):
 
209
        """Convenience method for adding just a callback.
 
210
 
 
211
        See L{addCallbacks}.
 
212
        """
 
213
        return self.addCallbacks(callback, callbackArgs=args,
 
214
                                 callbackKeywords=kw)
 
215
 
 
216
    def addErrback(self, errback, *args, **kw):
 
217
        """Convenience method for adding just an errback.
 
218
 
 
219
        See L{addCallbacks}.
 
220
        """
 
221
        return self.addCallbacks(passthru, errback,
 
222
                                 errbackArgs=args,
 
223
                                 errbackKeywords=kw)
 
224
 
 
225
    def addBoth(self, callback, *args, **kw):
 
226
        """Convenience method for adding a single callable as both a callback
 
227
        and an errback.
 
228
 
 
229
        See L{addCallbacks}.
 
230
        """
 
231
        return self.addCallbacks(callback, callback,
 
232
                                 callbackArgs=args, errbackArgs=args,
 
233
                                 callbackKeywords=kw, errbackKeywords=kw)
 
234
 
 
235
    def chainDeferred(self, d):
 
236
        """Chain another Deferred to this Deferred.
 
237
 
 
238
        This method adds callbacks to this Deferred to call d's callback or
 
239
        errback, as appropriate."""
 
240
        return self.addCallbacks(d.callback, d.errback)
 
241
 
 
242
    def callback(self, result):
 
243
        """Run all success callbacks that have been added to this Deferred.
 
244
 
 
245
        Each callback will have its result passed as the first
 
246
        argument to the next; this way, the callbacks act as a
 
247
        'processing chain'. Also, if the success-callback returns a Failure
 
248
        or raises an Exception, processing will continue on the *error*-
 
249
        callback chain.
 
250
        """
 
251
        assert not isinstance(result, Deferred)
 
252
        self._startRunCallbacks(result)
 
253
 
 
254
 
 
255
    def errback(self, fail=None):
 
256
        """Run all error callbacks that have been added to this Deferred.
 
257
 
 
258
        Each callback will have its result passed as the first
 
259
        argument to the next; this way, the callbacks act as a
 
260
        'processing chain'. Also, if the error-callback returns a non-Failure
 
261
        or doesn't raise an Exception, processing will continue on the
 
262
        *success*-callback chain.
 
263
 
 
264
        If the argument that's passed to me is not a failure.Failure instance,
 
265
        it will be embedded in one. If no argument is passed, a failure.Failure
 
266
        instance will be created based on the current traceback stack.
 
267
 
 
268
        Passing a string as `fail' is deprecated, and will be punished with
 
269
        a warning message.
 
270
        """
 
271
        if not isinstance(fail, failure.Failure):
 
272
            fail = failure.Failure(fail)
 
273
 
 
274
        self._startRunCallbacks(fail)
 
275
 
 
276
 
 
277
    def pause(self):
 
278
        """Stop processing on a Deferred until L{unpause}() is called.
 
279
        """
 
280
        self.paused = self.paused + 1
 
281
 
 
282
 
 
283
    def unpause(self):
 
284
        """Process all callbacks made since L{pause}() was called.
 
285
        """
 
286
        self.paused = self.paused - 1
 
287
        if self.paused:
 
288
            return
 
289
        if self.called:
 
290
            self._runCallbacks()
 
291
 
 
292
    def _continue(self, result):
 
293
        self.result = result
 
294
        self.unpause()
 
295
 
 
296
    def _startRunCallbacks(self, result):
 
297
        if self.called:
 
298
            if not self.debug:
 
299
                raise AlreadyCalledError
 
300
            extra = "\n" + self._debugInfo()
 
301
            raise AlreadyCalledError(extra)
 
302
        if self.debug:
 
303
            self.invoker = traceback.format_stack()[:-2]
 
304
        self.called = True
 
305
        self.result = result
 
306
        if self.timeoutCall:
 
307
            try:
 
308
                self.timeoutCall.cancel()
 
309
            except:
 
310
                pass
 
311
            # Avoid reference cycles, because this object defines __del__
 
312
            del self.timeoutCall
 
313
        self._runCallbacks()
 
314
 
 
315
    def _debugInfo(self):
 
316
        info = ''
 
317
        if hasattr(self, "creator"):
 
318
            info += " C: Deferred was created:\n C:"
 
319
            info += "".join(self.creator).rstrip().replace("\n","\n C:")
 
320
            info += "\n"
 
321
        if hasattr(self, "invoker"):
 
322
            info += " I: First Invoker was:\n I:"
 
323
            info += "".join(self.invoker).rstrip().replace("\n","\n I:")
 
324
            info += "\n"
 
325
        return info
 
326
 
 
327
    def _runCallbacks(self):
 
328
        if not self.paused:
 
329
            cb = self.callbacks
 
330
            self.callbacks = []
 
331
            while cb:
 
332
                item = cb.pop(0)
 
333
                callback, args, kw = item[
 
334
                    isinstance(self.result, failure.Failure)]
 
335
                args = args or ()
 
336
                kw = kw or {}
 
337
                try:
 
338
                    self.result = callback(self.result, *args, **kw)
 
339
                    if isinstance(self.result, Deferred):
 
340
                        self.callbacks = cb
 
341
 
 
342
                        # note: this will cause _runCallbacks to be called
 
343
                        # "recursively" sometimes... this shouldn't cause any
 
344
                        # problems, since all the state has been set back to
 
345
                        # the way it's supposed to be, but it is useful to know
 
346
                        # in case something goes wrong.  deferreds really ought
 
347
                        # not to return themselves from their callbacks.
 
348
                        self.pause()
 
349
                        self.result.addBoth(self._continue)
 
350
                        break
 
351
                except:
 
352
                    self.result = failure.Failure()
 
353
        if isinstance(self.result, failure.Failure):
 
354
            self.result.cleanFailure()
 
355
 
 
356
 
 
357
    def arm(self):
 
358
        """This method is deprecated.
 
359
        """
 
360
        pass
 
361
 
 
362
    def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw):
 
363
        """Set a timeout function to be triggered if I am not called.
 
364
 
 
365
        @param seconds: How long to wait (from now) before firing the
 
366
        timeoutFunc.
 
367
 
 
368
        @param timeoutFunc: will receive the Deferred and *args, **kw as its
 
369
        arguments.  The default timeoutFunc will call the errback with a
 
370
        L{TimeoutError}.
 
371
 
 
372
        DON'T USE THIS! It's a bad idea! Use a function called by reactor.callLater instead
 
373
        to accomplish the same thing!
 
374
 
 
375
        YOU HAVE BEEN WARNED!
 
376
        """
 
377
 
 
378
        if self.called:
 
379
            return
 
380
        assert not self.timeoutCall, "Don't call setTimeout twice on the same Deferred."
 
381
 
 
382
        from twisted.internet import reactor
 
383
        self.timeoutCall = reactor.callLater(
 
384
            seconds,
 
385
            lambda: self.called or timeoutFunc(self, *args, **kw))
 
386
        return self.timeoutCall
 
387
 
 
388
    armAndErrback = errback
 
389
    armAndCallback = callback
 
390
    armAndChain = chainDeferred
 
391
 
 
392
 
 
393
    def __str__(self):
 
394
        cname = self.__class__.__name__
 
395
        if hasattr(self, 'result'):
 
396
            return "<%s at %s  current result: %r>" % (cname, hex(id(self)),
 
397
                                                       self.result)
 
398
        return "<%s at %s>" % (cname, hex(id(self)))
 
399
    __repr__ = __str__
 
400
 
 
401
 
 
402
    def __del__(self):
 
403
        """Print tracebacks and die.
 
404
 
 
405
        If the *last* (and I do mean *last*) callback leaves me in an error
 
406
        state, print a traceback (if said errback is a Failure).
 
407
        """
 
408
        if (self.called and
 
409
            isinstance(self.result, failure.Failure)):
 
410
            log.msg("Unhandled error in Deferred:", isError=True)
 
411
            if self.debug:
 
412
                log.msg("(debug: " + self._debugInfo() + ")", isError=True)
 
413
            log.err(self.result)
 
414
 
 
415
 
 
416
class DeferredList(Deferred):
 
417
    """I combine a group of deferreds into one callback.
 
418
 
 
419
    I track a list of L{Deferred}s for their callbacks, and make a single
 
420
    callback when they have all completed, a list of (success, result)
 
421
    tuples, 'success' being a boolean.
 
422
 
 
423
    Note that you can still use a L{Deferred} after putting it in a
 
424
    DeferredList.  For example, you can suppress 'Unhandled error in Deferred'
 
425
    messages by adding errbacks to the Deferreds *after* putting them in the
 
426
    DeferredList, as a DeferredList won't swallow the errors.  (Although a more
 
427
    convenient way to do this is simply to set the consumeErrors flag)
 
428
    """
 
429
 
 
430
    fireOnOneCallback = 0
 
431
    fireOnOneErrback = 0
 
432
 
 
433
    def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
 
434
                 consumeErrors=0):
 
435
        """Initialize a DeferredList.
 
436
 
 
437
        @type deferredList:  C{list} of L{Deferred}s
 
438
        @param deferredList: The list of deferreds to track.
 
439
        @param fireOnOneCallback: (keyword param) a flag indicating that
 
440
                             only one callback needs to be fired for me to call
 
441
                             my callback
 
442
        @param fireOnOneErrback: (keyword param) a flag indicating that
 
443
                            only one errback needs to be fired for me to call
 
444
                            my errback
 
445
        @param consumeErrors: (keyword param) a flag indicating that any errors
 
446
                            raised in the original deferreds should be
 
447
                            consumed by this DeferredList.  This is useful to
 
448
                            prevent spurious warnings being logged.
 
449
        """
 
450
        self.resultList = [None] * len(deferredList)
 
451
        Deferred.__init__(self)
 
452
        if len(deferredList) == 0 and not fireOnOneCallback:
 
453
            self.callback(self.resultList)
 
454
 
 
455
        # These flags need to be set *before* attaching callbacks to the
 
456
        # deferreds, because the callbacks use these flags, and will run
 
457
        # synchronously if any of the deferreds are already fired.
 
458
        self.fireOnOneCallback = fireOnOneCallback
 
459
        self.fireOnOneErrback = fireOnOneErrback
 
460
        self.consumeErrors = consumeErrors
 
461
        self.finishedCount = 0
 
462
 
 
463
        index = 0
 
464
        for deferred in deferredList:
 
465
            deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
 
466
                                  callbackArgs=(index,SUCCESS),
 
467
                                  errbackArgs=(index,FAILURE))
 
468
            index = index + 1
 
469
 
 
470
    def addDeferred(self, deferred):
 
471
        """DEPRECATED"""
 
472
        import warnings
 
473
        warnings.warn('DeferredList.addDeferred is deprecated.',
 
474
                      DeprecationWarning, stacklevel=2)
 
475
        self.resultList.append(None)
 
476
        index = len(self.resultList) - 1
 
477
        deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
 
478
                              callbackArgs=(index,SUCCESS),
 
479
                              errbackArgs=(index,FAILURE))
 
480
 
 
481
    def _cbDeferred(self, result, index, succeeded):
 
482
        """(internal) Callback for when one of my deferreds fires.
 
483
        """
 
484
        self.resultList[index] = (succeeded, result)
 
485
 
 
486
        self.finishedCount += 1
 
487
        if not self.called:
 
488
            if succeeded == SUCCESS and self.fireOnOneCallback:
 
489
                self.callback((result, index))
 
490
            elif succeeded == FAILURE and self.fireOnOneErrback:
 
491
                self.errback(failure.Failure((result, index)))
 
492
            elif self.finishedCount == len(self.resultList):
 
493
                self.callback(self.resultList)
 
494
 
 
495
        if succeeded == FAILURE and self.consumeErrors:
 
496
            result = None
 
497
 
 
498
        return result
 
499
 
 
500
 
 
501
def _parseDListResult(l, fireOnOneErrback=0):
 
502
    if __debug__:
 
503
        for success, value in l:
 
504
            assert success
 
505
    return [x[1] for x in l]
 
506
 
 
507
def gatherResults(deferredList, fireOnOneErrback=0):
 
508
    """Returns list with result of given Deferreds.
 
509
 
 
510
    This builds on C{DeferredList} but is useful since you don't
 
511
    need to parse the result for success/failure.
 
512
 
 
513
    @type deferredList:  C{list} of L{Deferred}s
 
514
    """
 
515
    if fireOnOneErrback:
 
516
        raise "This function was previously totally, totally broken.  Please fix your code to behave as documented."
 
517
    d = DeferredList(deferredList, fireOnOneErrback=1)
 
518
    d.addCallback(_parseDListResult)
 
519
    return d
 
520
 
 
521
# Constants for use with DeferredList
 
522
 
 
523
SUCCESS = True
 
524
FAILURE = False
 
525
 
 
526
__all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
 
527
           "AlreadyCalledError", "TimeoutError", "gatherResults",
 
528
           "maybeDeferred",
 
529
          ]