1
# -*- test-case-name: twisted.test.test_defer,twisted.test.test_defgen,twisted.internet.test.test_inlinecb -*-
2
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Support for results that aren't immediately available.
8
Maintainer: Glyph Lefkowitz
13
from sys import exc_info
16
from twisted.python import log, failure, lockfile
17
from twisted.python.util import unsignedID, mergeFunctionMetadata
21
class AlreadyCalledError(Exception):
26
class TimeoutError(Exception):
39
Return a L{Deferred} that has already had C{.callback(result)} called.
41
This is useful when you're writing synchronous code to an
42
asynchronous interface: i.e., some code is calling you expecting a
43
L{Deferred} result, but you don't actually need to do anything
44
asynchronous. Just return C{defer.succeed(theResult)}.
46
See L{fail} for a version of this function that uses a failing
47
L{Deferred} rather than a successful one.
49
@param result: The result to give to the Deferred's 'callback'
60
def fail(result=None):
62
Return a L{Deferred} that has already had C{.errback(result)} called.
64
See L{succeed}'s docstring for rationale.
66
@param result: The same argument that L{Deferred.errback} takes.
68
@raise NoCurrentExceptionError: If C{result} is C{None} but there is no
69
current exception state.
79
def execute(callable, *args, **kw):
81
Create a L{Deferred} from a callable and arguments.
83
Call the given function with the given arguments. Return a L{Deferred}
84
which has been fired with its callback as the result of that invocation
85
or its C{errback} with a L{Failure} for the exception thrown.
88
result = callable(*args, **kw)
92
return succeed(result)
96
def maybeDeferred(f, *args, **kw):
98
Invoke a function that may or may not return a L{Deferred}.
100
Call the given function with the given arguments. If the returned
101
object is a L{Deferred}, return it. If the returned object is a L{Failure},
102
wrap it with L{fail} and return it. Otherwise, wrap it in L{succeed} and
103
return it. If an exception is raised, convert it to a L{Failure}, wrap it
104
in L{fail}, and then return it.
106
@type f: Any callable
107
@param f: The callable to invoke
109
@param args: The arguments to pass to C{f}
110
@param kw: The keyword arguments to pass to C{f}
113
@return: The result of the function call, wrapped in a L{Deferred} if
117
result = f(*args, **kw)
119
return fail(failure.Failure())
121
if isinstance(result, Deferred):
123
elif isinstance(result, failure.Failure):
126
return succeed(result)
130
def timeout(deferred):
    """
    Fail C{deferred} with a L{TimeoutError}.

    This is the default C{timeoutFunc} used by L{Deferred.setTimeout}.

    @param deferred: The L{Deferred} whose errback chain should be started
        with a L{failure.Failure} wrapping a L{TimeoutError}.
    """
    deferred.errback(failure.Failure(TimeoutError("Callback timed out")))
140
def setDebugging(on):
    """
    Enable or disable L{Deferred} debugging.

    When debugging is on, the call stacks from creation and invocation are
    recorded, and added to any L{AlreadyCalledErrors} we raise.

    @param on: Any object; its truth value becomes the new debugging state.
    """
    # Coerce to a real bool so Deferred.debug is always True or False.
    Deferred.debug = bool(on)
