~vishvananda/nova/network-refactor

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_cooperator.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
This module contains tests for L{twisted.internet.task.Cooperator} and
 
6
related functionality.
 
7
"""
 
8
 
 
9
from twisted.internet import reactor, defer, task
 
10
from twisted.trial import unittest
 
11
 
 
12
 
 
13
 
 
14
class FakeDelayedCall(object):
 
15
    """
 
16
    Fake delayed call which lets us simulate the scheduler.
 
17
    """
 
18
    def __init__(self, func):
 
19
        """
 
20
        A function to run, later.
 
21
        """
 
22
        self.func = func
 
23
        self.cancelled = False
 
24
 
 
25
 
 
26
    def cancel(self):
 
27
        """
 
28
        Don't run my function later.
 
29
        """
 
30
        self.cancelled = True
 
31
 
 
32
 
 
33
 
 
34
class FakeScheduler(object):
 
35
    """
 
36
    A fake scheduler for testing against.
 
37
    """
 
38
    def __init__(self):
 
39
        """
 
40
        Create a fake scheduler with a list of work to do.
 
41
        """
 
42
        self.work = []
 
43
 
 
44
 
 
45
    def __call__(self, thunk):
 
46
        """
 
47
        Schedule a unit of work to be done later.
 
48
        """
 
49
        unit = FakeDelayedCall(thunk)
 
50
        self.work.append(unit)
 
51
        return unit
 
52
 
 
53
 
 
54
    def pump(self):
 
55
        """
 
56
        Do all of the work that is currently available to be done.
 
57
        """
 
58
        work, self.work = self.work, []
 
59
        for unit in work:
 
60
            if not unit.cancelled:
 
61
                unit.func()
 
62
 
 
63
 
 
64
 
 
65
class TestCooperator(unittest.TestCase):
 
66
    RESULT = 'done'
 
67
 
 
68
    def ebIter(self, err):
 
69
        err.trap(task.SchedulerStopped)
 
70
        return self.RESULT
 
71
 
 
72
 
 
73
    def cbIter(self, ign):
 
74
        self.fail()
 
75
 
 
76
 
 
77
    def testStoppedRejectsNewTasks(self):
 
78
        """
 
79
        Test that Cooperators refuse new tasks when they have been stopped.
 
80
        """
 
81
        def testwith(stuff):
 
82
            c = task.Cooperator()
 
83
            c.stop()
 
84
            d = c.coiterate(iter(()), stuff)
 
85
            d.addCallback(self.cbIter)
 
86
            d.addErrback(self.ebIter)
 
87
            return d.addCallback(lambda result:
 
88
                                 self.assertEquals(result, self.RESULT))
 
89
        return testwith(None).addCallback(lambda ign: testwith(defer.Deferred()))
 
90
 
 
91
 
 
92
    def testStopRunning(self):
 
93
        """
 
94
        Test that a running iterator will not run to completion when the
 
95
        cooperator is stopped.
 
96
        """
 
97
        c = task.Cooperator()
 
98
        def myiter():
 
99
            for myiter.value in range(3):
 
100
                yield myiter.value
 
101
        myiter.value = -1
 
102
        d = c.coiterate(myiter())
 
103
        d.addCallback(self.cbIter)
 
104
        d.addErrback(self.ebIter)
 
105
        c.stop()
 
106
        def doasserts(result):
 
107
            self.assertEquals(result, self.RESULT)
 
108
            self.assertEquals(myiter.value, -1)
 
109
        d.addCallback(doasserts)
 
110
        return d
 
111
 
 
112
 
 
113
    def testStopOutstanding(self):
 
114
        """
 
115
        An iterator run with L{Cooperator.coiterate} paused on a L{Deferred}
 
116
        yielded by that iterator will fire its own L{Deferred} (the one
 
117
        returned by C{coiterate}) when L{Cooperator.stop} is called.
 
