2
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3
# See LICENSE for details.
7
Test cases for defer module.
12
from twisted.trial import unittest, util
13
from twisted.internet import reactor, defer
14
from twisted.python import failure, log
16
from twisted.internet.task import Clock
18
class GenericError(Exception):
22
_setTimeoutSuppression = util.suppress(
23
message="Deferred.setTimeout is deprecated. Look for timeout "
24
"support specific to the API you are using instead.",
25
category=DeprecationWarning)
28
class DeferredTestCase(unittest.TestCase):
31
self.callback_results = None
32
self.errback_results = None
33
self.callback2_results = None
35
def _callback(self, *args, **kw):
36
self.callback_results = args, kw
39
def _callback2(self, *args, **kw):
    """
    Remember the arguments this callback was invoked with.

    The positional arguments (as a tuple) and the keyword arguments (as a
    dict) are stored together on C{self.callback2_results}.
    """
    received = (args, kw)
    self.callback2_results = received
42
def _errback(self, *args, **kw):
    """
    Remember the arguments this errback was invoked with.

    The positional arguments (as a tuple) and the keyword arguments (as a
    dict) are stored together on C{self.errback_results}.
    """
    received = (args, kw)
    self.errback_results = received
45
def testCallbackWithoutArgs(self):
    """
    A callback added with no extra arguments is called with just the
    result the Deferred was fired with, and the errback record is left
    untouched.
    """
    d = defer.Deferred()
    d.addCallback(self._callback)
    d.callback("hello")

    expectedCall = (('hello',), {})
    self.failUnlessEqual(self.errback_results, None)
    self.failUnlessEqual(self.callback_results, expectedCall)
52
def testCallbackWithArgs(self):
    """
    Extra positional arguments given to C{addCallback} are passed to the
    callback after the result, and the errback record is left untouched.
    """
    d = defer.Deferred()
    d.addCallback(self._callback, "world")
    d.callback("hello")

    expectedCall = (('hello', 'world'), {})
    self.failUnlessEqual(self.errback_results, None)
    self.failUnlessEqual(self.callback_results, expectedCall)
59
def testCallbackWithKwArgs(self):
    """
    Keyword arguments given to C{addCallback} are passed through to the
    callback alongside the result, and the errback record is left
    untouched.
    """
    d = defer.Deferred()
    d.addCallback(self._callback, world="world")
    d.callback("hello")

    expectedCall = (('hello',), {'world': 'world'})
    self.failUnlessEqual(self.errback_results, None)
    self.failUnlessEqual(self.callback_results, expectedCall)
