1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for lots of functionality provided by L{twisted.internet}.
12
from twisted.trial import unittest
13
from twisted.internet import reactor, protocol, error, abstract, defer
14
from twisted.internet import interfaces, base
17
from twisted.internet import ssl
20
if ssl and not ssl.supported:
23
from twisted.internet.defer import Deferred, maybeDeferred
24
from twisted.python import util, runtime
28
class ThreePhaseEventTests(unittest.TestCase):
30
Tests for the private implementation helpers for system event triggers.
34
Create a trigger, an argument, and an event to be used by tests.
36
self.trigger = lambda x: None
38
self.event = base._ThreePhaseEvent()
41
def test_addInvalidPhase(self):
43
L{_ThreePhaseEvent.addTrigger} should raise L{KeyError} when called
44
with an invalid phase.
48
self.event.addTrigger, 'xxx', self.trigger, self.arg)
51
def test_addBeforeTrigger(self):
53
L{_ThreePhaseEvent.addTrigger} should accept C{'before'} as a phase, a
54
callable, and some arguments and add the callable with the arguments to
57
self.event.addTrigger('before', self.trigger, self.arg)
60
[(self.trigger, (self.arg,), {})])
63
def test_addDuringTrigger(self):
65
L{_ThreePhaseEvent.addTrigger} should accept C{'during'} as a phase, a
66
callable, and some arguments and add the callable with the arguments to
69
self.event.addTrigger('during', self.trigger, self.arg)
72
[(self.trigger, (self.arg,), {})])
75
def test_addAfterTrigger(self):
77
L{_ThreePhaseEvent.addTrigger} should accept C{'after'} as a phase, a
78
callable, and some arguments and add the callable with the arguments to
81
self.event.addTrigger('after', self.trigger, self.arg)
84
[(self.trigger, (self.arg,), {})])
87
def test_removeTrigger(self):
    """
    The opaque object returned by L{_ThreePhaseEvent.addTrigger} can be
    handed to L{_ThreePhaseEvent.removeTrigger}, after which the event's
    C{before} list is empty again.
    """
    # Register and immediately unregister a single before-phase trigger.
    self.event.removeTrigger(
        self.event.addTrigger('before', self.trigger, self.arg))
    remaining = self.event.before
    self.assertEqual(remaining, [])
98
def test_removeNonexistentTrigger(self):
    """
    L{_ThreePhaseEvent.removeTrigger} raises L{ValueError} when it is
    passed an object which L{_ThreePhaseEvent.addTrigger} never returned.
    """
    # A fresh, anonymous object cannot be a handle for any trigger.
    unknownHandle = object()
    self.assertRaises(ValueError, self.event.removeTrigger, unknownHandle)
106
def test_removeRemovedTrigger(self):
    """
    A handle returned by L{_ThreePhaseEvent.addTrigger} may only be
    removed once: the second call to L{_ThreePhaseEvent.removeTrigger}
    with the same handle raises L{ValueError}.
    """
    remove = self.event.removeTrigger
    token = self.event.addTrigger('before', self.trigger, self.arg)
    # The first removal succeeds ...
    remove(token)
    # ... the second one must be rejected.
    self.assertRaises(ValueError, remove, token)