118
        """
 
119
        testControlD = defer.Deferred()
 
120
        outstandingD = defer.Deferred()
 
121
        def myiter():
 
122
            reactor.callLater(0, testControlD.callback, None)
 
123
            yield outstandingD
 
124
            self.fail()
 
125
        c = task.Cooperator()
 
126
        d = c.coiterate(myiter())
 
127
        def stopAndGo(ign):
 
128
            c.stop()
 
129
            outstandingD.callback('arglebargle')
 
130
 
 
131
        testControlD.addCallback(stopAndGo)
 
132
        d.addCallback(self.cbIter)
 
133
        d.addErrback(self.ebIter)
 
134
 
 
135
        return d.addCallback(
 
136
            lambda result: self.assertEquals(result, self.RESULT))
 
137
 
 
138
 
 
139
    def testUnexpectedError(self):
 
140
        c = task.Cooperator()
 
141
        def myiter():
 
142
            if 0:
 
143
                yield None
 
144
            else:
 
145
                raise RuntimeError()
 
146
        d = c.coiterate(myiter())
 
147
        return self.assertFailure(d, RuntimeError)
 
148
 
 
149
 
 
150
    def testUnexpectedErrorActuallyLater(self):
 
151
        def myiter():
 
152
            D = defer.Deferred()
 
153
            reactor.callLater(0, D.errback, RuntimeError())
 
154
            yield D
 
155
 
 
156
        c = task.Cooperator()
 
157
        d = c.coiterate(myiter())
 
158
        return self.assertFailure(d, RuntimeError)
 
159
 
 
160
 
 
161
    def testUnexpectedErrorNotActuallyLater(self):
 
162
        def myiter():
 
163
            yield defer.fail(RuntimeError())
 
164
 
 
165
        c = task.Cooperator()
 
166
        d = c.coiterate(myiter())
 
167
        return self.assertFailure(d, RuntimeError)
 
168
 
 
169
 
 
170
    def testCooperation(self):
 
171
        L = []
 
172
        def myiter(things):
 
173
            for th in things:
 
174
                L.append(th)
 
175
                yield None
 
176
 
 
177
        groupsOfThings = ['abc', (1, 2, 3), 'def', (4, 5, 6)]
 
178
 
 
179
        c = task.Cooperator()
 
180
        tasks = []
 
181
        for stuff in groupsOfThings:
 
182
            tasks.append(c.coiterate(myiter(stuff)))
 
183
 
 
184
        return defer.DeferredList(tasks).addCallback(
 
185
            lambda ign: self.assertEquals(tuple(L), sum(zip(*groupsOfThings), ())))
 
186
 
 
187
 
 
188
    def testResourceExhaustion(self):
 
189
        output = []
 
190
        def myiter():
 
191
            for i in range(100):
 
192
                output.append(i)
 
193
                if i == 9:
 
194
                    _TPF.stopped = True
 
195
                yield i
 
196
 
 
197
        class _TPF:
 
198
            stopped = False
 
199
            def __call__(self):
 
200
                return self.stopped
 
201
 
 
202
        c = task.Cooperator(terminationPredicateFactory=_TPF)
 
203
        c.coiterate(myiter()).addErrback(self.ebIter)
 
204
        c._delayedCall.cancel()
 
205
        # testing a private method because only the test case will ever care
 
206
        # about this, so we have to carefully clean up after ourselves.
 
207
        c._tick()
 
208
        c.stop()
 
209
        self.failUnless(_TPF.stopped)
 
210
        self.assertEquals(output, range(10))
 
211
 
 
212
 
 
213
    def testCallbackReCoiterate(self):
 
214
        """
 
215
        If a callback to a deferred returned by coiterate calls coiterate on
 
216
        the same Cooperator, we should make sure to only do the minimal amount
 
217
        of scheduling work.  (This test was added to demonstrate a specific bug
 
218
        that was found while writing the scheduler.)
 