67
def testTwoCallbacks(self):
68
deferred = defer.Deferred()
69
deferred.addCallback(self._callback)
70
deferred.addCallback(self._callback2)
71
deferred.callback("hello")
72
self.failUnlessEqual(self.errback_results, None)
73
self.failUnlessEqual(self.callback_results,
75
self.failUnlessEqual(self.callback2_results,
78
def testDeferredList(self):
79
defr1 = defer.Deferred()
80
defr2 = defer.Deferred()
81
defr3 = defer.Deferred()
82
dl = defer.DeferredList([defr1, defr2, defr3])
84
def cb(resultList, result=result):
85
result.extend(resultList)
88
dl.addCallbacks(cb, cb)
90
defr2.addErrback(catch)
91
# "catch" is added to eat the GenericError that will be passed on by
92
# the DeferredList's callback on defr2. If left unhandled, the
93
# Failure object would cause a log.err() warning about "Unhandled
94
# error in Deferred". Twisted's pyunit watches for log.err calls and
95
# treats them as failures. So "catch" must eat the error to prevent
96
# it from flunking the test.
97
defr2.errback(GenericError("2"))
99
self.failUnlessEqual([result[0],
100
#result[1][1] is now a Failure instead of an Exception
101
(result[1][0], str(result[1][1].value)),
104
[(defer.SUCCESS, "1"),
105
(defer.FAILURE, "2"),
106
(defer.SUCCESS, "3")])
108
def testEmptyDeferredList(self):
110
def cb(resultList, result=result):
111
result.append(resultList)
113
dl = defer.DeferredList([])
115
self.failUnlessEqual(result, [[]])
118
dl = defer.DeferredList([], fireOnOneCallback=1)
120
self.failUnlessEqual(result, [])
122
def testDeferredListFireOnOneError(self):
123
defr1 = defer.Deferred()
124
defr2 = defer.Deferred()
125
defr3 = defer.Deferred()
126
dl = defer.DeferredList([defr1, defr2, defr3], fireOnOneErrback=1)
128
dl.addErrback(result.append)
130
# consume errors after they pass through the DeferredList (to avoid
131
# 'Unhandled error in Deferred').
134
defr2.addErrback(catch)
136
# fire one Deferred's callback, no result yet
138
self.failUnlessEqual(result, [])
140
# fire one Deferred's errback -- now we have a result
141
defr2.errback(GenericError("from def2"))
142
self.failUnlessEqual(len(result), 1)
144
# extract the result from the list
147
# the type of the failure is a FirstError
148
self.failUnless(issubclass(failure.type, defer.FirstError),
149
'issubclass(failure.type, defer.FirstError) failed: '
150
'failure.type is %r' % (failure.type,)
153
firstError = failure.value
155
# check that the GenericError("from def2") from the deferred at index 1
156
# (defr2) is intact inside failure.value
157
self.failUnlessEqual(firstError.subFailure.type, GenericError)
158
self.failUnlessEqual(firstError.subFailure.value.args, ("from def2",))
159
self.failUnlessEqual(firstError.index, 1)
162
def testDeferredListDontConsumeErrors(self):
163
d1 = defer.Deferred()
164
dl = defer.DeferredList([d1])
167
d1.addErrback(errorTrap.append)
170
dl.addCallback(result.append)
172
d1.errback(GenericError('Bang'))
173
self.failUnlessEqual('Bang', errorTrap[0].value.args[0])
174
self.failUnlessEqual(1, len(result))
175
self.failUnlessEqual('Bang', result[0][0][1].value.args[0])
177
def testDeferredListConsumeErrors(self):
178
d1 = defer.Deferred()
179
dl = defer.DeferredList([d1], consumeErrors=True)
182
d1.addErrback(errorTrap.append)
185
dl.addCallback(result.append)
187
d1.errback(GenericError('Bang'))
188
self.failUnlessEqual([], errorTrap)
189
self.failUnlessEqual(1, len(result))
190
self.failUnlessEqual('Bang', result[0][0][1].value.args[0])
192
def testDeferredListFireOnOneErrorWithAlreadyFiredDeferreds(self):
193
# Create some deferreds, and errback one
194
d1 = defer.Deferred()
195
d2 = defer.Deferred()
196
d1.errback(GenericError('Bang'))
198
# *Then* build the DeferredList, with fireOnOneErrback=True
199
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
201
dl.addErrback(result.append)
202
self.failUnlessEqual(1, len(result))
204
d1.addErrback(lambda e: None) # Swallow error
206
def testDeferredListWithAlreadyFiredDeferreds(self):
207
# Create some deferreds, and err one, call the other
208
d1 = defer.Deferred()
209
d2 = defer.Deferred()
210
d1.errback(GenericError('Bang'))
213
# *Then* build the DeferredList
214
dl = defer.DeferredList([d1, d2])
217
dl.addCallback(result.append)
219
self.failUnlessEqual(1, len(result))
221
d1.addErrback(lambda e: None) # Swallow error
223
def testTimeOut(self):
225
Test that a Deferred which has setTimeout called on it and never has
226
C{callback} or C{errback} called on it eventually fails with a
227
L{defer.TimeoutError}.
232
self.assertFailure(d, defer.TimeoutError)
233
d.addCallback(L.append)
234
self.failIf(L, "Deferred failed too soon.")
236
testTimeOut.suppress = [_setTimeoutSuppression]
239
def testImmediateSuccess(self):
241
d = defer.succeed("success")
242
d.addCallback(l.append)
243
self.assertEquals(l, ["success"])
246
def test_immediateSuccessBeforeTimeout(self):
248
Test that a synchronously successful Deferred is not affected by a
252
d = defer.succeed("success")
254
d.addCallback(l.append)
255
self.assertEquals(l, ["success"])
256
test_immediateSuccessBeforeTimeout.suppress = [_setTimeoutSuppression]
259
def testImmediateFailure(self):
261
d = defer.fail(GenericError("fail"))
262
d.addErrback(l.append)
263
self.assertEquals(str(l[0].value), "fail")
265
def testPausedFailure(self):
267
d = defer.fail(GenericError("fail"))
269
d.addErrback(l.append)
270
self.assertEquals(l, [])
272
self.assertEquals(str(l[0].value), "fail")
274
def testCallbackErrors(self):
276
d = defer.Deferred().addCallback(lambda _: 1/0).addErrback(l.append)
278
self.assert_(isinstance(l[0].value, ZeroDivisionError))
280
d = defer.Deferred().addCallback(
281
lambda _: failure.Failure(ZeroDivisionError())).addErrback(l.append)
283
self.assert_(isinstance(l[0].value, ZeroDivisionError))
285
def testUnpauseBeforeCallback(self):
288
d.addCallback(self._callback)
291
def testReturnDeferred(self):
293
d2 = defer.Deferred()
295
d.addCallback(lambda r, d2=d2: d2)
296
d.addCallback(self._callback)
298
assert self.callback_results is None, "Should not have been called yet."
300
assert self.callback_results is None, "Still should not have been called yet."
302
assert self.callback_results[0][0] == 2, "Result should have been from second deferred:%s"% (self.callback_results,)
304
def testGatherResults(self):
305
# test successful list of deferreds
307
defer.gatherResults([defer.succeed(1), defer.succeed(2)]).addCallback(l.append)
308
self.assertEquals(l, [[1, 2]])
309
# test failing list of deferreds
311
dl = [defer.succeed(1), defer.fail(ValueError)]
312
defer.gatherResults(dl).addErrback(l.append)
313
self.assertEquals(len(l), 1)
314
self.assert_(isinstance(l[0], failure.Failure))
316
dl[1].addErrback(lambda e: 1)
319
def test_maybeDeferredSync(self):
321
L{defer.maybeDeferred} should retrieve the result of a synchronous
322
function and pass it to its resulting L{defer.Deferred}.
325
d = defer.maybeDeferred((lambda x: x + 5), 10)
326
d.addCallbacks(S.append, E.append)
327
self.assertEquals(E, [])
328
self.assertEquals(S, [15])
332
def test_maybeDeferredSyncError(self):
334
L{defer.maybeDeferred} should catch exception raised by a synchronous
335
function and errback its resulting L{defer.Deferred} with it.
342
d = defer.maybeDeferred((lambda x: x + 5), '10')
343
d.addCallbacks(S.append, E.append)
344
self.assertEquals(S, [])
345
self.assertEquals(len(E), 1)
346
self.assertEquals(str(E[0].value), expected)
350
def test_maybeDeferredAsync(self):
352
L{defer.maybeDeferred} should let L{defer.Deferred} instance pass by
353
so that original result is the same.
356
d2 = defer.maybeDeferred(lambda: d)
357
d.callback('Success')
358
return d2.addCallback(self.assertEquals, 'Success')
361
def test_maybeDeferredAsyncError(self):
363
L{defer.maybeDeferred} should let L{defer.Deferred} instance pass by
364
so that L{failure.Failure} returned by the original instance is the
368
d2 = defer.maybeDeferred(lambda: d)
369
d.errback(failure.Failure(RuntimeError()))
370
return self.assertFailure(d2, RuntimeError)
373
def test_reentrantRunCallbacks(self):
375
A callback added to a L{Deferred} by a callback on that L{Deferred}
376
should be added to the end of the callback chain.
378
deferred = defer.Deferred()
380
def callback3(result):
382
def callback2(result):
384
def callback1(result):
386
deferred.addCallback(callback3)
387
deferred.addCallback(callback1)
388
deferred.addCallback(callback2)
389
deferred.callback(None)
390
self.assertEqual(called, [1, 2, 3])
393
def test_nonReentrantCallbacks(self):
395
A callback added to a L{Deferred} by a callback on that L{Deferred}
396
should not be executed until the running callback returns.
398
deferred = defer.Deferred()
400
def callback2(result):
402
def callback1(result):
404
deferred.addCallback(callback2)
405
self.assertEquals(called, [1])
406
deferred.addCallback(callback1)
407
deferred.callback(None)
408
self.assertEqual(called, [1, 2])
411
def test_reentrantRunCallbacksWithFailure(self):
413
After an exception is raised by a callback which was added to a
414
L{Deferred} by a callback on that L{Deferred}, the L{Deferred} should
415
call the first errback with a L{Failure} wrapping that exception.
417
exceptionMessage = "callback raised exception"
418
deferred = defer.Deferred()
419
def callback2(result):
420
raise Exception(exceptionMessage)
421
def callback1(result):
422
deferred.addCallback(callback2)
423
deferred.addCallback(callback1)
424
deferred.callback(None)
425
self.assertFailure(deferred, Exception)
426
def cbFailed(exception):
427
self.assertEqual(exception.args, (exceptionMessage,))
428
deferred.addCallback(cbFailed)
433
class FirstErrorTests(unittest.TestCase):
435
Tests for L{FirstError}.
439
The repr of a L{FirstError} instance includes the repr of the value of
440
the sub-failure and the index which corresponds to the L{FirstError}.
442
exc = ValueError("some text")
446
f = failure.Failure()
448
error = defer.FirstError(f, 3)
451
"FirstError[#3, %s]" % (repr(exc),))
456
The str of a L{FirstError} instance includes the str of the
457
sub-failure and the index which corresponds to the L{FirstError}.
459
exc = ValueError("some text")
463
f = failure.Failure()
465
error = defer.FirstError(f, 5)
468
"FirstError[#5, %s]" % (str(f),))
471
def test_comparison(self):
473
L{FirstError} instances compare equal to each other if and only if
474
their failure and index compare equal. L{FirstError} instances do not
475
compare equal to instances of other types.
480
firstFailure = failure.Failure()
482
one = defer.FirstError(firstFailure, 13)
483
anotherOne = defer.FirstError(firstFailure, 13)
486
raise ValueError("bar")
488
secondFailure = failure.Failure()
490
another = defer.FirstError(secondFailure, 9)
492
self.assertTrue(one == anotherOne)
493
self.assertFalse(one == another)
494
self.assertTrue(one != another)
495
self.assertFalse(one != anotherOne)
497
self.assertFalse(one == 10)
501
class AlreadyCalledTestCase(unittest.TestCase):
503
self._deferredWasDebugging = defer.getDebugging()
504
defer.setDebugging(True)
507
defer.setDebugging(self._deferredWasDebugging)
509
def _callback(self, *args, **kw):
511
def _errback(self, *args, **kw):
514
def _call_1(self, d):
516
def _call_2(self, d):
519
d.errback(failure.Failure(RuntimeError()))
521
d.errback(failure.Failure(RuntimeError()))
523
def testAlreadyCalled_CC(self):
525
d.addCallbacks(self._callback, self._errback)
527
self.failUnlessRaises(defer.AlreadyCalledError, self._call_2, d)
529
def testAlreadyCalled_CE(self):
531
d.addCallbacks(self._callback, self._errback)
533
self.failUnlessRaises(defer.AlreadyCalledError, self._err_2, d)
535
def testAlreadyCalled_EE(self):
537
d.addCallbacks(self._callback, self._errback)
539
self.failUnlessRaises(defer.AlreadyCalledError, self._err_2, d)
541
def testAlreadyCalled_EC(self):
543
d.addCallbacks(self._callback, self._errback)
545
self.failUnlessRaises(defer.AlreadyCalledError, self._call_2, d)
548
def _count(self, linetype, func, lines, expected):
551
if (line.startswith(' %s:' % linetype) and
552
line.endswith(' %s' % func)):
554
self.failUnless(count == expected)
556
def _check(self, e, caller, invoker1, invoker2):
    """
    Make sure the debugging information carried by C{e} is vaguely correct.

    @param e: the caught exception; C{e.args[0]} is split on newlines and
        each line is handed to C{self._count} for matching.
    @param caller: name of the function expected to appear exactly once
        among the creator ('C') lines.
    @param invoker1: name of the function expected to appear exactly once
        among the invoker ('I') lines (the first invoker).
    @param invoker2: name of the function expected not to appear among the
        invoker ('I') lines (the second invoker).
    """
    traceLines = e.args[0].split("\n")

    # The creator section should name the creating test method once, and
    # none of the helpers that merely fire the Deferred.
    self._count('C', caller, traceLines, 1)
    for firingHelper in ('_call_1', '_call_2', '_err_1', '_err_2'):
        self._count('C', firingHelper, traceLines, 0)

    # The invoker section should name the first invoker but not the second.
    for invokerName, occurrences in ((invoker1, 1), (invoker2, 0)):
        self._count('I', invokerName, traceLines, occurrences)
570
def testAlreadyCalledDebug_CC(self):
572
d.addCallbacks(self._callback, self._errback)
576
except defer.AlreadyCalledError, e:
577
self._check(e, "testAlreadyCalledDebug_CC", "_call_1", "_call_2")
579
self.fail("second callback failed to raise AlreadyCalledError")
581
def testAlreadyCalledDebug_CE(self):
583
d.addCallbacks(self._callback, self._errback)
587
except defer.AlreadyCalledError, e:
588
self._check(e, "testAlreadyCalledDebug_CE", "_call_1", "_err_2")
590
self.fail("second errback failed to raise AlreadyCalledError")
592
def testAlreadyCalledDebug_EC(self):
594
d.addCallbacks(self._callback, self._errback)
598
except defer.AlreadyCalledError, e:
599
self._check(e, "testAlreadyCalledDebug_EC", "_err_1", "_call_2")
601
self.fail("second callback failed to raise AlreadyCalledError")
603
def testAlreadyCalledDebug_EE(self):
605
d.addCallbacks(self._callback, self._errback)
609
except defer.AlreadyCalledError, e:
610
self._check(e, "testAlreadyCalledDebug_EE", "_err_1", "_err_2")
612
self.fail("second errback failed to raise AlreadyCalledError")
614
def testNoDebugging(self):
615
defer.setDebugging(False)
617
d.addCallbacks(self._callback, self._errback)
621
except defer.AlreadyCalledError, e:
624
self.fail("second callback failed to raise AlreadyCalledError")
627
def testSwitchDebugging(self):
628
# Make sure Deferreds can deal with debug state flipping
629
# around randomly. This is covering a particular fixed bug.
630
defer.setDebugging(False)
632
d.addBoth(lambda ign: None)
633
defer.setDebugging(True)
636
defer.setDebugging(False)
639
defer.setDebugging(True)
640
d.addBoth(lambda ign: None)
644
class LogTestCase(unittest.TestCase):
646
Test logging of unhandled errors.
651
Add a custom observer to observe logging.
654
log.addObserver(self.c.append)
660
log.removeObserver(self.c.append)
664
Check the output of the log observer to see if the error is present.
666
c2 = [e for e in self.c if e["isError"]]
667
self.assertEquals(len(c2), 2)
668
c2[1]["failure"].trap(ZeroDivisionError)
669
self.flushLoggedErrors(ZeroDivisionError)
671
def test_errorLog(self):
673
Verify that when a Deferred with no references to it is fired, and its
674
final result (the one not handled by any callback) is an exception,
675
that exception will be logged immediately.
677
defer.Deferred().addCallback(lambda x: 1/0).callback(1)
681
def test_errorLogWithInnerFrameRef(self):
683
Same as L{test_errorLog}, but with an inner frame.
685
def _subErrorLogWithInnerFrameRef():
687
d.addCallback(lambda x: 1/0)
690
_subErrorLogWithInnerFrameRef()
694
def test_errorLogWithInnerFrameCycle(self):
696
Same as L{test_errorLogWithInnerFrameRef}, plus create a cycle.
698
def _subErrorLogWithInnerFrameCycle():
700
d.addCallback(lambda x, d=d: 1/0)
704
_subErrorLogWithInnerFrameCycle()
709
class DeferredTestCaseII(unittest.TestCase):
713
def testDeferredListEmpty(self):
    """
    Build a L{defer.DeferredList} from an empty list and attach
    C{self.cb_empty} to it as a callback.
    """
    noDeferreds = []
    emptyList = defer.DeferredList(noDeferreds)
    emptyList.addCallback(self.cb_empty)
718
def cb_empty(self, res):
720
self.failUnlessEqual([], res)
723
self.failUnless(self.callbackRan, "Callback was never run.")
725
class OtherPrimitives(unittest.TestCase):
726
def _incr(self, result):
733
lock = defer.DeferredLock()
734
lock.acquire().addCallback(self._incr)
735
self.failUnless(lock.locked)
736
self.assertEquals(self.counter, 1)
738
lock.acquire().addCallback(self._incr)
739
self.failUnless(lock.locked)
740
self.assertEquals(self.counter, 1)
743
self.failUnless(lock.locked)
744
self.assertEquals(self.counter, 2)
747
self.failIf(lock.locked)
748
self.assertEquals(self.counter, 2)
750
self.assertRaises(TypeError, lock.run)
752
firstUnique = object()
753
secondUnique = object()
755
controlDeferred = defer.Deferred()
758
return controlDeferred
760
resultDeferred = lock.run(helper, self=self, b=firstUnique)
761
self.failUnless(lock.locked)
762
self.assertEquals(self.b, firstUnique)
764
resultDeferred.addCallback(lambda x: setattr(self, 'result', x))
766
lock.acquire().addCallback(self._incr)
767
self.failUnless(lock.locked)
768
self.assertEquals(self.counter, 2)
770
controlDeferred.callback(secondUnique)
771
self.assertEquals(self.result, secondUnique)
772
self.failUnless(lock.locked)
773
self.assertEquals(self.counter, 3)
776
self.failIf(lock.locked)
778
def testSemaphore(self):
780
sem = defer.DeferredSemaphore(N)
782
controlDeferred = defer.Deferred()
783
def helper(self, arg):
785
return controlDeferred
788
uniqueObject = object()
789
resultDeferred = sem.run(helper, self=self, arg=uniqueObject)
790
resultDeferred.addCallback(results.append)
791
resultDeferred.addCallback(self._incr)
792
self.assertEquals(results, [])
793
self.assertEquals(self.arg, uniqueObject)
794
controlDeferred.callback(None)
795
self.assertEquals(results.pop(), None)
796
self.assertEquals(self.counter, 1)
799
for i in range(1, 1 + N):
800
sem.acquire().addCallback(self._incr)
801
self.assertEquals(self.counter, i)
803
sem.acquire().addCallback(self._incr)
804
self.assertEquals(self.counter, N)
807
self.assertEquals(self.counter, N + 1)
809
for i in range(1, 1 + N):
811
self.assertEquals(self.counter, N + 1)
815
queue = defer.DeferredQueue(N, M)
820
queue.get().addCallback(gotten.append)
821
self.assertRaises(defer.QueueUnderflow, queue.get)
825
self.assertEquals(gotten, range(i + 1))
828
self.assertEquals(gotten, range(M))
829
self.assertRaises(defer.QueueOverflow, queue.put, None)
833
queue.get().addCallback(gotten.append)
834
self.assertEquals(gotten, range(N, N + i + 1))
836
queue = defer.DeferredQueue()
839
queue.get().addCallback(gotten.append)
842
self.assertEquals(gotten, range(N))
844
queue = defer.DeferredQueue(size=0)
845
self.assertRaises(defer.QueueOverflow, queue.put, None)
847
queue = defer.DeferredQueue(backlog=0)
848
self.assertRaises(defer.QueueUnderflow, queue.get)
852
class DeferredFilesystemLockTestCase(unittest.TestCase):
854
Test the behavior of L{DeferredFilesystemLock}
858
self.lock = defer.DeferredFilesystemLock(self.mktemp(),
859
scheduler=self.clock)
862
def test_waitUntilLockedWithNoLock(self):
864
Test that the lock can be acquired when no lock is held
866
d = self.lock.deferUntilLocked(timeout=1)
871
def test_waitUntilLockedWithTimeoutLocked(self):
873
Test that the lock can not be acquired when the lock is held
874
for longer than the timeout.
876
self.failUnless(self.lock.lock())
878
d = self.lock.deferUntilLocked(timeout=5.5)
879
self.assertFailure(d, defer.TimeoutError)
881
self.clock.pump([1]*10)
886
def test_waitUntilLockedWithTimeoutUnlocked(self):
888
Test that a lock can be acquired while a lock is held
889
but the lock is unlocked before our timeout.
892
f.trap(defer.TimeoutError)
893
self.fail("Should not have timed out")
895
self.failUnless(self.lock.lock())
897
self.clock.callLater(1, self.lock.unlock)
898
d = self.lock.deferUntilLocked(timeout=10)
899
d.addErrback(onTimeout)
901
self.clock.pump([1]*10)
906
def test_defaultScheduler(self):
908
Test that the default scheduler is set up properly.
910
lock = defer.DeferredFilesystemLock(self.mktemp())
912
self.assertEquals(lock._scheduler, reactor)
915
def test_concurrentUsage(self):
917
Test that an appropriate exception is raised when attempting
918
to use deferUntilLocked concurrently.
921
self.clock.callLater(1, self.lock.unlock)
923
d = self.lock.deferUntilLocked()
924
d2 = self.lock.deferUntilLocked()
926
self.assertFailure(d2, defer.AlreadyTryingToLockError)
928
self.clock.advance(1)
933
def test_multipleUsages(self):
935
Test that a DeferredFilesystemLock can be used multiple times
937
def lockAquired(ign):
939
d = self.lock.deferUntilLocked()
943
self.clock.callLater(1, self.lock.unlock)
945
d = self.lock.deferUntilLocked()
946
d.addCallback(lockAquired)
948
self.clock.advance(1)