~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/test/test_iocp.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from twisted.internet.protocol import ServerFactory, Protocol, ClientCreator
 
2
from twisted.internet.defer import DeferredList, maybeDeferred, Deferred
 
3
from twisted.trial import unittest
 
4
from twisted.internet import reactor
 
5
from twisted.python import log
 
6
 
 
7
from zope.interface.verify import verifyClass
 
8
 
 
9
class StopStartReadingProtocol(Protocol):
 
10
    def connectionMade(self):
 
11
        self.transport.pauseProducing()
 
12
        self.transport.resumeProducing()
 
13
        reactor.callLater(0, self._beTerrible)
 
14
        self.data = ''
 
15
 
 
16
 
 
17
    def _beTerrible(self):
 
18
        self.transport.pauseProducing()
 
19
        self.transport.resumeProducing()
 
20
        reactor.callLater(0, self._beMoreTerrible)
 
21
 
 
22
 
 
23
    def _beMoreTerrible(self):
 
24
        self.transport.pauseProducing()
 
25
        self.transport.resumeProducing()
 
26
        reactor.callLater(0, self.factory.ready_d.callback, self)
 
27
 
 
28
 
 
29
    def dataReceived(self, data):
 
30
        log.msg('got data', len(data))
 
31
        self.data += data
 
32
        if len(self.data) == 4*self.transport.readBufferSize:
 
33
            self.factory.stop_d.callback(self.data)
 
34
 
 
35
 
 
36
 
 
37
class IOCPReactorTestCase(unittest.TestCase):
 
38
    def test_noPendingTimerEvents(self):
 
39
        """
 
40
        Test reactor behavior (doIteration) when there are no pending time
 
41
        events.
 
42
        """
 
43
        from twisted.internet.iocpreactor.reactor import IOCPReactor
 
44
        ir = IOCPReactor()
 
45
        ir.wakeUp()
 
46
        self.failIf(ir.doIteration(None))
 
47
 
 
48
 
 
49
    def test_stopStartReading(self):
 
50
        """
 
51
        This test checks transport read state! There are three bits
 
52
        of it:
 
53
        1) The transport producer is paused -- transport.reading
 
54
           is False)
 
55
        2) The transport is about to schedule an OS read, on the next
 
56
           reactor iteration -- transport._readScheduled
 
57
        3) The OS has a pending asynchronous read on our behalf --
 
58
           transport._readScheduledInOS
 
59
        if 3) is not implemented, it is possible to trick IOCPReactor into
 
60
        scheduling an OS read before the previous one finishes
 
61
        """
 
62
        sf = ServerFactory()
 
63
        sf.protocol = StopStartReadingProtocol
 
64
        sf.ready_d = Deferred()
 
65
        sf.stop_d = Deferred()
 
66
        p = reactor.listenTCP(0, sf)
 
67
        port = p.getHost().port
 
68
        cc = ClientCreator(reactor, Protocol)
 
69
        def proceed(protos, port):
 
70
            log.msg('PROCEEDING WITH THE TESTATHRON')
 
71
            self.assert_(protos[0])
 
72
            self.assert_(protos[1])
 
73
            protos = protos[0][1], protos[1][1]
 
74
            protos[0].transport.write(
 
75
                    'x' * (2 * protos[0].transport.readBufferSize) +
 
76
                    'y' * (2 * protos[0].transport.readBufferSize))
 
77
            return sf.stop_d.addCallback(cleanup, protos, port)
 
78
        
 
79
        def cleanup(data, protos, port):
 
80
            self.assert_(data == 'x'*(2*protos[0].transport.readBufferSize)+
 
81
                                 'y'*(2*protos[0].transport.readBufferSize),
 
82
                                 'did not get the right data')
 
83
            return DeferredList([
 
84
                    maybeDeferred(protos[0].transport.loseConnection),
 
85
                    maybeDeferred(protos[1].transport.loseConnection),
 
86
                    maybeDeferred(port.stopListening)])
 
87
 
 
88
        return (DeferredList([cc.connectTCP('127.0.0.1', port), sf.ready_d])
 
89
                .addCallback(proceed, p))
 
90
 
 
91
 
 
92
    def test_reactorInterfaces(self):
 
93
        """
 
94
        Verify that IOCP socket-representing classes implement IReadWriteHandle
 
95
        """
 
96
        from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
 
97
        from twisted.internet.iocpreactor import tcp, udp
 
98
        verifyClass(IReadWriteHandle, tcp.Connection)
 
99
        verifyClass(IReadWriteHandle, udp.Port)
 
100
 
 
101
 
 
102
 
 
103
if reactor.__class__.__name__ != 'IOCPReactor':
 
104
    IOCPReactorTestCase.skip = 'This test only applies to IOCPReactor'
 
105