117
def test_removeAlmostValidTrigger(self):
119
L{_ThreePhaseEvent.removeTrigger} should raise L{ValueError} if it is
120
given a trigger handle which resembles a valid trigger handle aside
121
from its phase being incorrect.
125
self.event.removeTrigger, ('xxx', self.trigger, (self.arg,), {}))
128
def test_fireEvent(self):
130
L{_ThreePhaseEvent.fireEvent} should call I{before}, I{during}, and
131
I{after} phase triggers in that order.
134
self.event.addTrigger('after', events.append, ('first', 'after'))
135
self.event.addTrigger('during', events.append, ('first', 'during'))
136
self.event.addTrigger('before', events.append, ('first', 'before'))
137
self.event.addTrigger('before', events.append, ('second', 'before'))
138
self.event.addTrigger('during', events.append, ('second', 'during'))
139
self.event.addTrigger('after', events.append, ('second', 'after'))
141
self.assertEqual(events, [])
142
self.event.fireEvent()
143
self.assertEqual(events,
144
[('first', 'before'), ('second', 'before'),
145
('first', 'during'), ('second', 'during'),
146
('first', 'after'), ('second', 'after')])
149
def test_asynchronousBefore(self):
151
L{_ThreePhaseEvent.fireEvent} should wait for any L{Deferred} returned
152
by a I{before} phase trigger before proceeding to I{during} events.
155
beforeResult = Deferred()
156
self.event.addTrigger('before', lambda: beforeResult)
157
self.event.addTrigger('during', events.append, 'during')
158
self.event.addTrigger('after', events.append, 'after')
160
self.assertEqual(events, [])
161
self.event.fireEvent()
162
self.assertEqual(events, [])
163
beforeResult.callback(None)
164
self.assertEqual(events, ['during', 'after'])
167
def test_beforeTriggerException(self):
169
If a before-phase trigger raises a synchronous exception, it should be
170
logged and the remaining triggers should be run.
174
class DummyException(Exception):
177
def raisingTrigger():
178
raise DummyException()
180
self.event.addTrigger('before', raisingTrigger)
181
self.event.addTrigger('before', events.append, 'before')
182
self.event.addTrigger('during', events.append, 'during')
183
self.event.fireEvent()
184
self.assertEqual(events, ['before', 'during'])
185
errors = self.flushLoggedErrors(DummyException)
186
self.assertEqual(len(errors), 1)
189
def test_duringTriggerException(self):
191
If a during-phase trigger raises a synchronous exception, it should be
192
logged and the remaining triggers should be run.
196
class DummyException(Exception):
199
def raisingTrigger():
200
raise DummyException()
202
self.event.addTrigger('during', raisingTrigger)
203
self.event.addTrigger('during', events.append, 'during')
204
self.event.addTrigger('after', events.append, 'after')
205
self.event.fireEvent()
206
self.assertEqual(events, ['during', 'after'])
207
errors = self.flushLoggedErrors(DummyException)
208
self.assertEqual(len(errors), 1)
211
def test_synchronousRemoveAlreadyExecutedBefore(self):
213
If a before-phase trigger tries to remove another before-phase trigger
214
which has already run, a warning should be emitted.
219
self.event.removeTrigger(beforeHandle)
221
beforeHandle = self.event.addTrigger('before', events.append, ('first', 'before'))
222
self.event.addTrigger('before', removeTrigger)
223
self.event.addTrigger('before', events.append, ('second', 'before'))
226
"Removing already-fired system event triggers will raise an "
227
"exception in a future version of Twisted.",
229
self.event.fireEvent)
230
self.assertEqual(events, [('first', 'before'), ('second', 'before')])
233
def test_synchronousRemovePendingBefore(self):
235
If a before-phase trigger removes another before-phase trigger which
236
has not yet run, the removed trigger should not be run.
239
self.event.addTrigger(
240
'before', lambda: self.event.removeTrigger(beforeHandle))
241
beforeHandle = self.event.addTrigger(
242
'before', events.append, ('first', 'before'))
243
self.event.addTrigger('before', events.append, ('second', 'before'))
244
self.event.fireEvent()
245
self.assertEqual(events, [('second', 'before')])
248
def test_synchronousBeforeRemovesDuring(self):
250
If a before-phase trigger removes a during-phase trigger, the
251
during-phase trigger should not be run.
254
self.event.addTrigger(
255
'before', lambda: self.event.removeTrigger(duringHandle))
256
duringHandle = self.event.addTrigger('during', events.append, 'during')
257
self.event.addTrigger('after', events.append, 'after')
258
self.event.fireEvent()
259
self.assertEqual(events, ['after'])
262
def test_asynchronousBeforeRemovesDuring(self):
264
If a before-phase trigger returns a L{Deferred} and later removes a
265
during-phase trigger before the L{Deferred} fires, the during-phase
266
trigger should not be run.
269
beforeResult = Deferred()
270
self.event.addTrigger('before', lambda: beforeResult)
271
duringHandle = self.event.addTrigger('during', events.append, 'during')
272
self.event.addTrigger('after', events.append, 'after')
273
self.event.fireEvent()
274
self.event.removeTrigger(duringHandle)
275
beforeResult.callback(None)
276
self.assertEqual(events, ['after'])
279
def test_synchronousBeforeRemovesConspicuouslySimilarDuring(self):
281
If a before-phase trigger removes a during-phase trigger which is
282
identical to an already-executed before-phase trigger aside from their
283
phases, no warning should be emitted and the during-phase trigger
288
events.append('trigger')
289
self.event.addTrigger('before', trigger)
290
self.event.addTrigger(
291
'before', lambda: self.event.removeTrigger(duringTrigger))
292
duringTrigger = self.event.addTrigger('during', trigger)
293
self.event.fireEvent()
294
self.assertEqual(events, ['trigger'])
297
def test_synchronousRemovePendingDuring(self):
299
If a during-phase trigger removes another during-phase trigger which
300
has not yet run, the removed trigger should not be run.
303
self.event.addTrigger(
304
'during', lambda: self.event.removeTrigger(duringHandle))
305
duringHandle = self.event.addTrigger(
306
'during', events.append, ('first', 'during'))
307
self.event.addTrigger(
308
'during', events.append, ('second', 'during'))
309
self.event.fireEvent()
310
self.assertEqual(events, [('second', 'during')])
313
def test_triggersRunOnce(self):
315
A trigger should only be called on the first call to
316
L{_ThreePhaseEvent.fireEvent}.
319
self.event.addTrigger('before', events.append, 'before')
320
self.event.addTrigger('during', events.append, 'during')
321
self.event.addTrigger('after', events.append, 'after')
322
self.event.fireEvent()
323
self.event.fireEvent()
324
self.assertEqual(events, ['before', 'during', 'after'])
327
def test_finishedBeforeTriggersCleared(self):
329
The temporary list L{_ThreePhaseEvent.finishedBefore} should be emptied
330
and the state reset to C{'BASE'} before the first during-phase trigger
335
events.append('during')
336
self.assertEqual(self.event.finishedBefore, [])
337
self.assertEqual(self.event.state, 'BASE')
338
self.event.addTrigger('before', events.append, 'before')
339
self.event.addTrigger('during', duringTrigger)
340
self.event.fireEvent()
341
self.assertEqual(events, ['before', 'during'])
345
class SystemEventTestCase(unittest.TestCase):
347
Tests for the reactor's implementation of the C{fireSystemEvent},
348
C{addSystemEventTrigger}, and C{removeSystemEventTrigger} methods of the
349
L{IReactorCore} interface.
351
@ivar triggers: A list of the handles to triggers which have been added to
356
Create an empty list in which to store trigger handles.
363
Remove all remaining triggers from the reactor.
366
trigger = self.triggers.pop()
368
reactor.removeSystemEventTrigger(trigger)
369
except (ValueError, KeyError):
373
def addTrigger(self, event, phase, func):
    """
    Add a trigger to the reactor and remember it in C{self.triggers}.

    The three arguments are forwarded positionally, in the order given, to
    C{reactor.addSystemEventTrigger}.  NOTE(review): the tests in this
    class pass the phase name first and the event type second, so the
    parameter names here appear to be swapped relative to their use;
    they are kept as-is to avoid changing the signature.

    @param event: First positional argument for
        C{reactor.addSystemEventTrigger}.
    @param phase: Second positional argument for
        C{reactor.addSystemEventTrigger}.
    @param func: The callable to run as the trigger.

    @return: The handle returned by C{reactor.addSystemEventTrigger},
        suitable for passing to L{removeTrigger}.
    """
    t = reactor.addSystemEventTrigger(event, phase, func)
    self.triggers.append(t)
    # Callers need the handle back in order to remove the trigger later.
    return t