153
Determine whether L{Deferred} debugging is enabled.
155
return Deferred.debug
161
This is a callback which will be put off until later.
163
Why do we want this? Well, in cases where a function in a threaded
164
program would block until it gets a result, for Twisted it should
165
not block. Instead, it should return a L{Deferred}.
167
This can be implemented for protocols that run over the network by
168
writing an asynchronous protocol for L{twisted.internet}. For methods
169
that come from outside packages that are not under our control, we use
170
threads (see for example L{twisted.enterprise.adbapi}).
172
For more information about Deferreds, see doc/howto/defer.html or
173
U{http://twistedmatrix.com/projects/core/documentation/howto/defer.html}
181
# Are we currently running a user-installed callback? Meant to prevent
182
# recursive running of callbacks when a reentrant call to add a callback is
184
_runningCallbacks = False
186
# Keep this class attribute for now, for compatibility with code that
194
self._debugInfo = DebugInfo()
195
self._debugInfo.creator = traceback.format_stack()[:-1]
198
def addCallbacks(self, callback, errback=None,
199
callbackArgs=None, callbackKeywords=None,
200
errbackArgs=None, errbackKeywords=None):
202
Add a pair of callbacks (success and error) to this L{Deferred}.
204
These will be executed when the 'master' callback is run.
206
assert callable(callback)
207
assert errback == None or callable(errback)
208
cbs = ((callback, callbackArgs, callbackKeywords),
209
(errback or (passthru), errbackArgs, errbackKeywords))
210
self.callbacks.append(cbs)
217
def addCallback(self, callback, *args, **kw):
219
Convenience method for adding just a callback.
223
return self.addCallbacks(callback, callbackArgs=args,
227
def addErrback(self, errback, *args, **kw):
229
Convenience method for adding just an errback.
233
return self.addCallbacks(passthru, errback,
238
def addBoth(self, callback, *args, **kw):
    """
    Convenience method for adding a single callable as both a callback
    and an errback.

    The same positional and keyword arguments are registered for both the
    success and the error path.

    @param callback: The callable to install on both chains.
    @param args: Extra positional arguments passed on as both
        C{callbackArgs} and C{errbackArgs}.
    @param kw: Extra keyword arguments passed on as both
        C{callbackKeywords} and C{errbackKeywords}.

    @return: Whatever L{addCallbacks} returns.
    """
    return self.addCallbacks(callback, callback,
                             callbackArgs=args, errbackArgs=args,
                             callbackKeywords=kw, errbackKeywords=kw)
250
def chainDeferred(self, d):
    """
    Chain another L{Deferred} to this L{Deferred}.

    This method adds callbacks to this L{Deferred} to call C{d}'s callback
    or errback, as appropriate. It is merely a shorthand way of performing
    the following::

        self.addCallbacks(d.callback, d.errback)

    When you chain a deferred d2 to another deferred d1 with
    d1.chainDeferred(d2), you are making d2 participate in the callback
    chain of d1. Thus any event that fires d1 will also fire d2.
    However, the converse is B{not} true; nothing is added to d2 here, so
    firing d2 does not affect d1.

    @param d: The L{Deferred} whose C{callback} and C{errback} methods are
        registered on this one.

    @return: Whatever L{addCallbacks} returns.
    """
    return self.addCallbacks(d.callback, d.errback)
269
def callback(self, result):
    """
    Run all success callbacks that have been added to this L{Deferred}.

    Each callback will have its result passed as the first
    argument to the next; this way, the callbacks act as a
    'processing chain'. Also, if the success-callback returns a L{Failure}
    or raises an L{Exception}, processing will continue on the *error*-
    callback chain.

    @param result: The value handed to the first callback. It must not
        itself be a L{Deferred}; this is checked with an C{assert}.
    """
    assert not isinstance(result, Deferred)
    self._startRunCallbacks(result)
283
def errback(self, fail=None):
    """
    Run all error callbacks that have been added to this L{Deferred}.

    Each callback will have its result passed as the first
    argument to the next; this way, the callbacks act as a
    'processing chain'. Also, if the error-callback returns a non-Failure
    or doesn't raise an L{Exception}, processing will continue on the
    *success*-callback chain.

    If the argument that's passed to me is not a L{failure.Failure} instance,
    it will be embedded in one. If no argument is passed, a
    L{failure.Failure} instance will be created based on the current
    exception state.

    Passing a string as C{fail} is deprecated.

    @param fail: A L{failure.Failure}, any other object to wrap in one, or
        C{None} to use the current exception state.

    @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
        no current exception state.
    """
    if not isinstance(fail, failure.Failure):
        # Anything that is not already a Failure (including None) is
        # handed to the Failure constructor to be wrapped.
        fail = failure.Failure(fail)

    self._startRunCallbacks(fail)
312
Stop processing on a L{Deferred} until L{unpause}() is called.
314
self.paused = self.paused + 1
319
Process all callbacks made since L{pause}() was called.
321
self.paused = self.paused - 1
328
def _continue(self, result):
333
def _startRunCallbacks(self, result):
336
if self._debugInfo is None:
337
self._debugInfo = DebugInfo()
338
extra = "\n" + self._debugInfo._getDebugTracebacks()
339
raise AlreadyCalledError(extra)
340
raise AlreadyCalledError
342
if self._debugInfo is None:
343
self._debugInfo = DebugInfo()
344
self._debugInfo.invoker = traceback.format_stack()[:-2]
349
self.timeoutCall.cancel()
357
def _runCallbacks(self):
358
if self._runningCallbacks:
359
# Don't recursively run callbacks
362
while self.callbacks:
363
item = self.callbacks.pop(0)
364
callback, args, kw = item[
365
isinstance(self.result, failure.Failure)]
369
self._runningCallbacks = True
371
self.result = callback(self.result, *args, **kw)
373
self._runningCallbacks = False
374
if isinstance(self.result, Deferred):
375
# note: this will cause _runCallbacks to be called
376
# recursively if self.result already has a result.
377
# This shouldn't cause any problems, since there is no
378
# relevant state in this stack frame at this point.
379
# The recursive call will continue to process
380
# self.callbacks until it is empty, then return here,
381
# where there is no more work to be done, so this call
382
# will return as well.
384
self.result.addBoth(self._continue)
387
self.result = failure.Failure()
389
if isinstance(self.result, failure.Failure):
390
self.result.cleanFailure()
391
if self._debugInfo is None:
392
self._debugInfo = DebugInfo()
393
self._debugInfo.failResult = self.result
395
if self._debugInfo is not None:
396
self._debugInfo.failResult = None
399
def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw):
401
Set a timeout function to be triggered if I am not called.
403
@param seconds: How long to wait (from now) before firing the
406
@param timeoutFunc: will receive the L{Deferred} and *args, **kw as its
407
arguments. The default C{timeoutFunc} will call the errback with a
411
"Deferred.setTimeout is deprecated. Look for timeout "
412
"support specific to the API you are using instead.",
413
DeprecationWarning, stacklevel=2)
417
assert not self.timeoutCall, "Don't call setTimeout twice on the same Deferred."
419
from twisted.internet import reactor
420
self.timeoutCall = reactor.callLater(
422
lambda: self.called or timeoutFunc(self, *args, **kw))
423
return self.timeoutCall
426
cname = self.__class__.__name__
427
if hasattr(self, 'result'):
428
return "<%s at %s current result: %r>" % (cname, hex(unsignedID(self)),
430
return "<%s at %s>" % (cname, hex(unsignedID(self)))
437
Deferred debug helper.
443
def _getDebugTracebacks(self):
445
if hasattr(self, "creator"):
446
info += " C: Deferred was created:\n C:"
447
info += "".join(self.creator).rstrip().replace("\n","\n C:")
449
if hasattr(self, "invoker"):
450
info += " I: First Invoker was:\n I:"
451
info += "".join(self.invoker).rstrip().replace("\n","\n I:")
458
Print tracebacks and die.
460
If the *last* (and I do mean *last*) callback leaves me in an error
461
state, print a traceback (if said errback is a L{Failure}).
463
if self.failResult is not None:
464
log.msg("Unhandled error in Deferred:", isError=True)
465
debugInfo = self._getDebugTracebacks()
467
log.msg("(debug: " + debugInfo + ")", isError=True)
468
log.err(self.failResult)
472
class FirstError(Exception):
474
First error to occur in a L{DeferredList} if C{fireOnOneErrback} is set.
476
@ivar subFailure: The L{Failure} that occurred.
477
@type subFailure: L{Failure}
479
@ivar index: The index of the L{Deferred} in the L{DeferredList} where
483
def __init__(self, failure, index):
484
Exception.__init__(self, failure, index)
485
self.subFailure = failure
491
The I{repr} of L{FirstError} instances includes the repr of the
492
wrapped failure's exception and the index of the L{FirstError}.
494
return 'FirstError[#%d, %r]' % (self.index, self.subFailure.value)
499
The I{str} of L{FirstError} instances includes the I{str} of the
500
entire wrapped failure (including its traceback and exception) and
501
the index of the L{FirstError}.
503
return 'FirstError[#%d, %s]' % (self.index, self.subFailure)
506
def __cmp__(self, other):
508
Comparison between L{FirstError} and other L{FirstError} instances
509
is defined as the comparison of the index and sub-failure of each
510
instance. L{FirstError} instances don't compare equal to anything
511
that isn't a L{FirstError} instance.
515
if isinstance(other, FirstError):
517
(self.index, self.subFailure),
518
(other.index, other.subFailure))
523
class DeferredList(Deferred):
525
I combine a group of deferreds into one callback.
527
I track a list of L{Deferred}s for their callbacks, and make a single
528
callback when they have all completed, a list of (success, result)
529
tuples, 'success' being a boolean.
531
Note that you can still use a L{Deferred} after putting it in a
532
DeferredList. For example, you can suppress 'Unhandled error in Deferred'
533
messages by adding errbacks to the Deferreds *after* putting them in the
534
DeferredList, as a DeferredList won't swallow the errors. (Although a more
535
convenient way to do this is simply to set the consumeErrors flag)
538
fireOnOneCallback = 0
542
def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
545
Initialize a DeferredList.
547
@type deferredList: C{list} of L{Deferred}s
548
@param deferredList: The list of deferreds to track.
549
@param fireOnOneCallback: (keyword param) a flag indicating that
550
only one callback needs to be fired for me to call
552
@param fireOnOneErrback: (keyword param) a flag indicating that
553
only one errback needs to be fired for me to call
555
@param consumeErrors: (keyword param) a flag indicating that any errors
556
raised in the original deferreds should be
557
consumed by this DeferredList. This is useful to
558
prevent spurious warnings being logged.
560
self.resultList = [None] * len(deferredList)
561
Deferred.__init__(self)
562
if len(deferredList) == 0 and not fireOnOneCallback:
563
self.callback(self.resultList)
565
# These flags need to be set *before* attaching callbacks to the
566
# deferreds, because the callbacks use these flags, and will run
567
# synchronously if any of the deferreds are already fired.
568
self.fireOnOneCallback = fireOnOneCallback
569
self.fireOnOneErrback = fireOnOneErrback
570
self.consumeErrors = consumeErrors
571
self.finishedCount = 0
574
for deferred in deferredList:
575
deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
576
callbackArgs=(index,SUCCESS),
577
errbackArgs=(index,FAILURE))
581
def _cbDeferred(self, result, index, succeeded):
583
(internal) Callback for when one of my deferreds fires.
585
self.resultList[index] = (succeeded, result)
587
self.finishedCount += 1
589
if succeeded == SUCCESS and self.fireOnOneCallback:
590
self.callback((result, index))
591
elif succeeded == FAILURE and self.fireOnOneErrback:
592
self.errback(failure.Failure(FirstError(result, index)))
593
elif self.finishedCount == len(self.resultList):
594
self.callback(self.resultList)
596
if succeeded == FAILURE and self.consumeErrors:
603
def _parseDListResult(l, fireOnOneErrback=0):
605
for success, value in l:
607
return [x[1] for x in l]
611
def gatherResults(deferredList):
613
Returns list with result of given L{Deferred}s.
615
This builds on L{DeferredList} but is useful since you don't
616
need to parse the result for success/failure.
618
@type deferredList: C{list} of L{Deferred}s
620
d = DeferredList(deferredList, fireOnOneErrback=1)
621
d.addCallback(_parseDListResult)
626
# Constants for use with DeferredList
635
class waitForDeferred:
637
See L{deferredGenerator}.
640
def __init__(self, d):
641
if not isinstance(d, Deferred):
642
raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))
647
if isinstance(self.result, failure.Failure):
648
self.result.raiseException()
653
def _deferGenerator(g, deferred):
655
See L{deferredGenerator}.
659
# This function is complicated by the need to prevent unbounded recursion
660
# arising from repeatedly yielding immediately ready deferreds. This while
661
# loop and the waiting variable solve that by manually unfolding the
664
waiting = [True, # defgen is waiting for result?
670
except StopIteration:
671
deferred.callback(result)
677
# Deferred.callback(Deferred) raises an error; we catch this case
678
# early here and give a nicer error message to the user in case
679
# they yield a Deferred.
680
if isinstance(result, Deferred):
681
return fail(TypeError("Yield waitForDeferred(d), not d!"))
683
if isinstance(result, waitForDeferred):
684
# a waitForDeferred was yielded, get the result.
685
# Pass result in so it doesn't get changed going around the loop
686
# This isn't a problem for waiting, as it's only reused if
687
# gotResult has already been executed.
688
def gotResult(r, result=result):
694
_deferGenerator(g, deferred)
695
result.d.addBoth(gotResult)
697
# Haven't called back yet, set flag so that we get reinvoked
698
# and return from the loop
701
# Reset waiting to initial values for next loop
709
def deferredGenerator(f):
    """
    deferredGenerator and waitForDeferred help you write L{Deferred}-using code
    that looks like a regular sequential function. If your code has a minimum
    requirement of Python 2.5, consider the use of L{inlineCallbacks} instead,
    which can accomplish the same thing in a more concise manner.

    There are two important functions involved: L{waitForDeferred}, and
    L{deferredGenerator}.  They are used together, like this::

        def thingummy():
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
            yield thing
            thing = thing.getResult()
            print thing #the result! hoorj!
        thingummy = deferredGenerator(thingummy)

    L{waitForDeferred} returns something that you should immediately yield; when
    your generator is resumed, calling C{thing.getResult()} will either give you
    the result of the L{Deferred} if it was a success, or raise an exception if it
    was a failure.  Calling C{getResult} is B{absolutely mandatory}.  If you do
    not call it, I{your program will not work}.

    L{deferredGenerator} takes one of these waitForDeferred-using generator
    functions and converts it into a function that returns a L{Deferred}. The
    result of the L{Deferred} will be the last value that your generator yielded
    unless the last value is a L{waitForDeferred} instance, in which case the
    result will be C{None}.  If the function raises an unhandled exception, the
    L{Deferred} will errback instead.  Remember that C{return result} won't work;
    use C{yield result; return} in place of that.

    Note that not yielding anything from your generator will make the L{Deferred}
    result in C{None}. Yielding a L{Deferred} from your generator is also an error
    condition; always yield C{waitForDeferred(d)} instead.

    The L{Deferred} returned from your deferred generator may also errback if your
    generator raised an exception.  For example::

        def thingummy():
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
            yield thing
            thing = thing.getResult()
            if thing == 'I love Twisted':
                # will become the result of the Deferred
                yield 'TWISTED IS GREAT!'
                return
            else:
                # will trigger an errback
                raise Exception('DESTROY ALL LIFE')
        thingummy = deferredGenerator(thingummy)

    Put succinctly, these functions connect deferred-using code with this 'fake
    blocking' style in both directions: L{waitForDeferred} converts from a
    L{Deferred} to the 'blocking' style, and L{deferredGenerator} converts from the
    'blocking' style to a L{Deferred}.

    @param f: A generator function that yields L{waitForDeferred} instances.

    @return: The wrapper function, passed through C{mergeFunctionMetadata}
        together with C{f}.
    """
    def unwindGenerator(*args, **kwargs):
        # Each call builds a fresh generator and a fresh Deferred for
        # _deferGenerator to drive.
        return _deferGenerator(f(*args, **kwargs), Deferred())
    return mergeFunctionMetadata(f, unwindGenerator)
773
# BaseException is only in Py 2.5.
777
BaseException=Exception
781
class _DefGen_Return(BaseException):
782
def __init__(self, value):
787
def returnValue(val):
    """
    Return val from a L{inlineCallbacks} generator.

    Note: this is currently implemented by raising an exception
    derived from L{BaseException}.  You might want to change any
    'except:' clauses to an 'except Exception:' clause so as not to
    catch this exception.

    Also: while this function currently will work when called from
    within arbitrary functions called from within the generator, do
    not rely upon this behavior.

    @param val: The value carried by the raised L{_DefGen_Return}.
    """
    raise _DefGen_Return(val)
804
def _inlineCallbacks(result, g, deferred):
806
See L{inlineCallbacks}.
808
# This function is complicated by the need to prevent unbounded recursion
809
# arising from repeatedly yielding immediately ready deferreds. This while
810
# loop and the waiting variable solve that by manually unfolding the
813
waiting = [True, # waiting for result?
818
# Send the last result back as the result of the yield expression.
819
isFailure = isinstance(result, failure.Failure)
821
result = result.throwExceptionIntoGenerator(g)
823
result = g.send(result)
824
except StopIteration:
825
# fell off the end, or "return" statement
826
deferred.callback(None)
828
except _DefGen_Return, e:
829
# returnValue() was called; time to give a result to the original
830
# Deferred. First though, let's try to identify the potentially
831
# confusing situation which results when returnValue() is
832
# accidentally invoked from a different function, one that wasn't
833
# decorated with @inlineCallbacks.
835
# The traceback starts in this frame (the one for
836
# _inlineCallbacks); the next one down should be the application
838
appCodeTrace = exc_info()[2].tb_next
840
# If we invoked this generator frame by throwing an exception
841
# into it, then throwExceptionIntoGenerator will consume an
842
# additional stack frame itself, so we need to skip that too.
843
appCodeTrace = appCodeTrace.tb_next
844
# Now that we've identified the frame being exited by the
845
# exception, let's figure out if returnValue was called from it
846
# directly. returnValue itself consumes a stack frame, so the
847
# application code will have a tb_next, but it will *not* have a
849
if appCodeTrace.tb_next.tb_next:
850
# If returnValue was invoked non-local to the frame which it is
851
# exiting, identify the frame that ultimately invoked
852
# returnValue so that we can warn the user, as this behavior is
854
ultimateTrace = appCodeTrace
855
while ultimateTrace.tb_next.tb_next:
856
ultimateTrace = ultimateTrace.tb_next
857
filename = ultimateTrace.tb_frame.f_code.co_filename
858
lineno = ultimateTrace.tb_lineno
859
warnings.warn_explicit(
860
"returnValue() in %r causing %r to exit: "
861
"returnValue should only be invoked by functions decorated "
862
"with inlineCallbacks" % (
863
ultimateTrace.tb_frame.f_code.co_name,
864
appCodeTrace.tb_frame.f_code.co_name),
865
DeprecationWarning, filename, lineno)
866
deferred.callback(e.value)
872
if isinstance(result, Deferred):
873
# a deferred was yielded, get the result.
879
_inlineCallbacks(r, g, deferred)
881
result.addBoth(gotResult)
883
# Haven't called back yet, set flag so that we get reinvoked
884
# and return from the loop
889
# Reset waiting to initial values for next loop. gotResult uses
890
# waiting, but this isn't a problem because gotResult is only
891
# executed once, and if it hasn't been executed yet, the return
892
# branch above would have been taken.
903
def inlineCallbacks(f):
    """
    WARNING: this function will not work in Python 2.4 and earlier!

    inlineCallbacks helps you write Deferred-using code that looks like a
    regular sequential function. This function uses features of Python 2.5
    generators.  If you need to be compatible with Python 2.4 or before, use
    the L{deferredGenerator} function instead, which accomplishes the same
    thing, but with somewhat more boilerplate.  For example::

        def thingummy():
            thing = yield makeSomeRequestResultingInDeferred()
            print thing #the result! hoorj!
        thingummy = inlineCallbacks(thingummy)

    When you call anything that results in a L{Deferred}, you can simply yield it;
    your generator will automatically be resumed when the Deferred's result is
    available. The generator will be sent the result of the L{Deferred} with the
    'send' method on generators, or if the result was a failure, 'throw'.

    Your inlineCallbacks-enabled generator will return a L{Deferred} object, which
    will result in the return value of the generator (or will fail with a
    failure object if your generator raises an unhandled exception). Note that
    you can't use C{return result} to return a value; use C{returnValue(result)}
    instead. Falling off the end of the generator, or simply using C{return}
    will cause the L{Deferred} to have a result of C{None}.

    The L{Deferred} returned from your deferred generator may errback if your
    generator raised an exception::

        def thingummy():
            thing = yield makeSomeRequestResultingInDeferred()
            if thing == 'I love Twisted':
                # will become the result of the Deferred
                returnValue('TWISTED IS GREAT!')
            else:
                # will trigger an errback
                raise Exception('DESTROY ALL LIFE')
        thingummy = inlineCallbacks(thingummy)

    @param f: A generator function that yields L{Deferred}s.

    @return: The wrapper function, passed through C{mergeFunctionMetadata}
        together with C{f}.
    """
    def unwindGenerator(*args, **kwargs):
        # The generator has not started yet, so the initial result passed
        # to _inlineCallbacks is None.
        return _inlineCallbacks(None, f(*args, **kwargs), Deferred())
    return mergeFunctionMetadata(f, unwindGenerator)
948
## DeferredLock/DeferredQueue
950
class _ConcurrencyPrimitive(object):
955
def _releaseAndReturn(self, r):
960
def run(*args, **kwargs):
962
Acquire, run, release.
964
This function takes a callable as its first argument and any
965
number of other positional and keyword arguments. When the
966
lock or semaphore is acquired, the callable will be invoked
967
with those arguments.
969
The callable may return a L{Deferred}; if it does, the lock or
970
semaphore won't be released until that L{Deferred} fires.
972
@return: L{Deferred} of function result.
976
raise TypeError("run() takes at least 2 arguments, none given.")
977
raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
978
args[0].__class__.__name__,))
982
def execute(ignoredResult):
983
d = maybeDeferred(f, *args, **kwargs)
984
d.addBoth(self._releaseAndReturn)
988
d.addCallback(execute)
993
class DeferredLock(_ConcurrencyPrimitive):
995
A lock for event driven systems.
997
@ivar locked: C{True} when this Lock has been acquired, false at all
998
other times. Do not change this value, but it is useful to
999
examine for the equivalent of a "non-blocking" acquisition.
1007
Attempt to acquire the lock. Returns a L{Deferred} that fires on
1008
lock acquisition with the L{DeferredLock} as the value. If the lock
1009
is locked, then the Deferred is placed at the end of a waiting list.
1011
@return: a L{Deferred} which fires on lock acquisition.
1012
@rtype: a L{Deferred}
1016
self.waiting.append(d)
1025
Release the lock. If there is a waiting list, then the first
1026
L{Deferred} in that waiting list will be called back.
1028
Should be called by whomever did the L{acquire}() when the shared
1031
assert self.locked, "Tried to release an unlocked lock"
1034
# someone is waiting to acquire lock
1036
d = self.waiting.pop(0)
1041
class DeferredSemaphore(_ConcurrencyPrimitive):
1043
A semaphore for event driven systems.
1047
def __init__(self, tokens):
1048
_ConcurrencyPrimitive.__init__(self)
1049
self.tokens = tokens
1055
Attempt to acquire the token.
1057
@return: a L{Deferred} which fires on token acquisition.
1059
assert self.tokens >= 0, "Internal inconsistency?? tokens should never be negative"
1062
self.waiting.append(d)
1064
self.tokens = self.tokens - 1
1073
Should be called by whoever did the L{acquire}() when the shared
1076
assert self.tokens < self.limit, "Someone released me too many times: too many tokens!"
1077
self.tokens = self.tokens + 1
1079
# someone is waiting to acquire token
1080
self.tokens = self.tokens - 1
1081
d = self.waiting.pop(0)
1086
class QueueOverflow(Exception):
1091
class QueueUnderflow(Exception):
1096
class DeferredQueue(object):
1098
An event driven queue.
1100
Objects may be added as usual to this queue. When an attempt is
1101
made to retrieve an object when the queue is empty, a L{Deferred} is
1102
returned which will fire when an object becomes available.
1104
@ivar size: The maximum number of objects to allow into the queue
1105
at a time. When an attempt to add a new object would exceed this
1106
limit, L{QueueOverflow} is raised synchronously. C{None} for no limit.
1108
@ivar backlog: The maximum number of L{Deferred} gets to allow at
1109
one time. When an attempt is made to get an object which would
1110
exceed this limit, L{QueueUnderflow} is raised synchronously. C{None}
1114
def __init__(self, size=None, backlog=None):
1118
self.backlog = backlog
1123
Add an object to this queue.
1125
@raise QueueOverflow: Too many objects are in this queue.
1128
self.waiting.pop(0).callback(obj)
1129
elif self.size is None or len(self.pending) < self.size:
1130
self.pending.append(obj)
1132
raise QueueOverflow()
1137
Attempt to retrieve and remove an object from the queue.
1139
@return: a L{Deferred} which fires with the next object available in
1142
@raise QueueUnderflow: Too many (more than C{backlog})
1143
L{Deferred}s are already waiting for an object from this queue.
1146
return succeed(self.pending.pop(0))
1147
elif self.backlog is None or len(self.waiting) < self.backlog:
1149
self.waiting.append(d)
1152
raise QueueUnderflow()
1156
class AlreadyTryingToLockError(Exception):
1158
Raised when L{DeferredFilesystemLock.deferUntilLocked} is called twice on a
1159
single L{DeferredFilesystemLock}.
1164
class DeferredFilesystemLock(lockfile.FilesystemLock):
1166
A L{FilesystemLock} that allows for a L{Deferred} to be fired when the lock is
1169
@ivar _scheduler: The object in charge of scheduling retries. In this
1170
implementation this is parameterized for testing.
1172
@ivar _interval: The retry interval for an L{IReactorTime} based scheduler.
1174
@ivar _tryLockCall: A L{DelayedCall} based on C{_interval} that will manage
1175
the next retry for acquiring the lock.
1177
@ivar _timeoutCall: A L{DelayedCall} based on C{deferUntilLocked}'s timeout
1178
argument. This is in charge of timing out our attempt to acquire the
1186
def __init__(self, name, scheduler=None):
1188
@param name: The name of the lock to acquire
1189
@param scheduler: An object which provides L{IReactorTime}
1191
lockfile.FilesystemLock.__init__(self, name)
1193
if scheduler is None:
1194
from twisted.internet import reactor
1197
self._scheduler = scheduler
1200
def deferUntilLocked(self, timeout=None):
1202
Wait until we acquire this lock. This method is not safe for
1205
@type timeout: C{float} or C{int}
1206
@param timeout: the number of seconds after which to time out if the
1207
lock has not been acquired.
1209
@return: a L{Deferred} which will callback when the lock is acquired, or
1210
errback with a L{TimeoutError} after timing out or an
1211
L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already
1212
been called and not successfully locked the file.
1214
if self._tryLockCall is not None:
1216
AlreadyTryingToLockError(
1217
"deferUntilLocked isn't safe for concurrent use."))
1222
self._tryLockCall.cancel()
1223
self._tryLockCall = None
1224
self._timeoutCall = None
1229
d.errback(failure.Failure(
1230
TimeoutError("Timed out aquiring lock: %s after %fs" % (
1236
if self._timeoutCall is not None:
1237
self._timeoutCall.cancel()
1238
self._timeoutCall = None
1240
self._tryLockCall = None
1244
if timeout is not None and self._timeoutCall is None:
1245
self._timeoutCall = self._scheduler.callLater(
1246
timeout, _cancelLock)
1248
self._tryLockCall = self._scheduler.callLater(
1249
self._interval, _tryLock)
1257
__all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
1258
"AlreadyCalledError", "TimeoutError", "gatherResults",
1260
"waitForDeferred", "deferredGenerator", "inlineCallbacks",
1262
"DeferredLock", "DeferredSemaphore", "DeferredQueue",
1263
"DeferredFilesystemLock", "AlreadyTryingToLockError",