219
        """
 
220
        calls = []
 
221
 
 
222
        class FakeCall:
 
223
            def __init__(self, func):
 
224
                self.func = func
 
225
 
 
226
            def __repr__(self):
 
227
                return '<FakeCall %r>' % (self.func,)
 
228
 
 
229
        def sched(f):
 
230
            self.failIf(calls, repr(calls))
 
231
            calls.append(FakeCall(f))
 
232
            return calls[-1]
 
233
 
 
234
        c = task.Cooperator(scheduler=sched, terminationPredicateFactory=lambda: lambda: True)
 
235
        d = c.coiterate(iter(()))
 
236
 
 
237
        done = []
 
238
        def anotherTask(ign):
 
239
            c.coiterate(iter(())).addBoth(done.append)
 
240
 
 
241
        d.addCallback(anotherTask)
 
242
 
 
243
        work = 0
 
244
        while not done:
 
245
            work += 1
 
246
            while calls:
 
247
                calls.pop(0).func()
 
248
                work += 1
 
249
            if work > 50:
 
250
                self.fail("Cooperator took too long")
 
251
 
 
252
 
 
253
 
 
254
class UnhandledException(Exception):
 
255
    """
 
256
    An exception that should go unhandled.
 
257
    """
 
258
 
 
259
 
 
260
 
 
261
class AliasTests(unittest.TestCase):
 
262
    """
 
263
    Integration test to verify that the global singleton aliases do what
 
264
    they're supposed to.
 
265
    """
 
266
 
 
267
    def test_cooperate(self):
 
268
        """
 
269
        L{twisted.internet.task.cooperate} ought to run the generator that it is
 
270
        """
 
271
        d = defer.Deferred()
 
272
        def doit():
 
273
            yield 1
 
274
            yield 2
 
275
            yield 3
 
276
            d.callback("yay")
 
277
        it = doit()
 
278
        theTask = task.cooperate(it)
 
279
        self.assertIn(theTask, task._theCooperator._tasks)
 
280
        return d
 
281
 
 
282
 
 
283
 
 
284
class RunStateTests(unittest.TestCase):
 
285
    """
 
286
    Tests to verify the behavior of L{CooperativeTask.pause},
 
287
    L{CooperativeTask.resume}, L{CooperativeTask.stop}, exhausting the
 
288
    underlying iterator, and their interactions with each other.
 
289
    """
 
290
 
 
291
    def setUp(self):
 
292
        """
 
293
        Create a cooperator with a fake scheduler and a termination predicate
 
294
        that ensures only one unit of work will take place per tick.
 
295
        """
 
296
        self._doDeferNext = False
 
297
        self._doStopNext = False
 
298
        self._doDieNext = False
 
299
        self.work = []
 
300
        self.scheduler = FakeScheduler()
 
301
        self.cooperator = task.Cooperator(
 
302
            scheduler=self.scheduler,
 
303
            # Always stop after one iteration of work (return a function which
 
304
            # returns a function which always returns True)
 
305
            terminationPredicateFactory=lambda: lambda: True)
 
306
        self.task = self.cooperator.cooperate(self.worker())
 
307
        self.cooperator.start()
 
308
 
 
309
 
 
310
    def worker(self):
 
311
        """
 
312
        This is a sample generator which yields Deferreds when we are testing
 
313
        deferral and an ascending integer count otherwise.
 
314
        """
 
315
        i = 0
 
316
        while True:
 
317
            i += 1
 
318
            if self._doDeferNext:
 
319
                self._doDeferNext = False
 
320
                d = defer.Deferred()
 
321
                self.work.append(d)
 
322
                yield d
 
323
            elif self._doStopNext:
 
324
                return
 
325
            elif self._doDieNext:
 
326
                raise UnhandledException()
 
327
            else:
 
328
                self.work.append(i)
 
329
                yield i
 
330
 
 
331
 
 
332
    def tearDown(self):
 
333
        """
 
334
        Drop references to interesting parts of the fixture to allow Deferred
 
335
        errors to be noticed when things start failing.
 
336
        """
 
337
        del self.task
 
338
        del self.scheduler
 
339
 
 
340
 
 
341
    def deferNext(self):
 
342
        """
 
343
        Defer the next result from my worker iterator.
 
