1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
This module contains tests for L{twisted.internet.task.Cooperator} and
9
from twisted.internet import reactor, defer, task
10
from twisted.trial import unittest
14
class FakeDelayedCall(object):
16
Fake delayed call which lets us simulate the scheduler.
18
def __init__(self, func):
20
A function to run, later.
23
self.cancelled = False
28
Don't run my function later.
34
class FakeScheduler(object):
36
A fake scheduler for testing against.
40
Create a fake scheduler with a list of work to do.
45
def __call__(self, thunk):
47
Schedule a unit of work to be done later.
49
unit = FakeDelayedCall(thunk)
50
self.work.append(unit)
56
Do all of the work that is currently available to be done.
58
work, self.work = self.work, []
60
if not unit.cancelled:
65
class TestCooperator(unittest.TestCase):
68
def ebIter(self, err):
69
err.trap(task.SchedulerStopped)
73
def cbIter(self, ign):
77
def testStoppedRejectsNewTasks(self):
79
Test that Cooperators refuse new tasks when they have been stopped.
84
d = c.coiterate(iter(()), stuff)
85
d.addCallback(self.cbIter)
86
d.addErrback(self.ebIter)
87
return d.addCallback(lambda result:
88
self.assertEquals(result, self.RESULT))
89
return testwith(None).addCallback(lambda ign: testwith(defer.Deferred()))
92
def testStopRunning(self):
94
Test that a running iterator will not run to completion when the
95
cooperator is stopped.
99
for myiter.value in range(3):
102
d = c.coiterate(myiter())
103
d.addCallback(self.cbIter)
104
d.addErrback(self.ebIter)
106
def doasserts(result):
107
self.assertEquals(result, self.RESULT)
108
self.assertEquals(myiter.value, -1)
109
d.addCallback(doasserts)
113
def testStopOutstanding(self):
115
An iterator run with L{Cooperator.coiterate} paused on a L{Deferred}
116
yielded by that iterator will fire its own L{Deferred} (the one
117
returned by C{coiterate}) when L{Cooperator.stop} is called.
119
testControlD = defer.Deferred()
120
outstandingD = defer.Deferred()
122
reactor.callLater(0, testControlD.callback, None)
125
c = task.Cooperator()
126
d = c.coiterate(myiter())
129
outstandingD.callback('arglebargle')
131
testControlD.addCallback(stopAndGo)
132
d.addCallback(self.cbIter)
133
d.addErrback(self.ebIter)
135
return d.addCallback(
136
lambda result: self.assertEquals(result, self.RESULT))
139
def testUnexpectedError(self):
140
c = task.Cooperator()
146
d = c.coiterate(myiter())
147
return self.assertFailure(d, RuntimeError)
150
def testUnexpectedErrorActuallyLater(self):
153
reactor.callLater(0, D.errback, RuntimeError())
156
c = task.Cooperator()
157
d = c.coiterate(myiter())
158
return self.assertFailure(d, RuntimeError)
161
def testUnexpectedErrorNotActuallyLater(self):
163
yield defer.fail(RuntimeError())
165
c = task.Cooperator()
166
d = c.coiterate(myiter())
167
return self.assertFailure(d, RuntimeError)
170
def testCooperation(self):
177
groupsOfThings = ['abc', (1, 2, 3), 'def', (4, 5, 6)]
179
c = task.Cooperator()
181
for stuff in groupsOfThings:
182
tasks.append(c.coiterate(myiter(stuff)))
184
return defer.DeferredList(tasks).addCallback(
185
lambda ign: self.assertEquals(tuple(L), sum(zip(*groupsOfThings), ())))
188
def testResourceExhaustion(self):
202
c = task.Cooperator(terminationPredicateFactory=_TPF)
203
c.coiterate(myiter()).addErrback(self.ebIter)
204
c._delayedCall.cancel()
205
# testing a private method because only the test case will ever care
206
# about this, so we have to carefully clean up after ourselves.
209
self.failUnless(_TPF.stopped)
210
self.assertEquals(output, range(10))
213
def testCallbackReCoiterate(self):
215
If a callback to a deferred returned by coiterate calls coiterate on
216
the same Cooperator, we should make sure to only do the minimal amount
217
of scheduling work. (This test was added to demonstrate a specific bug
218
that was found while writing the scheduler.)
223
def __init__(self, func):
227
return '<FakeCall %r>' % (self.func,)
230
self.failIf(calls, repr(calls))
231
calls.append(FakeCall(f))
234
c = task.Cooperator(scheduler=sched, terminationPredicateFactory=lambda: lambda: True)
235
d = c.coiterate(iter(()))
238
def anotherTask(ign):
239
c.coiterate(iter(())).addBoth(done.append)
241
d.addCallback(anotherTask)
250
self.fail("Cooperator took too long")
254
class UnhandledException(Exception):
256
An exception that should go unhandled.
261
class AliasTests(unittest.TestCase):
263
Integration test to verify that the global singleton aliases do what
267
def test_cooperate(self):
269
L{twisted.internet.task.cooperate} ought to run the generator that it is
278
theTask = task.cooperate(it)
279
self.assertIn(theTask, task._theCooperator._tasks)
284
class RunStateTests(unittest.TestCase):
286
Tests to verify the behavior of L{CooperativeTask.pause},
287
L{CooperativeTask.resume}, L{CooperativeTask.stop}, exhausting the
288
underlying iterator, and their interactions with each other.
293
Create a cooperator with a fake scheduler and a termination predicate
294
that ensures only one unit of work will take place per tick.
296
self._doDeferNext = False
297
self._doStopNext = False
298
self._doDieNext = False
300
self.scheduler = FakeScheduler()
301
self.cooperator = task.Cooperator(
302
scheduler=self.scheduler,
303
# Always stop after one iteration of work (return a function which
304
# returns a function which always returns True)
305
terminationPredicateFactory=lambda: lambda: True)
306
self.task = self.cooperator.cooperate(self.worker())
307
self.cooperator.start()
312
This is a sample generator which yields Deferreds when we are testing
313
deferral and an ascending integer count otherwise.
318
if self._doDeferNext:
319
self._doDeferNext = False
323
elif self._doStopNext:
325
elif self._doDieNext:
326
raise UnhandledException()
334
Drop references to interesting parts of the fixture to allow Deferred
335
errors to be noticed when things start failing.
343
Defer the next result from my worker iterator.
345
self._doDeferNext = True
350
Make the next result from my worker iterator be completion (raising
353
self._doStopNext = True
358
Make the next result from my worker iterator be raising an
359
L{UnhandledException}.
361
def ignoreUnhandled(failure):
362
failure.trap(UnhandledException)
364
self._doDieNext = True
367
def test_pauseResume(self):
369
Cooperators should stop running their tasks when they're paused, and
370
start again when they're resumed.
372
# first, sanity check
373
self.scheduler.pump()
374
self.assertEquals(self.work, [1])
375
self.scheduler.pump()
376
self.assertEquals(self.work, [1, 2])
380
self.scheduler.pump()
381
self.assertEquals(self.work, [1, 2])
383
# Resuming itself shoult not do any work
384
self.assertEquals(self.work, [1, 2])
385
self.scheduler.pump()
386
# But when the scheduler rolls around again...
387
self.assertEquals(self.work, [1, 2, 3])
390
def test_resumeNotPaused(self):
392
L{CooperativeTask.resume} should raise a L{TaskNotPaused} exception if
393
it was not paused; e.g. if L{CooperativeTask.pause} was not invoked
394
more times than L{CooperativeTask.resume} on that object.
396
self.assertRaises(task.NotPaused, self.task.resume)
399
self.assertRaises(task.NotPaused, self.task.resume)
402
def test_pauseTwice(self):
404
Pauses on tasks should behave like a stack. If a task is paused twice,
405
it needs to be resumed twice.
409
self.scheduler.pump()
410
self.assertEquals(self.work, [])
413
self.scheduler.pump()
414
self.assertEquals(self.work, [])
415
# resume once (it shouldn't)
417
self.scheduler.pump()
418
self.assertEquals(self.work, [])
419
# resume twice (now it should go)
421
self.scheduler.pump()
422
self.assertEquals(self.work, [1])
425
def test_pauseWhileDeferred(self):
427
C{pause()}ing a task while it is waiting on an outstanding
428
L{defer.Deferred} should put the task into a state where the
429
outstanding L{defer.Deferred} must be called back I{and} the task is
430
C{resume}d before it will continue processing.
433
self.scheduler.pump()
434
self.assertEquals(len(self.work), 1)
435
self.failUnless(isinstance(self.work[0], defer.Deferred))
436
self.scheduler.pump()
437
self.assertEquals(len(self.work), 1)
439
self.scheduler.pump()
440
self.assertEquals(len(self.work), 1)
442
self.scheduler.pump()
443
self.assertEquals(len(self.work), 1)
444
self.work[0].callback("STUFF!")
445
self.scheduler.pump()
446
self.assertEquals(len(self.work), 2)
447
self.assertEquals(self.work[1], 2)
450
def test_whenDone(self):
452
L{CooperativeTask.whenDone} returns a Deferred which fires when the
453
Cooperator's iterator is exhausted. It returns a new Deferred each
454
time it is called; callbacks added to other invocations will not modify
455
the value that subsequent invocations will fire with.
458
deferred1 = self.task.whenDone()
459
deferred2 = self.task.whenDone()
465
def callbackOne(result):
466
results1.append(result)
469
def callbackTwo(result):
470
results2.append(result)
473
deferred1.addCallback(callbackOne)
474
deferred2.addCallback(callbackTwo)
476
deferred1.addCallback(final1.append)
477
deferred2.addCallback(final2.append)
479
# exhaust the task iterator
482
self.scheduler.pump()
484
self.assertEquals(len(results1), 1)
485
self.assertEquals(len(results2), 1)
487
self.assertIdentical(results1[0], self.task._iterator)
488
self.assertIdentical(results2[0], self.task._iterator)
490
self.assertEquals(final1, [1])
491
self.assertEquals(final2, [2])
494
def test_whenDoneError(self):
496
L{CooperativeTask.whenDone} returns a L{defer.Deferred} that will fail
497
when the iterable's C{next} method raises an exception, with that
500
deferred1 = self.task.whenDone()
502
deferred1.addErrback(results.append)
504
self.scheduler.pump()
505
self.assertEquals(len(results), 1)
506
self.assertEquals(results[0].check(UnhandledException), UnhandledException)
509
def test_whenDoneStop(self):
511
L{CooperativeTask.whenDone} returns a L{defer.Deferred} that fails with
512
L{TaskStopped} when the C{stop} method is called on that
515
deferred1 = self.task.whenDone()
517
deferred1.addErrback(errors.append)
519
self.assertEquals(len(errors), 1)
520
self.assertEquals(errors[0].check(task.TaskStopped), task.TaskStopped)
523
def test_whenDoneAlreadyDone(self):
525
L{CooperativeTask.whenDone} will return a L{defer.Deferred} that will
526
succeed immediately if its iterator has already completed.
529
self.scheduler.pump()
531
self.task.whenDone().addCallback(results.append)
532
self.assertEquals(results, [self.task._iterator])
535
def test_stopStops(self):
537
C{stop()}ping a task should cause it to be removed from the run just as
538
C{pause()}ing, with the distinction that C{resume()} will raise a
539
L{TaskStopped} exception.
542
self.scheduler.pump()
543
self.assertEquals(len(self.work), 0)
544
self.assertRaises(task.TaskStopped, self.task.stop)
545
self.assertRaises(task.TaskStopped, self.task.pause)
546
# Sanity check - it's still not scheduled, is it?
547
self.scheduler.pump()
548
self.assertEquals(self.work, [])
551
def test_pauseStopResume(self):
553
C{resume()}ing a paused, stopped task should be a no-op; it should not
554
raise an exception, because it's paused, but neither should it actually
555
do more work from the task.
560
self.scheduler.pump()
561
self.assertEquals(self.work, [])
564
def test_stopDeferred(self):
566
As a corrolary of the interaction of C{pause()} and C{unpause()},
567
C{stop()}ping a task which is waiting on a L{Deferred} should cause the
568
task to gracefully shut down, meaning that it should not be unpaused
569
when the deferred fires.
572
self.scheduler.pump()
574
self.assertEquals(self.task._pauseCount, 1)
576
d.addBoth(results.append)
577
self.scheduler.pump()
579
self.scheduler.pump()
581
self.scheduler.pump()
582
# Let's make sure that Deferred doesn't come out fried with an
583
# unhandled error that will be logged. The value is None, rather than
584
# our test value, 7, because this Deferred is returned to and consumed
585
# by the cooperator code. Its callback therefore has no contract.
586
self.assertEquals(results, [None])
587
# But more importantly, no further work should have happened.
588
self.assertEquals(self.work, [])
591
def test_stopExhausted(self):
593
C{stop()}ping a L{CooperativeTask} whose iterator has been exhausted
594
should raise L{TaskDone}.
597
self.scheduler.pump()
598
self.assertRaises(task.TaskDone, self.task.stop)
601
def test_stopErrored(self):
603
C{stop()}ping a L{CooperativeTask} whose iterator has encountered an
604
error should raise L{TaskFailed}.
607
self.scheduler.pump()
608
self.assertRaises(task.TaskFailed, self.task.stop)
611
def test_stopCooperatorReentrancy(self):
613
If a callback of a L{Deferred} from L{CooperativeTask.whenDone} calls
614
C{Cooperator.stop} on its L{CooperativeTask._cooperator}, the
615
L{Cooperator} will stop, but the L{CooperativeTask} whose callback is
616
calling C{stop} should already be considered 'stopped' by the time the
617
callback is running, and therefore removed from the
622
callbackPhases.append(result)
623
self.cooperator.stop()
624
# "done" here is a sanity check to make sure that we get all the
625
# way through the callback; i.e. stop() shouldn't be raising an
626
# exception due to the stopped-ness of our main task.
627
callbackPhases.append("done")
628
self.task.whenDone().addCallback(stopit)
630
self.scheduler.pump()
631
self.assertEquals(callbackPhases, [self.task._iterator, "done"])