~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/test/test_internet.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
3
3
 
4
4
 
5
5
from twisted.trial import unittest
6
 
from twisted.internet import reactor, protocol, error, app, abstract, defer
 
6
from twisted.internet import reactor, protocol, error, abstract, defer
7
7
from twisted.internet import interfaces, base
8
8
 
9
9
from twisted.test.time_helpers import Clock
15
15
if ssl and not ssl.supported:
16
16
    ssl = None
17
17
 
18
 
from twisted.internet.defer import SUCCESS, FAILURE, Deferred, succeed, fail, maybeDeferred
19
 
from twisted.python import util, log
 
18
from twisted.internet.defer import Deferred, maybeDeferred
 
19
from twisted.python import util
20
20
 
21
21
import os
22
22
import sys
195
195
 
196
196
 
197
197
class InterfaceTestCase(unittest.TestCase):
198
 
 
199
 
    _called = 0
200
 
 
201
 
    def _callback(self, deferred, x, **d):
202
 
        """Callback for testCallLater"""
203
 
        self.assertEquals(x, 1)
204
 
        self.assertEquals(d, {'a': 1})
205
 
        self._called = 1
206
 
        self._calledTime = time.time()
207
 
        deferred.callback(None)
208
 
 
209
 
    def testCallLater(self):
210
 
        # add and remove a callback
211
 
        def bad():
212
 
            raise RuntimeError, "this shouldn't have been called"
213
 
        i = reactor.callLater(0.1, bad)
214
 
        i.cancel()
215
 
 
216
 
        self.assertRaises(error.AlreadyCancelled, i.cancel)
217
 
 
218
 
        d = defer.Deferred()
219
 
        i = reactor.callLater(0.5, self._callback, d, 1, a=1)
220
 
        start = time.time()
221
 
 
222
 
        def check(ignored):
223
 
            self.assertApproximates(self._calledTime, start + 0.5, 0.2 )
224
 
            self.assertRaises(error.AlreadyCalled, i.cancel)
225
 
            del self._called
226
 
            del self._calledTime
227
 
        d.addCallback(check)
 
198
    """
 
199
    Tests for a random pile of crap in twisted.internet, I suppose.
 
200
    """
 
201
 
 
202
    def test_callLater(self):
 
203
        """
 
204
        Test that a DelayedCall really calls the function it is supposed to call.
 
205
        """
 
206
        d = Deferred()
 
207
        reactor.callLater(0, d.callback, None)
 
208
        d.addCallback(self.assertEqual, None)
 
209
        return d
 
210
 
 
211
 
 
212
    def test_cancelDelayedCall(self):
 
213
        """
 
214
        Test that when a DelayedCall is cancelled it does not run.
 
215
        """
 
216
        called = []
 
217
        def function():
 
218
            called.append(None)
 
219
        call = reactor.callLater(0, function)
 
220
        call.cancel()
 
221
 
 
222
        # Schedule a call in two "iterations" to check to make sure that the
 
223
        # above call never ran.
 
224
        d = Deferred()
 
225
        def check():
 
226
            try:
 
227
                self.assertEqual(called, [])
 
228
            except:
 
229
                d.errback()
 
230
            else:
 
231
                d.callback(None)
 
232
        reactor.callLater(0, reactor.callLater, 0, check)
 
233
        return d
 
234
 
 
235
 
 
236
    def test_cancelCancelledDelayedCall(self):
 
237
        """
 
238
        Test that cancelling a DelayedCall which has already been cancelled
 
239
        raises the appropriate exception.
 
240
        """
 
241
        call = reactor.callLater(0, lambda: None)
 
242
        call.cancel()
 
243
        self.assertRaises(error.AlreadyCancelled, call.cancel)
 
244
 
 
245
 
 
246
    def test_cancelCalledDelayedCallSynchronous(self):
 