344
        """
 
345
        self._doDeferNext = True
 
346
 
 
347
 
 
348
    def stopNext(self):
 
349
        """
 
350
        Make the next result from my worker iterator be completion (raising
 
351
        StopIteration).
 
352
        """
 
353
        self._doStopNext = True
 
354
 
 
355
 
 
356
    def dieNext(self):
 
357
        """
 
358
        Make the next result from my worker iterator be raising an
 
359
        L{UnhandledException}.
 
360
        """
 
361
        def ignoreUnhandled(failure):
 
362
            failure.trap(UnhandledException)
 
363
            return None
 
364
        self._doDieNext = True
 
365
 
 
366
 
 
367
    def test_pauseResume(self):
 
368
        """
 
369
        Cooperators should stop running their tasks when they're paused, and
 
370
        start again when they're resumed.
 
371
        """
 
372
        # first, sanity check
 
373
        self.scheduler.pump()
 
374
        self.assertEquals(self.work, [1])
 
375
        self.scheduler.pump()
 
376
        self.assertEquals(self.work, [1, 2])
 
377
 
 
378
        # OK, now for real
 
379
        self.task.pause()
 
380
        self.scheduler.pump()
 
381
        self.assertEquals(self.work, [1, 2])
 
382
        self.task.resume()
 
383
        # Resuming itself shoult not do any work
 
384
        self.assertEquals(self.work, [1, 2])
 
385
        self.scheduler.pump()
 
386
        # But when the scheduler rolls around again...
 
387
        self.assertEquals(self.work, [1, 2, 3])
 
388
 
 
389
 
 
390
    def test_resumeNotPaused(self):
 
391
        """
 
392
        L{CooperativeTask.resume} should raise a L{TaskNotPaused} exception if
 
393
        it was not paused; e.g. if L{CooperativeTask.pause} was not invoked
 
394
        more times than L{CooperativeTask.resume} on that object.
 
395
        """
 
396
        self.assertRaises(task.NotPaused, self.task.resume)
 
397
        self.task.pause()
 
398
        self.task.resume()
 
399
        self.assertRaises(task.NotPaused, self.task.resume)
 
400
 
 
401
 
 
402
    def test_pauseTwice(self):
 
403
        """
 
404
        Pauses on tasks should behave like a stack. If a task is paused twice,
 
405
        it needs to be resumed twice.
 
406
        """
 
407
        # pause once
 
408
        self.task.pause()
 
409
        self.scheduler.pump()
 
410
        self.assertEquals(self.work, [])
 
411
        # pause twice
 
412
        self.task.pause()
 
413
        self.scheduler.pump()
 
414
        self.assertEquals(self.work, [])
 
415
        # resume once (it shouldn't)
 
416
        self.task.resume()
 
417
        self.scheduler.pump()
 
418
        self.assertEquals(self.work, [])
 
419
        # resume twice (now it should go)
 
420
        self.task.resume()
 
421
        self.scheduler.pump()
 
422
        self.assertEquals(self.work, [1])
 
423
 
 
424
 
 
425
    def test_pauseWhileDeferred(self):
 
426
        """
 
427
        C{pause()}ing a task while it is waiting on an outstanding
 
428
        L{defer.Deferred} should put the task into a state where the
 
429
        outstanding L{defer.Deferred} must be called back I{and} the task is
 
430
        C{resume}d before it will continue processing.
 
431
        """
 
432
        self.deferNext()
 
433
        self.scheduler.pump()
 
434
        self.assertEquals(len(self.work), 1)
 
435
        self.failUnless(isinstance(self.work[0], defer.Deferred))
 
436
        self.scheduler.pump()
 
437
        self.assertEquals(len(self.work), 1)
 
438
        self.task.pause()
 
439
        self.scheduler.pump()
 
440
        self.assertEquals(len(self.work), 1)
 
441
        self.task.resume()
 
442
        self.scheduler.pump()
 
443
        self.assertEquals(len(self.work), 1)
 
444
        self.work[0].callback("STUFF!")
 
445
        self.scheduler.pump()
 
446
        self.assertEquals(len(self.work), 2)
 
447
        self.assertEquals(self.work[1], 2)
 
448
 
 
449
 
 
450
    def test_whenDone(self):
 
451
        """
 