382
def removeTrigger(self, trigger):
    """
    Unregister a trigger from the reactor and forget it locally.

    @param trigger: A handle which is currently present in
        C{self.triggers}.
    """
    unregister = reactor.removeSystemEventTrigger
    # Unregister from the reactor first; only forget the handle once
    # that call has returned without raising.
    unregister(trigger)
    self.triggers.remove(trigger)
391
def _addSystemEventTriggerTest(self, phase):
396
self.addTrigger(phase, eventType, trigger)
397
self.assertEqual(events, [])
398
reactor.fireSystemEvent(eventType)
399
self.assertEqual(events, [None])
402
def test_beforePhase(self):
    """
    A trigger added to L{IReactorCore.addSystemEventTrigger} with the
    C{'before'} phase is accepted and is not called until the right
    event is fired.
    """
    phase = 'before'
    self._addSystemEventTriggerTest(phase)
410
def test_duringPhase(self):
    """
    A trigger added to L{IReactorCore.addSystemEventTrigger} with the
    C{'during'} phase is accepted and is not called until the right
    event is fired.
    """
    phase = 'during'
    self._addSystemEventTriggerTest(phase)
418
def test_afterPhase(self):
    """
    A trigger added to L{IReactorCore.addSystemEventTrigger} with the
    C{'after'} phase is accepted and is not called until the right
    event is fired.
    """
    phase = 'after'
    self._addSystemEventTriggerTest(phase)