247
        """
 
248
        Test that cancelling a DelayedCall in the DelayedCall's function as
 
249
        that function is being invoked by the DelayedCall raises the
 
250
        appropriate exception.
 
251
        """
 
252
        d = Deferred()
 
253
        def later():
 
254
            try:
 
255
                self.assertRaises(error.AlreadyCalled, call.cancel)
 
256
            except:
 
257
                d.errback()
 
258
            else:
 
259
                d.callback(None)
 
260
        call = reactor.callLater(0, later)
 
261
        return d
 
262
 
 
263
 
 
264
    def test_cancelCalledDelayedCallAsynchronous(self):
 
265
        """
 
266
        Test that cancelling a DelayedCall after it has run its function
 
267
        raises the appropriate exception.
 
268
        """
 
269
        d = Deferred()
 
270
        def check():
 
271
            try:
 
272
                self.assertRaises(error.AlreadyCalled, call.cancel)
 
273
            except:
 
274
                d.errback()
 
275
            else:
 
276
                d.callback(None)
 
277
        def later():
 
278
            reactor.callLater(0, check)
 
279
        call = reactor.callLater(0, later)
228
280
        return d
229
281
 
230
282
 
233
285
        clock.install()
234
286
        try:
235
287
            callbackTimes = [None, None]
236
 
            
 
288
 
237
289
            def resetCallback():
238
290
                callbackTimes[0] = clock()
239
 
            
 
291
 
240
292
            def delayCallback():
241
293
                callbackTimes[1] = clock()
242
294
 
243
295
            ireset = reactor.callLater(2, resetCallback)
244
296
            idelay = reactor.callLater(3, delayCallback)
245
 
            
 
297
 
246
298
            clock.pump(reactor, [0, 1])
247
 
            
 
299
 
248
300
            ireset.reset(2) # (now)1 + 2 = 3
249
301
            idelay.delay(3) # (orig)3 + 3 = 6
250
302
 
270
322
            self.failUnless(d.getTime() - (time.time() + 10) < 1)
271
323
        finally:
272
324
            d.cancel()
273
 
    
 
325
 
274
326
    def testCallInNextIteration(self):
275
327
        calls = []
276
328
        def f1():
281
333
            reactor.callLater(0.0, f3)
282
334
        def f3():
283
335
            calls.append('f3')
284
 
        
 
336
 
285
337
        reactor.callLater(0, f1)
286
338
        self.assertEquals(calls, [])
287
339
        reactor.iterate()
314
366
        d = Deferred()
315
367
        reactor.callLater(0.2, d.callback, None)
316
368
        return d
317
 
    
 
369
 
318
370
    testCallLaterOrder.todo = "See bug 1396"
319
371
    testCallLaterOrder.skip = "Trial bug, todo doesn't work! See bug 1397"
320
372
    def testCallLaterOrder2(self):
323
375
 
324
376
        def seconds():
325
377
            return int(time.time())
326
 
        
 
378
 
327
379
        from twisted.internet import base
328
380
        from twisted.python import runtime
329
381
        base_original = base.seconds
336
388
            base.seconds = base_original
337
389
            return x
338
390
        return maybeDeferred(self.testCallLaterOrder).addBoth(cleanup)
339
 
        
 
391
 
340
392
    testCallLaterOrder2.todo = "See bug 1396"
341
393
    testCallLaterOrder2.skip = "Trial bug, todo doesn't work! See bug 1397"
342
 
    
 
394
 
343
395
    def testDelayedCallStringification(self):
344
396
        # Mostly just make sure str() isn't going to raise anything for
345
397
        # DelayedCalls within reason.
376
428
        self.assertEquals(dc.getTime(), 13)
377
429
 
378
430
 
 
431
class CallFromThreadTests(unittest.TestCase):
379
432
    def testWakeUp(self):
380
433
        # Make sure other threads can wake up the reactor
381
434
        d = Deferred()
389
442
    if interfaces.IReactorThreads(reactor, None) is None:
390
443
        testWakeUp.skip = "Nothing to wake up for without thread support"
391
444
 
 
445
    def _stopCallFromThreadCallback(self):
 
446
        self.stopped = True
 
447
 
 
448
    def _callFromThreadCallback(self, d):
 
449
        reactor.callFromThread(self._callFromThreadCallback2, d)
 
450
        reactor.callLater(0, self._stopCallFromThreadCallback)
 
451
 
 
452
    def _callFromThreadCallback2(self, d):
 
453
        try:
 
454
            self.assert_(self.stopped)
 
455
        except:
 
456
            # Send the error to the deferred
 
457
            d.errback()
 
458
        else:
 
459
            d.callback(None)
 
460
 
 
461
    def testCallFromThreadStops(self):
 
462
        """
 
463
        Ensure that callFromThread from inside a callFromThread
 
464
        callback doesn't sit in an infinite loop and lets other
 
465
        things happen too.
 
466
        """
 
467
        self.stopped = False
 
468
        d = defer.Deferred()
 
469
        reactor.callFromThread(self._callFromThreadCallback, d)
 
470
        return d
 
471
 
392
472
 
393
473
class ReactorCoreTestCase(unittest.TestCase):
394
474
    def setUp(self):
395
475
        self.triggers = []
396
476
        self.timers = []
 
477
 
 
478
 
397
479
    def addTrigger(self, event, phase, func):
398
480
        t = reactor.addSystemEventTrigger(event, phase, func)
399
481
        self.triggers.append(t)
400
482
        return t
 
483
 
 
484
 
401
485
    def removeTrigger(self, trigger):
402
486
        reactor.removeSystemEventTrigger(trigger)
403
487
        self.triggers.remove(trigger)
 
488
 
 
489
 
404
490
    def addTimer(self, when, func):
405
491
        t = reactor.callLater(when, func)
406
492
        self.timers.append(t)
407
493
        return t
 
494
 
 
495
 
408
496
    def removeTimer(self, timer):
409
497
        try:
410
498
            timer.cancel()
412
500
            pass
413
501
        self.timers.remove(timer)
414
502
 
 
503
 
415
504
    def tearDown(self):
416
505
        for t in self.triggers:
417
506
            try:
418
507
                reactor.removeSystemEventTrigger(t)
419
508
            except:
420
509
                pass
421
 
    def crash(self):
422
 
        reactor.crash()
423
 
    def stop(self):
424
 
        reactor.stop()
 
510
 
425
511
 
426
512
    def testRun(self):
427
 
        """Test that reactor.crash terminates reactor.run"""
428
 
        reactor.callLater(0.1, self.crash)
429
 
        reactor.run() # returns once .crash is called
430
 
        reactor.callLater(0.1, self.crash)
431
 
        reactor.run() # returns once .crash is called
 
513
        """
 
514
        Test that reactor.crash terminates reactor.run
 
515
        """
 
516
        for i in xrange(3):
 
517
            reactor.callLater(0.01, reactor.crash)
 
518
            reactor.run()
 
519
 
432
520
 
433
521
    def testIterate(self):
434
 
        """Test that reactor.iterate(0) doesn't block"""
 
522
        """
 
523
        Test that reactor.iterate(0) doesn't block
 
524
        """
435
525
        start = time.time()
436
526
        # twisted timers are distinct from the underlying event loop's
437
527
        # timers, so this fail-safe probably won't keep a failure from
438
528
        # hanging the test
439
 
        t = reactor.callLater(10, self.crash)
 
529
        t = reactor.callLater(10, reactor.crash)
440
530
        reactor.iterate(0) # shouldn't block
441
531
        stop = time.time()
442
532
        elapsed = stop - start
444
534
        self.failUnless(elapsed < 8)
445
535
        t.cancel()
446
536
 
447
 
    def timeout(self):
448
 
        print "test timed out"
449
 
        self.problem = 1
450
 
        self.fail("test timed out")