452
        L{CooperativeTask.whenDone} returns a Deferred which fires when the
 
453
        Cooperator's iterator is exhausted.  It returns a new Deferred each
 
454
        time it is called; callbacks added to other invocations will not modify
 
455
        the value that subsequent invocations will fire with.
 
456
        """
 
457
 
 
458
        deferred1 = self.task.whenDone()
 
459
        deferred2 = self.task.whenDone()
 
460
        results1 = []
 
461
        results2 = []
 
462
        final1 = []
 
463
        final2 = []
 
464
 
 
465
        def callbackOne(result):
 
466
            results1.append(result)
 
467
            return 1
 
468
 
 
469
        def callbackTwo(result):
 
470
            results2.append(result)
 
471
            return 2
 
472
 
 
473
        deferred1.addCallback(callbackOne)
 
474
        deferred2.addCallback(callbackTwo)
 
475
 
 
476
        deferred1.addCallback(final1.append)
 
477
        deferred2.addCallback(final2.append)
 
478
 
 
479
        # exhaust the task iterator
 
480
        # callbacks fire
 
481
        self.stopNext()
 
482
        self.scheduler.pump()
 
483
 
 
484
        self.assertEquals(len(results1), 1)
 
485
        self.assertEquals(len(results2), 1)
 
486
 
 
487
        self.assertIdentical(results1[0], self.task._iterator)
 
488
        self.assertIdentical(results2[0], self.task._iterator)
 
489
 
 
490
        self.assertEquals(final1, [1])
 
491
        self.assertEquals(final2, [2])
 
492
 
 
493
 
 
494
    def test_whenDoneError(self):
 
495
        """
 
496
        L{CooperativeTask.whenDone} returns a L{defer.Deferred} that will fail
 
497
        when the iterable's C{next} method raises an exception, with that
 
498
        exception.
 
499
        """
 
500
        deferred1 = self.task.whenDone()
 
501
        results = []
 
502
        deferred1.addErrback(results.append)
 
503
        self.dieNext()
 
504
        self.scheduler.pump()
 
505
        self.assertEquals(len(results), 1)
 
506
        self.assertEquals(results[0].check(UnhandledException), UnhandledException)
 
507
 
 
508
 
 
509
    def test_whenDoneStop(self):
 
510
        """
 
511
        L{CooperativeTask.whenDone} returns a L{defer.Deferred} that fails with
 
512
        L{TaskStopped} when the C{stop} method is called on that
 
513
        L{CooperativeTask}.
 
514
        """
 
515
        deferred1 = self.task.whenDone()
 
516
        errors = []
 
517
        deferred1.addErrback(errors.append)
 
518
        self.task.stop()
 
519
        self.assertEquals(len(errors), 1)
 
520
        self.assertEquals(errors[0].check(task.TaskStopped), task.TaskStopped)
 
521
 
 
522
 
 
523
    def test_whenDoneAlreadyDone(self):
 
524
        """
 
525
        L{CooperativeTask.whenDone} will return a L{defer.Deferred} that will
 
526
        succeed immediately if its iterator has already completed.
 
527
        """
 
528
        self.stopNext()
 
529
        self.scheduler.pump()
 
530
        results = []
 
531
        self.task.whenDone().addCallback(results.append)
 
532
        self.assertEquals(results, [self.task._iterator])
 
533
 
 
534
 
 
535
    def test_stopStops(self):
 
536
        """
 
537
        C{stop()}ping a task should cause it to be removed from the run just as
 
538
        C{pause()}ing, with the distinction that C{resume()} will raise a
 
539
        L{TaskStopped} exception.
 
