1
# -*- test-case-name: twisted.test.test_stringtransport -*-
2
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Assorted functionality which is commonly useful when writing unit tests.
9
from StringIO import StringIO
11
from zope.interface import implements
12
from zope.interface.verify import verifyObject
14
from twisted.internet.interfaces import ITransport, IConsumer, IPushProducer
15
from twisted.internet.interfaces import IReactorTCP
16
from twisted.protocols import basic
17
from twisted.internet import protocol, error
20
class AccumulatingProtocol(protocol.Protocol):
22
L{AccumulatingProtocol} is an L{IProtocol} implementation which collects
23
the data delivered to it and can fire a Deferred when it is connected or
26
@ivar made: A flag indicating whether C{connectionMade} has been called.
27
@ivar data: A string giving all the data passed to C{dataReceived}.
28
@ivar closed: A flag indicated whether C{connectionLost} has been called.
29
@ivar closedReason: The value of the I{reason} parameter passed to
31
@ivar closedDeferred: If set to a L{Deferred}, this will be fired when
32
C{connectionLost} is called.
43
def connectionMade(self):
45
if (self.factory is not None and
46
self.factory.protocolConnectionMade is not None):
47
d = self.factory.protocolConnectionMade
48
self.factory.protocolConnectionMade = None
51
def dataReceived(self, data):
54
def connectionLost(self, reason):
56
self.closedReason = reason
57
if self.closedDeferred is not None:
58
d, self.closedDeferred = self.closedDeferred, None
62
class LineSendingProtocol(basic.LineReceiver):
65
def __init__(self, lines, start = True):
70
def connectionMade(self):
72
map(self.sendLine, self.lines)
74
def lineReceived(self, line):
76
map(self.sendLine, self.lines)
78
self.response.append(line)
80
def connectionLost(self, reason):
84
class FakeDatagramTransport:
90
def write(self, packet, addr=noAddr):
91
self.written.append((packet, addr))
94
class StringTransport:
96
A transport implementation which buffers data in memory and keeps track of
97
its other state without providing any behavior.
99
L{StringTransport} has a number of attributes which are not part of any of
100
the interfaces it claims to implement. These attributes are provided for
101
testing purposes. Implementation code should not use any of these
102
attributes; they are not provided by other transports.
104
@ivar disconnecting: A C{bool} which is C{False} until L{loseConnection} is
105
called, then C{True}.
107
@ivar producer: If a producer is currently registered, C{producer} is a
108
reference to it. Otherwise, C{None}.
110
@ivar streaming: If a producer is currently registered, C{streaming} refers
111
to the value of the second parameter passed to C{registerProducer}.
113
@ivar hostAddr: C{None} or an object which will be returned as the host
114
address of this transport. If C{None}, a nasty tuple will be returned
117
@ivar peerAddr: C{None} or an object which will be returned as the peer
118
address of this transport. If C{None}, a nasty tuple will be returned
121
@ivar producerState: The state of this L{StringTransport} in its capacity
122
as an L{IPushProducer}. One of C{'producing'}, C{'paused'}, or
125
@ivar io: A L{StringIO} which holds the data which has been written to this
126
transport since the last call to L{clear}. Use L{value} instead of
127
accessing this directly.
129
implements(ITransport, IConsumer, IPushProducer)
131
disconnecting = False
139
producerState = 'producing'
141
def __init__(self, hostAddress=None, peerAddress=None):
143
if hostAddress is not None:
144
self.hostAddr = hostAddress
145
if peerAddress is not None:
146
self.peerAddr = peerAddress
147
self.connected = True
151
Discard all data written to this transport so far.
153
This is not a transport method. It is intended for tests. Do not use
154
it in implementation code.
161
Retrieve all data which has been buffered by this transport.
163
This is not a transport method. It is intended for tests. Do not use
164
it in implementation code.
166
@return: A C{str} giving all data written to this transport since the
167
last call to L{clear}.
170
return self.io.getvalue()
174
def write(self, data):
175
if isinstance(data, unicode): # no, really, I mean it
176
raise TypeError("Data must not be unicode")
180
def writeSequence(self, data):
181
self.io.write(''.join(data))
184
def loseConnection(self):
186
Close the connection. Does nothing besides toggle the C{disconnecting}
187
instance variable to C{True}.
189
self.disconnecting = True
193
if self.peerAddr is None:
194
return ('StringIO', repr(self.io))
199
if self.hostAddr is None:
200
return ('StringIO', repr(self.io))
205
def registerProducer(self, producer, streaming):
206
if self.producer is not None:
207
raise RuntimeError("Cannot register two producers")
208
self.producer = producer
209
self.streaming = streaming
212
def unregisterProducer(self):
213
if self.producer is None:
215
"Cannot unregister a producer unless one is registered")
217
self.streaming = None
221
def _checkState(self):
222
if self.disconnecting:
224
"Cannot resume producing after loseConnection")
225
if self.producerState == 'stopped':
226
raise RuntimeError("Cannot resume a stopped producer")
229
def pauseProducing(self):
231
self.producerState = 'paused'
234
def stopProducing(self):
235
self.producerState = 'stopped'
238
def resumeProducing(self):
240
self.producerState = 'producing'
244
class StringTransportWithDisconnection(StringTransport):
245
def loseConnection(self):
247
self.connected = False
248
self.protocol.connectionLost(error.ConnectionDone("Bye."))
252
class StringIOWithoutClosing(StringIO):
254
A StringIO that can't be closed.
262
class MemoryReactor(object):
264
A fake reactor to be used in tests. This reactor doesn't actually do
265
much that's useful yet. It accepts TCP connection setup attempts, but
266
they will never succeed.
268
@ivar tcpClients: a list that keeps track of connection attempts (ie, calls
270
@type tcpClients: C{list}
272
@ivar tcpServers: a list that keeps track of server listen attempts (ie, calls
274
@type tcpServers: C{list}
276
implements(IReactorTCP)
280
Initialize the tracking lists.
286
def listenTCP(self, port, factory, backlog=50, interface=''):
288
Fake L{reactor.listenTCP}, that does nothing but log the call.
290
self.tcpServers.append((port, factory, backlog, interface))
293
def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
295
Fake L{reactor.connectTCP}, that does nothing but log the call.
297
self.tcpClients.append((host, port, factory, timeout, bindAddress))
299
verifyObject(IReactorTCP, MemoryReactor())