451
 
    def count(self):
452
 
        self.counter += 1
453
 
 
454
 
# XXX calling reactor.stop() in test suite causes problems with other tests
455
 
#     def testStop(self):
456
 
#         """reactor.stop should fire shutdown triggers"""
457
 
#         # make sure shutdown triggers are run when the reactor is stopped
458
 
#         self.counter = 0
459
 
#         self.problem = 0
460
 
#         self.addTrigger("before", "shutdown", self.count)
461
 
#         self.addTimer(0.1, self.stop)
462
 
#         t = self.addTimer(5, self.timeout)
463
 
#         reactor.run()
464
 
#         self.failUnless(self.counter == 1,
465
 
#                         "reactor.stop didn't invoke shutdown triggers")
466
 
#         self.failIf(self.problem, "the test timed out")
467
 
#         self.removeTimer(t)
468
 
 
469
 
    def testCrash(self):
470
 
        """reactor.crash should NOT fire shutdown triggers"""
471
 
        self.counter = 0
472
 
        self.problem = 0
473
 
        self.addTrigger("before", "shutdown", self.count)
 
537
 
 
538
    def test_crash(self):
 
539
        """
 
540
        reactor.crash should NOT fire shutdown triggers
 
541
        """
 
542
        events = []
 
543
        self.addTrigger(
 
544
            "before", "shutdown",
 
545
            lambda: events.append(("before", "shutdown")))
 
546
 
474
547
        # reactor.crash called from an "after-startup" trigger is too early
475
548
        # for the gtkreactor: gtk_mainloop is not yet running. Same is true
476
549
        # when called with reactor.callLater(0). Must be >0 seconds in the
477
550
        # future to let gtk_mainloop start first.
478
 
        self.addTimer(0.1, self.crash)
479
 
        t = self.addTimer(5, self.timeout)
 
551
        reactor.callWhenRunning(
 
552
            reactor.callLater, 0, reactor.crash)
480
553
        reactor.run()
481
 
        # this will fire reactor.crash, which ought to exit .run without
482
 
        # running the event triggers
483
 
        self.failUnless(self.counter == 0,
484
 
                        "reactor.crash invoked shutdown triggers, "
485
 
                        "but it isn't supposed to")
486
 
        self.failIf(self.problem, "the test timed out")
487
 
        self.removeTimer(t)
 
554
        self.failIf(events, "reactor.crash invoked shutdown triggers, but it "
 
555
                            "isn't supposed to.")
 
556
 
 
557
    # XXX Test that reactor.stop() invokes shutdown triggers
 
558
 
488
559
 
489
560
 
490
561
class DelayedTestCase(unittest.TestCase):
619
690
        # started, fail if it does not complete in a timely fashion.
620
691
        helperPath = os.path.abspath(self.mktemp())
621
692
        helperFile = open(helperPath, 'w')
622
 
        
 
693
 
623
694
        # Eeueuuggg
624
695
        reactorName = reactor.__module__
625
696
 
730
801
        self.assert_( isinstance(protocol, factory.protocol) )
731
802
 
732
803
 
733
 
class DummyProducer:
734
 
    resumed = 0
735
 
    stopped = 0
 
804
class DummyProducer(object):
 
805
    """
 
806
    Very uninteresting producer implementation used by tests to ensure the
 
807
    right methods are called by the consumer with which it is registered.
 
808
 
 
809
    @type events: C{list} of C{str}
 
810
    @ivar events: The producer/consumer related events which have happened to
 
811
    this producer.  Strings in this list may be C{'resume'}, C{'stop'}, or
 
812
    C{'pause'}.  Elements are added as they occur.
 
813
    """
 
814
 
 
815
    def __init__(self):
 
816
        self.events = []
 
817
 
 
818
 
736
819
    def resumeProducing(self):
737
 
         self.resumed += 1
 
820
        self.events.append('resume')
 
821
 
738
822
 
739
823
    def stopProducing(self):