540
        """
 
541
        self.task.stop()
 
542
        self.scheduler.pump()
 
543
        self.assertEquals(len(self.work), 0)
 
544
        self.assertRaises(task.TaskStopped, self.task.stop)
 
545
        self.assertRaises(task.TaskStopped, self.task.pause)
 
546
        # Sanity check - it's still not scheduled, is it?
 
547
        self.scheduler.pump()
 
548
        self.assertEquals(self.work, [])
 
549
 
 
550
 
 
551
    def test_pauseStopResume(self):
 
552
        """
 
553
        C{resume()}ing a paused, stopped task should be a no-op; it should not
 
554
        raise an exception, because it's paused, but neither should it actually
 
555
        do more work from the task.
 
556
        """
 
557
        self.task.pause()
 
558
        self.task.stop()
 
559
        self.task.resume()
 
560
        self.scheduler.pump()
 
561
        self.assertEquals(self.work, [])
 
562
 
 
563
 
 
564
    def test_stopDeferred(self):
 
565
        """
 
566
        As a corrolary of the interaction of C{pause()} and C{unpause()},
 
567
        C{stop()}ping a task which is waiting on a L{Deferred} should cause the
 
568
        task to gracefully shut down, meaning that it should not be unpaused
 
569
        when the deferred fires.
 
570
        """
 
571
        self.deferNext()
 
572
        self.scheduler.pump()
 
573
        d = self.work.pop()
 
574
        self.assertEquals(self.task._pauseCount, 1)
 
575
        results = []
 
576
        d.addBoth(results.append)
 
577
        self.scheduler.pump()
 
578
        self.task.stop()
 
579
        self.scheduler.pump()
 
580
        d.callback(7)
 
581
        self.scheduler.pump()
 
582
        # Let's make sure that Deferred doesn't come out fried with an
 
583
        # unhandled error that will be logged.  The value is None, rather than
 
584
        # our test value, 7, because this Deferred is returned to and consumed
 
585
        # by the cooperator code.  Its callback therefore has no contract.
 
586
        self.assertEquals(results, [None])
 
587
        # But more importantly, no further work should have happened.
 
588
        self.assertEquals(self.work, [])
 
589
 
 
590
 
 
591
    def test_stopExhausted(self):
 
592
        """
 
593
        C{stop()}ping a L{CooperativeTask} whose iterator has been exhausted
 
594
        should raise L{TaskDone}.
 
595
        """
 
596
        self.stopNext()
 
597
        self.scheduler.pump()
 
598
        self.assertRaises(task.TaskDone, self.task.stop)
 
599
 
 
600
 
 
601
    def test_stopErrored(self):
 
602
        """
 
603
        C{stop()}ping a L{CooperativeTask} whose iterator has encountered an
 
604
        error should raise L{TaskFailed}.
 
605
        """
 
606
        self.dieNext()
 
607
        self.scheduler.pump()
 
608
        self.assertRaises(task.TaskFailed, self.task.stop)
 
609
 
 
610
 
 
611
    def test_stopCooperatorReentrancy(self):
 
612
        """
 
613
        If a callback of a L{Deferred} from L{CooperativeTask.whenDone} calls
 
614
        C{Cooperator.stop} on its L{CooperativeTask._cooperator}, the
 
615
        L{Cooperator} will stop, but the L{CooperativeTask} whose callback is
 
616
        calling C{stop} should already be considered 'stopped' by the time the
 
617
        callback is running, and therefore removed from the
 
618
        L{CoooperativeTask}.
 
619
        """
 
620
        callbackPhases = []
 
621
        def stopit(result):
 
622
            callbackPhases.append(result)
 
623
            self.cooperator.stop()
 
624
            # "done" here is a sanity check to make sure that we get all the
 
625
            # way through the callback; i.e. stop() shouldn't be raising an
 
626
            # exception due to the stopped-ness of our main task.
 
627
            callbackPhases.append("done")
 
628
        self.task.whenDone().addCallback(stopit)
 
629
        self.stopNext()
 
630
        self.scheduler.pump()
 
631
        self.assertEquals(callbackPhases, [self.task._iterator, "done"])
 
632
 
 
633
 
 
634