426
def test_unknownPhase(self):
428
L{IReactorCore.addSystemEventTrigger} should reject phases other than
429
C{'before'}, C{'during'}, or C{'after'}.
433
KeyError, self.addTrigger, 'xxx', eventType, lambda: None)
436
def test_beforePreceedsDuring(self):
438
L{IReactorCore.addSystemEventTrigger} should call triggers added to the
439
C{'before'} phase before it calls triggers added to the C{'during'}
445
events.append('before')
447
events.append('during')
448
self.addTrigger('before', eventType, beforeTrigger)
449
self.addTrigger('during', eventType, duringTrigger)
450
self.assertEqual(events, [])
451
reactor.fireSystemEvent(eventType)
452
self.assertEqual(events, ['before', 'during'])
455
def test_duringPreceedsAfter(self):
457
L{IReactorCore.addSystemEventTrigger} should call triggers added to the
458
C{'during'} phase before it calls triggers added to the C{'after'}
464
events.append('during')
466
events.append('after')
467
self.addTrigger('during', eventType, duringTrigger)
468
self.addTrigger('after', eventType, afterTrigger)
469
self.assertEqual(events, [])
470
reactor.fireSystemEvent(eventType)
471
self.assertEqual(events, ['during', 'after'])
474
def test_beforeReturnsDeferred(self):
476
If a trigger added to the C{'before'} phase of an event returns a
477
L{Deferred}, the C{'during'} phase should be delayed until it is called
480
triggerDeferred = Deferred()
484
return triggerDeferred
486
events.append('during')
487
self.addTrigger('before', eventType, beforeTrigger)
488
self.addTrigger('during', eventType, duringTrigger)
489
self.assertEqual(events, [])
490
reactor.fireSystemEvent(eventType)
491
self.assertEqual(events, [])
492
triggerDeferred.callback(None)
493
self.assertEqual(events, ['during'])
496
def test_multipleBeforeReturnDeferred(self):
498
If more than one trigger added to the C{'before'} phase of an event
499
return L{Deferred}s, the C{'during'} phase should be delayed until they
502
firstDeferred = Deferred()
503
secondDeferred = Deferred()
506
def firstBeforeTrigger():
508
def secondBeforeTrigger():
509
return secondDeferred
511
events.append('during')
512
self.addTrigger('before', eventType, firstBeforeTrigger)
513
self.addTrigger('before', eventType, secondBeforeTrigger)
514
self.addTrigger('during', eventType, duringTrigger)
515
self.assertEqual(events, [])
516
reactor.fireSystemEvent(eventType)
517
self.assertEqual(events, [])
518
firstDeferred.callback(None)
519
self.assertEqual(events, [])
520
secondDeferred.callback(None)
521
self.assertEqual(events, ['during'])
524
def test_subsequentBeforeTriggerFiresPriorBeforeDeferred(self):
526
If a trigger added to the C{'before'} phase of an event calls back a
527
L{Deferred} returned by an earlier trigger in the C{'before'} phase of
528
the same event, the remaining C{'before'} triggers for that event
529
should be run and any further L{Deferred}s waited on before proceeding
530
to the C{'during'} events.
534
firstDeferred = Deferred()
535
secondDeferred = Deferred()
536
def firstBeforeTrigger():
538
def secondBeforeTrigger():
539
firstDeferred.callback(None)
540
def thirdBeforeTrigger():
541
events.append('before')
542
return secondDeferred
544
events.append('during')
545
self.addTrigger('before', eventType, firstBeforeTrigger)
546
self.addTrigger('before', eventType, secondBeforeTrigger)
547
self.addTrigger('before', eventType, thirdBeforeTrigger)
548
self.addTrigger('during', eventType, duringTrigger)
549
self.assertEqual(events, [])
550
reactor.fireSystemEvent(eventType)
551
self.assertEqual(events, ['before'])
552
secondDeferred.callback(None)
553
self.assertEqual(events, ['before', 'during'])
556
def test_removeSystemEventTrigger(self):
558
A trigger removed with L{IReactorCore.removeSystemEventTrigger} should
559
not be called when the event fires.
563
def firstBeforeTrigger():
564
events.append('first')
565
def secondBeforeTrigger():
566
events.append('second')
567
self.addTrigger('before', eventType, firstBeforeTrigger)
569
self.addTrigger('before', eventType, secondBeforeTrigger))
570
self.assertEqual(events, [])
571
reactor.fireSystemEvent(eventType)
572
self.assertEqual(events, ['first'])
575
def test_removeNonExistentSystemEventTrigger(self):
577
Passing an object to L{IReactorCore.removeSystemEventTrigger} which was
578
not returned by a previous call to
579
L{IReactorCore.addSystemEventTrigger} or which has already been passed
580
to C{removeSystemEventTrigger} should result in L{TypeError},
581
L{KeyError}, or L{ValueError} being raised.
583
b = self.addTrigger('during', 'test', lambda: None)
584
self.removeTrigger(b)
586
TypeError, reactor.removeSystemEventTrigger, None)
588
ValueError, reactor.removeSystemEventTrigger, b)
591
reactor.removeSystemEventTrigger,
592
(b[0], ('xxx',) + b[1][1:]))
595
def test_interactionBetweenDifferentEvents(self):
597
L{IReactorCore.fireSystemEvent} should behave the same way for a
598
particular system event regardless of whether Deferreds are being
599
waited on for a different system event.
603
firstEvent = 'first-event'
604
firstDeferred = Deferred()
605
def beforeFirstEvent():
606
events.append(('before', 'first'))
608
def afterFirstEvent():
609
events.append(('after', 'first'))
611
secondEvent = 'second-event'
612
secondDeferred = Deferred()
613
def beforeSecondEvent():
614
events.append(('before', 'second'))
615
return secondDeferred
616
def afterSecondEvent():
617
events.append(('after', 'second'))
619
self.addTrigger('before', firstEvent, beforeFirstEvent)
620
self.addTrigger('after', firstEvent, afterFirstEvent)
621
self.addTrigger('before', secondEvent, beforeSecondEvent)
622
self.addTrigger('after', secondEvent, afterSecondEvent)
624
self.assertEqual(events, [])
626
# After this, firstEvent should be stuck before 'during' waiting for
628
reactor.fireSystemEvent(firstEvent)
629
self.assertEqual(events, [('before', 'first')])
631
# After this, secondEvent should be stuck before 'during' waiting for
633
reactor.fireSystemEvent(secondEvent)
634
self.assertEqual(events, [('before', 'first'), ('before', 'second')])
636
# After this, firstEvent should have finished completely, but
637
# secondEvent should be at the same place.
638
firstDeferred.callback(None)
639
self.assertEqual(events, [('before', 'first'), ('before', 'second'),
642
# After this, secondEvent should have finished completely.
643
secondDeferred.callback(None)
644
self.assertEqual(events, [('before', 'first'), ('before', 'second'),
645
('after', 'first'), ('after', 'second')])
649
class TimeTestCase(unittest.TestCase):
651
Tests for the IReactorTime part of the reactor.
655
def test_seconds(self):
    """
    The value returned by L{twisted.internet.reactor.seconds} can be
    subtracted from and added to itself, and C{now - now + now} compares
    equal to C{now}.

    Two things are deliberately left unchecked:

      1. No relation to the "system time" as returned by L{time.time} or
         L{twisted.python.runtime.seconds} is asserted, because at some
         point a better option for scheduling calls may be found.

      2. Nothing is asserted about the type of the result, because
         operations may not return ints or floats: for example,
         datetime-datetime == timedelta(0).
    """
    now = reactor.seconds()
    roundTripped = now - now + now
    self.assertEquals(roundTripped, now)
673
def test_callLaterUsesReactorSecondsInDelayedCall(self):
    """
    L{reactor.callLater} should use the reactor's seconds factory
    to produce the time at which the DelayedCall will be called.

    C{reactor.seconds} is temporarily replaced with a function that always
    returns C{100}; a call scheduled 5 seconds out must then report a time
    of C{105}.
    """
    oseconds = reactor.seconds
    reactor.seconds = lambda: 100
    try:
        call = reactor.callLater(5, lambda: None)
        self.assertEquals(call.getTime(), 105)
    finally:
        # Restore the real clock even when the assertion fails, so the
        # fake clock cannot leak into tests which run afterwards.
        reactor.seconds = oseconds
687
def test_callLaterUsesReactorSecondsAsDelayedCallSecondsFactory(self):
    """
    L{reactor.callLater} should propagate its own seconds factory
    to the DelayedCall to use as its own seconds factory.

    C{reactor.seconds} is temporarily replaced with a function that always
    returns C{100}; the C{seconds} method of the resulting DelayedCall must
    then return C{100} as well.
    """
    oseconds = reactor.seconds
    reactor.seconds = lambda: 100
    try:
        call = reactor.callLater(5, lambda: None)
        self.assertEquals(call.seconds(), 100)
    finally:
        # Restore the real clock even when the assertion fails, so the
        # fake clock cannot leak into tests which run afterwards.
        reactor.seconds = oseconds
701
def test_callLater(self):
703
Test that a DelayedCall really calls the function it is
707
reactor.callLater(0, d.callback, None)
708
d.addCallback(self.assertEqual, None)
712
def test_cancelDelayedCall(self):
714
Test that when a DelayedCall is cancelled it does not run.
719
call = reactor.callLater(0, function)
722
# Schedule a call in two "iterations" to check to make sure that the
723
# above call never ran.
727
self.assertEqual(called, [])
732
reactor.callLater(0, reactor.callLater, 0, check)
736
def test_cancelCancelledDelayedCall(self):
738
Test that cancelling a DelayedCall which has already been cancelled
739
raises the appropriate exception.
741
call = reactor.callLater(0, lambda: None)
743
self.assertRaises(error.AlreadyCancelled, call.cancel)
746
def test_cancelCalledDelayedCallSynchronous(self):
748
Test that cancelling a DelayedCall in the DelayedCall's function as
749
that function is being invoked by the DelayedCall raises the
750
appropriate exception.
755
self.assertRaises(error.AlreadyCalled, call.cancel)
760
call = reactor.callLater(0, later)
764
def test_cancelCalledDelayedCallAsynchronous(self):
766
Test that cancelling a DelayedCall after it has run its function
767
raises the appropriate exception.
772
self.assertRaises(error.AlreadyCalled, call.cancel)
778
reactor.callLater(0, check)
779
call = reactor.callLater(0, later)
783
def testCallLaterTime(self):
784
d = reactor.callLater(10, lambda: None)
786
self.failUnless(d.getTime() - (time.time() + 10) < 1)
790
def testCallLaterOrder(self):
798
self.assertEquals(l, range(20))
800
self.assertEquals(l2, range(10))
803
reactor.callLater(0, f, n)
805
reactor.callLater(0, f, n+10)
806
reactor.callLater(0.1, f2, n)
808
reactor.callLater(0, done)
809
reactor.callLater(0.1, done2)
811
reactor.callLater(0.2, d.callback, None)
814
testCallLaterOrder.todo = "See bug 1396"
815
testCallLaterOrder.skip = "Trial bug, todo doesn't work! See bug 1397"
816
def testCallLaterOrder2(self):
817
# This time destroy the clock resolution so that it fails reliably
818
# even on systems that don't have a crappy clock resolution.
821
return int(time.time())
823
base_original = base.seconds
824
runtime_original = runtime.seconds
825
base.seconds = seconds
826
runtime.seconds = seconds
829
runtime.seconds = runtime_original
830
base.seconds = base_original
832
return maybeDeferred(self.testCallLaterOrder).addBoth(cleanup)
834
testCallLaterOrder2.todo = "See bug 1396"
835
testCallLaterOrder2.skip = "Trial bug, todo doesn't work! See bug 1397"
837
def testDelayedCallStringification(self):
838
# Mostly just make sure str() isn't going to raise anything for
839
# DelayedCalls within reason.
840
dc = reactor.callLater(0, lambda x, y: None, 'x', y=10)
847
dc = reactor.callLater(0, lambda: None, x=[({'hello': u'world'}, 10j), reactor], *range(10))
852
def calledBack(ignored):
854
d = Deferred().addCallback(calledBack)
855
dc = reactor.callLater(0, d.callback, None)
860
def testDelayedCallSecondsOverride(self):
862
Test that the C{seconds} argument to DelayedCall gets used instead of
863
the default timing function, if it is not None.
867
dc = base.DelayedCall(5, lambda: None, (), {}, lambda dc: None,
868
lambda dc: None, seconds)
869
self.assertEquals(dc.getTime(), 5)
871
self.assertEquals(dc.getTime(), 13)
874
class CallFromThreadTests(unittest.TestCase):
875
def testWakeUp(self):
876
# Make sure other threads can wake up the reactor
880
# callFromThread will call wakeUp for us
881
reactor.callFromThread(d.callback, None)
882
reactor.callInThread(wake)
885
if interfaces.IReactorThreads(reactor, None) is None:
886
testWakeUp.skip = "Nothing to wake up for without thread support"
888
def _stopCallFromThreadCallback(self):
891
def _callFromThreadCallback(self, d):
    """
    Queue L{_callFromThreadCallback2} through C{reactor.callFromThread},
    then schedule L{_stopCallFromThreadCallback} with a zero-delay
    C{reactor.callLater}.

    @param d: Passed through unchanged to L{_callFromThreadCallback2}.
    """
    nextStep = self._callFromThreadCallback2
    stopper = self._stopCallFromThreadCallback
    reactor.callFromThread(nextStep, d)
    reactor.callLater(0, stopper)
895
def _callFromThreadCallback2(self, d):
897
self.assert_(self.stopped)
899
# Send the error to the deferred
904
def testCallFromThreadStops(self):
906
Ensure that callFromThread from inside a callFromThread
907
callback doesn't sit in an infinite loop and lets other
912
reactor.callFromThread(self._callFromThreadCallback, d)
916
class DelayedTestCase(unittest.TestCase):
921
self.deferred = defer.Deferred()
924
for t in self.timers.values():
927
def checkTimers(self):
928
l1 = self.timers.values()
929
l2 = list(reactor.getDelayedCalls())
931
# There should be at least the calls we put in. There may be other
932
# calls that are none of our business and that we should ignore,
941
self.failIf(missing, "Should have been missing no calls, instead was missing " + repr(missing))
943
def callback(self, tag):
947
def addCallback(self, tag):
949
self.addTimer(15, self.callback)
954
self.deferred.callback(None)
956
def addTimer(self, when, callback):
957
self.timers[self.counter] = reactor.callLater(when * 0.01, callback,
962
def testGetDelayedCalls(self):
963
if not hasattr(reactor, "getDelayedCalls"):
965
# This is not a race because we don't do anything which might call
966
# the reactor until we have all the timers set up. If we did, this
967
# test might fail on slow systems.
969
self.addTimer(35, self.done)
970
self.addTimer(20, self.callback)
971
self.addTimer(30, self.callback)
973
self.addTimer(29, self.callback)
974
self.addTimer(25, self.addCallback)
975
self.addTimer(26, self.callback)
977
self.timers[which].cancel()
978
del self.timers[which]
981
self.deferred.addCallback(lambda x : self.checkTimers())
985
def test_active(self):
987
L{IDelayedCall.active} returns False once the call has run.
989
dcall = reactor.callLater(0.01, self.deferred.callback, True)
990
self.assertEquals(dcall.active(), True)
992
def checkDeferredCall(success):
993
self.assertEquals(dcall.active(), False)
996
self.deferred.addCallback(checkDeferredCall)
1002
resolve_helper = """
1004
%(reactor)s.install()
1005
from twisted.internet import reactor
1009
reactor.callWhenRunning(self.start)
1010
self.timer = reactor.callLater(3, self.failed)
1012
reactor.resolve('localhost').addBoth(self.done)
1013
def done(self, res):
1024
class ChildResolveProtocol(protocol.ProcessProtocol):
    """
    Process protocol which accumulates everything a child process writes
    and reports it, together with the termination reason, through a
    L{Deferred}.

    @ivar onCompletion: The L{Deferred} which is called back with a
        C{(reason, output, error)} tuple when the process ends.  Set to
        C{None} once it has been fired.
    @ivar output: A C{list} of the chunks passed to L{outReceived}.
    @ivar error: A C{list} of the chunks passed to L{errReceived}.
    """
    def __init__(self, onCompletion):
        self.onCompletion = onCompletion

    def connectionMade(self):
        # Create the buffers the receive callbacks below append to.
        self.output = []
        self.error = []

    def outReceived(self, out):
        self.output.append(out)

    def errReceived(self, err):
        self.error.append(err)

    def processEnded(self, reason):
        self.onCompletion.callback((reason, self.output, self.error))
        # Drop the reference so the Deferred cannot be fired twice.
        self.onCompletion = None
1043
class Resolve(unittest.TestCase):
1044
def testChildResolve(self):
1045
# I've seen problems with reactor.run under gtk2reactor. Spawn a
1046
# child which just does reactor.resolve after the reactor has
1047
# started, fail if it does not complete in a timely fashion.
1048
helperPath = os.path.abspath(self.mktemp())
1049
helperFile = open(helperPath, 'w')
1052
reactorName = reactor.__module__
1054
helperFile.write(resolve_helper % {'reactor': reactorName})
1057
env = os.environ.copy()
1058
env['PYTHONPATH'] = os.pathsep.join(sys.path)
1060
helperDeferred = Deferred()
1061
helperProto = ChildResolveProtocol(helperDeferred)
1063
reactor.spawnProcess(helperProto, sys.executable, ("python", "-u", helperPath), env)
1065
def cbFinished((reason, output, error)):
1066
# If the output is "done 127.0.0.1\n" we don't really care what
1068
output = ''.join(output)
1069
if output != 'done 127.0.0.1\n':
1071
"The child process failed to produce the desired results:\n"
1072
" Reason for termination was: %r\n"
1073
" Output stream was: %r\n"
1074
" Error stream was: %r\n") % (reason.getErrorMessage(), output, ''.join(error)))
1076
helperDeferred.addCallback(cbFinished)
1077
return helperDeferred
1079
if not interfaces.IReactorProcess(reactor, None):
1080
Resolve.skip = "cannot run test: reactor doesn't support IReactorProcess"
1084
class CallFromThreadTestCase(unittest.TestCase):
1086
Task scheduling from threads tests.
1088
if interfaces.IReactorThreads(reactor, None) is None:
1089
skip = "Nothing to test without thread support"
1093
self.deferred = Deferred()
1096
def schedule(self, *args, **kwargs):
    """
    Hand the given callable and arguments to C{reactor.callFromThread}.

    Override in subclasses.
    """
    callFromThread = reactor.callFromThread
    callFromThread(*args, **kwargs)
1103
def test_lotsOfThreadsAreScheduledCorrectly(self):
1105
L{IReactorThreads.callFromThread} can be used to schedule a large
1106
number of calls in the reactor thread.
1108
def addAndMaybeFinish():
1110
if self.counter == 100:
1111
self.deferred.callback(True)
1113
for i in xrange(100):
1114
self.schedule(addAndMaybeFinish)
1116
return self.deferred
1119
def test_threadsAreRunInScheduledOrder(self):
1121
Callbacks should be invoked in the order they were scheduled.
1126
self.assertEquals(order, [1, 2, 3])
1128
self.deferred.addCallback(check)
1129
self.schedule(order.append, 1)
1130
self.schedule(order.append, 2)
1131
self.schedule(order.append, 3)
1132
self.schedule(reactor.callFromThread, self.deferred.callback, None)
1134
return self.deferred
1137
def test_scheduledThreadsNotRunUntilReactorRuns(self):
1139
Scheduled tasks should not be run until the reactor starts running.
1143
self.deferred.callback(True)
1144
self.schedule(incAndFinish)
1146
# Callback shouldn't have fired yet.
1147
self.assertEquals(self.counter, 0)
1149
return self.deferred
1153
class MyProtocol(protocol.Protocol):
1158
class MyFactory(protocol.Factory):
1163
protocol = MyProtocol
1166
class ProtocolTestCase(unittest.TestCase):
    """
    Tests for building a protocol from L{MyFactory}.
    """

    def testFactory(self):
        """
        The object returned by C{buildProtocol} refers back to its factory
        through its C{factory} attribute and is an instance of the
        factory's C{protocol} class.
        """
        factory = MyFactory()
        # Named 'built' so the imported 'protocol' module is not shadowed.
        built = factory.buildProtocol(None)
        self.assertEquals(built.factory, factory)
        isExpectedType = isinstance(built, factory.protocol)
        self.assert_(isExpectedType)
1175
class DummyProducer(object):
    """
    Very uninteresting producer implementation used by tests to ensure the
    right methods are called by the consumer with which it is registered.

    @type events: C{list} of C{str}
    @ivar events: The producer/consumer related events which have happened to
        this producer.  Strings in this list may be C{'resume'}, C{'stop'}, or
        C{'pause'}.  Elements are added as they occur.
    """

    def __init__(self):
        # Each instance gets its own list; the methods below append to it.
        self.events = []


    def resumeProducing(self):
        self.events.append('resume')


    def stopProducing(self):
        self.events.append('stop')


    def pauseProducing(self):
        self.events.append('pause')
1203
class SillyDescriptor(abstract.FileDescriptor):
1205
A descriptor whose data buffer gets filled very fast.
1207
Useful for testing FileDescriptor's IConsumer interface, since
1208
the data buffer fills as soon as at least four characters are
1209
written to it, and gets emptied in a single doWrite() cycle.
1214
def writeSomeData(self, data):
1216
Always write all data.
1221
def startWriting(self):
1223
Do nothing: bypass the reactor.
1225
stopWriting = startWriting
1229
class ReentrantProducer(DummyProducer):
    """
    A L{DummyProducer} whose C{resumeProducing}, after recording the event,
    calls a named method on a consumer with fixed arguments.

    @ivar consumer: The object whose method is invoked from
        C{resumeProducing}.

    @ivar methodName: The name of the method to look up on C{consumer}
        inside C{resumeProducing}.

    @ivar methodArgs: The positional arguments passed to that method.
    """

    def __init__(self, consumer, methodName, *methodArgs):
        super(ReentrantProducer, self).__init__()
        # Remember what to call back into when production is resumed.
        self.consumer = consumer
        self.methodName = methodName
        self.methodArgs = methodArgs


    def resumeProducing(self):
        # Record the 'resume' event first, then re-enter the consumer.
        super(ReentrantProducer, self).resumeProducing()
        reenter = getattr(self.consumer, self.methodName)
        reenter(*self.methodArgs)
1257
class TestProducer(unittest.TestCase):
    """
    Test abstract.FileDescriptor's consumer interface.
    """
    def test_doubleProducer(self):
        """
        Verify that registering a non-streaming producer invokes its
        resumeProducing() method and that you can only register one producer
        at a time.
        """
        fd = abstract.FileDescriptor()
        # NOTE(review): one short statement (original line 1268) was lost
        # from this copy of the file at this point; the assertions below do
        # not appear to need it, but restore it from upstream before merging.
        dp = DummyProducer()
        fd.registerProducer(dp, 0)
        self.assertEqual(dp.events, ['resume'])
        self.assertRaises(RuntimeError, fd.registerProducer, DummyProducer(), 0)


    def test_unconnectedFileDescriptor(self):
        """
        Verify that registering a producer when the connection has already
        been closed invokes its stopProducing() method.
        """
        fd = abstract.FileDescriptor()
        # Put the descriptor in the already-closed state the docstring
        # describes; a fresh descriptor would resume the producer instead of
        # stopping it (see test_doubleProducer).
        # NOTE(review): this statement was missing from this copy of the file
        # and is reconstructed from the assertion below -- confirm.
        fd.disconnected = 1
        dp = DummyProducer()
        fd.registerProducer(dp, 0)
        self.assertEqual(dp.events, ['stop'])


    def _dontPausePullConsumerTest(self, methodName):
        """
        Register a pull producer with a L{SillyDescriptor}, write four
        characters through the descriptor method named C{methodName}, and
        check that the producer received no further events.

        @param methodName: The name of the descriptor method used to write.
        """
        descriptor = SillyDescriptor()
        producer = DummyProducer()
        descriptor.registerProducer(producer, streaming=False)
        self.assertEqual(producer.events, ['resume'])
        del producer.events[:]

        # Fill up the descriptor's write buffer so we can observe whether or
        # not it pauses its producer in that case.
        getattr(descriptor, methodName)('1234')

        self.assertEqual(producer.events, [])


    def test_dontPausePullConsumerOnWrite(self):
        """
        Verify that FileDescriptor does not call producer.pauseProducing() on a
        non-streaming pull producer in response to a L{IConsumer.write} call
        which results in a full write buffer. Issue #2286.
        """
        return self._dontPausePullConsumerTest('write')


    def test_dontPausePullConsumerOnWriteSequence(self):
        """
        Like L{test_dontPausePullConsumerOnWrite}, but for a call to
        C{writeSequence} rather than L{IConsumer.write}.

        C{writeSequence} is not part of L{IConsumer}, but
        L{abstract.FileDescriptor} has supported consumery behavior in response
        to calls to L{writeSequence} forever.
        """
        return self._dontPausePullConsumerTest('writeSequence')


    def _reentrantStreamingProducerTest(self, methodName):
        """
        Register a streaming L{ReentrantProducer} which writes back to the
        descriptor from C{resumeProducing}, then check that each C{doWrite}
        both resumes and re-pauses it.

        @param methodName: The name of the descriptor method used to write,
            both by this test and by the producer when it is resumed.
        """
        descriptor = SillyDescriptor()
        producer = ReentrantProducer(descriptor, methodName, 'spam')
        descriptor.registerProducer(producer, streaming=True)

        # Start things off by filling up the descriptor's buffer so it will
        # pause its producer.
        getattr(descriptor, methodName)('spam')

        # Sanity check - make sure that worked.
        self.assertEqual(producer.events, ['pause'])
        del producer.events[:]

        # After one call to doWrite, the buffer has been emptied so the
        # FileDescriptor should resume its producer.  That will result in an
        # immediate call to FileDescriptor.write which will again fill the
        # buffer and result in the producer being paused.
        descriptor.doWrite()
        self.assertEqual(producer.events, ['resume', 'pause'])
        del producer.events[:]

        # After a second call to doWrite, the exact same thing should have
        # happened.  Prior to the bugfix for which this test was written,
        # FileDescriptor would have incorrectly believed its producer was
        # already resumed (it was paused) and so not resume it again.
        descriptor.doWrite()
        self.assertEqual(producer.events, ['resume', 'pause'])


    def test_reentrantStreamingProducerUsingWrite(self):
        """
        Verify that FileDescriptor tracks producer's paused state correctly.
        Issue #811, fixed in revision r12857.
        """
        return self._reentrantStreamingProducerTest('write')


    def test_reentrantStreamingProducerUsingWriteSequence(self):
        """
        Like L{test_reentrantStreamingProducerUsingWrite}, but for calls to
        C{writeSequence}.

        C{writeSequence} is B{not} part of L{IConsumer}, however
        C{abstract.FileDescriptor} has supported consumery behavior in response
        to calls to C{writeSequence} forever.
        """
        return self._reentrantStreamingProducerTest('writeSequence')
1372
class PortStringification(unittest.TestCase):
    """
    Check that the string form of a listening port includes the number of
    the port it is bound to, for each transport the reactor supports.
    """
    def _checkPortInString(self, port):
        """
        Assert that C{str(port)} contains the port number C{port} reports.

        C{port.stopListening} is registered as a cleanup before anything is
        asserted, so the port is shut down even if the assertion fails.

        @param port: A listening port returned by one of the reactor's
            C{listen*} methods.
        """
        self.addCleanup(port.stopListening)
        portNo = port.getHost().port
        self.assertNotEqual(str(port).find(str(portNo)), -1,
                            "%d not found in %s" % (portNo, port))


    # NOTE(review): the C{def} lines for the TCP and UDP tests were missing
    # from this copy of the file; their names follow the pattern of the
    # visible C{testSSL} -- confirm against upstream.
    if interfaces.IReactorTCP(reactor, None) is not None:
        def testTCP(self):
            """
            The string form of a TCP port includes its port number.
            """
            self._checkPortInString(
                reactor.listenTCP(0, protocol.ServerFactory()))

    if interfaces.IReactorUDP(reactor, None) is not None:
        def testUDP(self):
            """
            The string form of a UDP port includes its port number.
            """
            self._checkPortInString(
                reactor.listenUDP(0, protocol.DatagramProtocol()))

    if interfaces.IReactorSSL(reactor, None) is not None and ssl:
        def testSSL(self, ssl=ssl):
            """
            The string form of an SSL port includes its port number.
            """
            pem = util.sibpath(__file__, 'server.pem')
            self._checkPortInString(
                reactor.listenSSL(
                    0, protocol.ServerFactory(),
                    ssl.DefaultOpenSSLContextFactory(pem, pem)))