740
 
         self.stopped += 1
 
824
        self.events.append('stop')
 
825
 
 
826
 
 
827
    def pauseProducing(self):
 
828
        self.events.append('pause')
 
829
 
 
830
 
 
831
 
 
832
class SillyDescriptor(abstract.FileDescriptor):
 
833
    """
 
834
    A descriptor whose data buffer gets filled very fast.
 
835
 
 
836
    Useful for testing FileDescriptor's IConsumer interface, since
 
837
    the data buffer fills as soon as at least four characters are
 
838
    written to it, and gets emptied in a single doWrite() cycle.
 
839
    """
 
840
    bufferSize = 3
 
841
    connected = True
 
842
 
 
843
    def writeSomeData(self, data):
 
844
        """
 
845
        Always write all data.
 
846
        """
 
847
        return len(data)
 
848
 
 
849
 
 
850
    def startWriting(self):
 
851
        """
 
852
        Do nothing: bypass the reactor.
 
853
        """
 
854
    stopWriting = startWriting
 
855
 
 
856
 
 
857
 
 
858
class ReentrantProducer(DummyProducer):
 
859
    """
 
860
    Similar to L{DummyProducer}, but with a resumeProducing method which calls
 
861
    back into an L{IConsumer} method of the consumer against which it is
 
862
    registered.
 
863
 
 
864
    @ivar consumer: The consumer with which this producer has been or will
 
865
    be registered.
 
866
 
 
867
    @ivar methodName: The name of the method to call on the consumer inside
 
868
    C{resumeProducing}.
 
869
 
 
870
    @ivar methodArgs: The arguments to pass to the consumer method invoked in
 
871
    C{resumeProducing}.
 
872
    """
 
873
    def __init__(self, consumer, methodName, *methodArgs):
 
874
        super(ReentrantProducer, self).__init__()
 
875
        self.consumer = consumer
 
876
        self.methodName = methodName
 
877
        self.methodArgs = methodArgs
 
878
 
 
879
 
 
880
    def resumeProducing(self):
 
881
        super(ReentrantProducer, self).resumeProducing()
 
882
        getattr(self.consumer, self.methodName)(*self.methodArgs)
 
883
 
 
884
 
741
885
 
742
886
class TestProducer(unittest.TestCase):
743
 
 
744
 
    def testDoubleProducer(self):
 
887
    """
 
888
    Test abstract.FileDescriptor's consumer interface.
 
889
    """
 
890
    def test_doubleProducer(self):
 
891
        """
 
892
        Verify that registering a non-streaming producer invokes its
 
893
        resumeProducing() method and that you can only register one producer
 
894
        at a time.
 
895
        """
745
896
        fd = abstract.FileDescriptor()
746
897
        fd.connected = 1
747
898
        dp = DummyProducer()
748
899
        fd.registerProducer(dp, 0)
749
 
        self.assertEquals(dp.resumed, 1)
 
900
        self.assertEquals(dp.events, ['resume'])
750
901
        self.assertRaises(RuntimeError, fd.registerProducer, DummyProducer(), 0)
751
902
 
752
 
    def testUnconnectedFileDescriptor(self):
 
903
 
 
904
    def test_unconnectedFileDescriptor(self):
 
905
        """
 
906
        Verify that registering a producer when the connection has already
 
907
        been closed invokes its stopProducing() method.
 
908
        """
753
909
        fd = abstract.FileDescriptor()
754
910
        fd.disconnected = 1
755
911
        dp = DummyProducer()
756
912
        fd.registerProducer(dp, 0)
757
 
        self.assertEquals(dp.stopped, 1)
 
913
        self.assertEquals(dp.events, ['stop'])
 
914
 
 
915
 
 
916
    def _dontPausePullConsumerTest(self, methodName):
 
917
        descriptor = SillyDescriptor()
 
918
        producer = DummyProducer()
 
919
        descriptor.registerProducer(producer, streaming=False)
 
