1
from twisted.internet.protocol import ServerFactory, Protocol, ClientCreator
2
from twisted.internet.defer import DeferredList, maybeDeferred, Deferred
3
from twisted.trial import unittest
4
from twisted.internet import reactor
5
from twisted.python import log
7
from zope.interface.verify import verifyClass
9
class StopStartReadingProtocol(Protocol):
10
def connectionMade(self):
11
self.transport.pauseProducing()
12
self.transport.resumeProducing()
13
reactor.callLater(0, self._beTerrible)
17
def _beTerrible(self):
18
self.transport.pauseProducing()
19
self.transport.resumeProducing()
20
reactor.callLater(0, self._beMoreTerrible)
23
def _beMoreTerrible(self):
24
self.transport.pauseProducing()
25
self.transport.resumeProducing()
26
reactor.callLater(0, self.factory.ready_d.callback, self)
29
def dataReceived(self, data):
30
log.msg('got data', len(data))
32
if len(self.data) == 4*self.transport.readBufferSize:
33
self.factory.stop_d.callback(self.data)
37
class IOCPReactorTestCase(unittest.TestCase):
38
def test_noPendingTimerEvents(self):
40
Test reactor behavior (doIteration) when there are no pending time
43
from twisted.internet.iocpreactor.reactor import IOCPReactor
46
self.failIf(ir.doIteration(None))
49
def test_stopStartReading(self):
51
This test checks transport read state! There are three bits
53
1) The transport producer is paused -- transport.reading
55
2) The transport is about to schedule an OS read, on the next
56
reactor iteration -- transport._readScheduled
57
3) The OS has a pending asynchronous read on our behalf --
58
transport._readScheduledInOS
59
if 3) is not implemented, it is possible to trick IOCPReactor into
60
scheduling an OS read before the previous one finishes
63
sf.protocol = StopStartReadingProtocol
64
sf.ready_d = Deferred()
65
sf.stop_d = Deferred()
66
p = reactor.listenTCP(0, sf)
67
port = p.getHost().port
68
cc = ClientCreator(reactor, Protocol)
69
def proceed(protos, port):
70
log.msg('PROCEEDING WITH THE TESTATHRON')
71
self.assert_(protos[0])
72
self.assert_(protos[1])
73
protos = protos[0][1], protos[1][1]
74
protos[0].transport.write(
75
'x' * (2 * protos[0].transport.readBufferSize) +
76
'y' * (2 * protos[0].transport.readBufferSize))
77
return sf.stop_d.addCallback(cleanup, protos, port)
79
def cleanup(data, protos, port):
80
self.assert_(data == 'x'*(2*protos[0].transport.readBufferSize)+
81
'y'*(2*protos[0].transport.readBufferSize),
82
'did not get the right data')
84
maybeDeferred(protos[0].transport.loseConnection),
85
maybeDeferred(protos[1].transport.loseConnection),
86
maybeDeferred(port.stopListening)])
88
return (DeferredList([cc.connectTCP('127.0.0.1', port), sf.ready_d])
89
.addCallback(proceed, p))
92
def test_reactorInterfaces(self):
94
Verify that IOCP socket-representing classes implement IReadWriteHandle
96
from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
97
from twisted.internet.iocpreactor import tcp, udp
98
verifyClass(IReadWriteHandle, tcp.Connection)
99
verifyClass(IReadWriteHandle, udp.Port)
103
if reactor.__class__.__name__ != 'IOCPReactor':
104
IOCPReactorTestCase.skip = 'This test only applies to IOCPReactor'