920
        self.assertEqual(producer.events, ['resume'])
 
921
        del producer.events[:]
 
922
 
 
923
        # Fill up the descriptor's write buffer so we can observe whether or
 
924
        # not it pauses its producer in that case.
 
925
        getattr(descriptor, methodName)('1234')
 
926
 
 
927
        self.assertEqual(producer.events, [])
 
928
 
 
929
 
 
930
    def test_dontPausePullConsumerOnWrite(self):
 
931
        """
 
932
        Verify that FileDescriptor does not call producer.pauseProducing() on a
 
933
        non-streaming pull producer in response to a L{IConsumer.write} call
 
934
        which results in a full write buffer. Issue #2286.
 
935
        """
 
936
        return self._dontPausePullConsumerTest('write')
 
937
 
 
938
 
 
939
    def test_dontPausePullConsumerOnWriteSequence(self):
 
940
        """
 
941
        Like L{test_dontPausePullConsumerOnWrite}, but for a call to
 
942
        C{writeSequence} rather than L{IConsumer.write}.
 
943
 
 
944
        C{writeSequence} is not part of L{IConsumer}, but
 
945
        L{abstract.FileDescriptor} has supported consumery behavior in response
 
946
        to calls to L{writeSequence} forever.
 
947
        """
 
948
        return self._dontPausePullConsumerTest('writeSequence')
 
949
 
 
950
 
 
951
    def _reentrantStreamingProducerTest(self, methodName):
 
952
        descriptor = SillyDescriptor()
 
953
        producer = ReentrantProducer(descriptor, methodName, 'spam')
 
954
        descriptor.registerProducer(producer, streaming=True)
 
955
 
 
956
        # Start things off by filling up the descriptor's buffer so it will
 
957
        # pause its producer.
 
958
        getattr(descriptor, methodName)('spam')
 
959
 
 
960
        # Sanity check - make sure that worked.
 
961
        self.assertEqual(producer.events, ['pause'])
 
962
        del producer.events[:]
 
963
 
 
964
        # After one call to doWrite, the buffer has been emptied so the
 
965
        # FileDescriptor should resume its producer.  That will result in an
 
966
        # immediate call to FileDescriptor.write which will again fill the
 
967
        # buffer and result in the producer being paused.
 
968
        descriptor.doWrite()
 
969
        self.assertEqual(producer.events, ['resume', 'pause'])
 
970
        del producer.events[:]
 
971
 
 
972
        # After a second call to doWrite, the exact same thing should have
 
973
        # happened.  Prior to the bugfix for which this test was written,
 
974
        # FileDescriptor would have incorrectly believed its producer was
 
975
        # already resumed (it was paused) and so not resume it again.
 
976
        descriptor.doWrite()
 
977
        self.assertEqual(producer.events, ['resume', 'pause'])
 
978
 
 
979
 
 
980
    def test_reentrantStreamingProducerUsingWrite(self):
 
981
        """
 
982
        Verify that FileDescriptor tracks producer's paused state correctly.
 
983
        Issue #811, fixed in revision r12857.
 
984
        """
 
985
        return self._reentrantStreamingProducerTest('write')
 
986
 
 
987
 
 
988
    def test_reentrantStreamingProducerUsingWriteSequence(self):
 
989
        """
 
990
        Like L{test_reentrantStreamingProducerUsingWrite}, but for calls to
 
991
        C{writeSequence}.
 
992
 
 
993
        C{writeSequence} is B{not} part of L{IConsumer}, however
 
994
        C{abstract.FileDescriptor} has supported consumery behavior in response
 
995
        to calls to C{writeSequence} forever.
 
996
        """
 
997
        return self._reentrantStreamingProducerTest('writeSequence')
 
998
 
 
999
 
758
1000
 
759
1001
class PortStringification(unittest.TestCase):
760
1002
    if interfaces.IReactorTCP(reactor, None) is not None: