1
# -*- test-case-name: twisted.test.test_loopback -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
7
"""
Testing support for protocols -- loopback between client and server.
"""
12
from zope.interface import implements
15
from twisted.protocols import policies
16
from twisted.internet import interfaces, protocol, main, defer
17
from twisted.python import failure
18
from twisted.internet.interfaces import IAddress
21
class _LoopbackQueue(object):
23
Trivial wrapper around a list to give it an interface like a queue, which
24
the addition of also sending notifications by way of a Deferred whenever
25
the list has something added to it.
28
_notificationDeferred = None
37
if self._notificationDeferred is not None:
38
d, self._notificationDeferred = self._notificationDeferred, None
42
def __nonzero__(self):
43
return bool(self._queue)
47
return self._queue.pop(0)
51
class _LoopbackAddress(object):
    """
    Placeholder address returned by L{_LoopbackTransport} for both ends of
    an in-memory connection.
    """
    implements(IAddress)
55
class _LoopbackTransport(object):
    """
    In-memory transport which appends everything written to it to a queue
    instead of sending it anywhere.

    @ivar q: The queue object (see C{__init__}) receiving this side's output.

    @ivar producer: The currently registered producer, or C{None}.

    @ivar streamingProducer: The C{streaming} flag given with the current
        producer.  Only set once a producer has been registered.
    """
    implements(interfaces.ITransport, interfaces.IConsumer)

    # NOTE(review): conventional transport flag; reconstructed -- confirm.
    disconnecting = False
    # Class-level default so registerProducer's assertion works on a fresh
    # instance.
    producer = None

    # ITransport
    def __init__(self, q):
        """
        @param q: Queue with a C{put} method and a C{disconnect} attribute.
        """
        self.q = q


    def write(self, bytes):
        self.q.put(bytes)


    def writeSequence(self, iovec):
        self.q.put(''.join(iovec))


    def loseConnection(self):
        self.q.disconnect = True
        # Queue an empty write so that anything waiting for new data on the
        # queue is woken up and can notice the disconnect flag.
        # NOTE(review): reconstructed line -- confirm.
        self.q.put('')


    def getPeer(self):
        return _LoopbackAddress()


    def getHost(self):
        return _LoopbackAddress()


    # IConsumer
    def registerProducer(self, producer, streaming):
        """
        Remember C{producer}; only one producer may be registered at a time.
        """
        assert self.producer is None
        self.producer = producer
        self.streamingProducer = streaming
        # Give a non-streaming producer its first chance to write.
        # NOTE(review): reconstructed line -- confirm.
        self._pollProducer()


    def unregisterProducer(self):
        assert self.producer is not None
        self.producer = None


    def _pollProducer(self):
        """
        Ask a registered non-streaming producer for more data.
        """
        if self.producer is not None and not self.streamingProducer:
            self.producer.resumeProducing()
98
def loopbackAsync(server, client):
    """
    Establish a connection between C{server} and C{client} then transfer data
    between them until the connection is closed.

    @param server: The protocol instance representing the server-side of this
        connection.

    @param client: The protocol instance representing the client-side of this
        connection.

    @return: A L{Deferred} which fires when the connection has been closed and
        both sides have received notification of this.
    """
    serverOutput, clientOutput = _LoopbackQueue(), _LoopbackQueue()

    # Hook each protocol up to a transport writing into its own output queue;
    # the server is connected first, then the client.
    for proto, output in ((server, serverOutput), (client, clientOutput)):
        proto.makeConnection(_LoopbackTransport(output))

    return _loopbackAsyncBody(server, serverOutput, client, clientOutput)
123
def _loopbackAsyncBody(server, serverToClient, client, clientToServer):
    """
    Transfer bytes from the output queue of each protocol to the input of the
    other.

    @param server: The protocol instance representing the server-side of this
        connection.

    @param serverToClient: The L{_LoopbackQueue} holding the server's output.

    @param client: The protocol instance representing the client-side of this
        connection.

    @param clientToServer: The L{_LoopbackQueue} holding the client's output.

    @return: A L{Deferred} which fires when the connection has been closed and
        both sides have received notification of this.
    """
    def pump(source, q, target):
        # Deliver everything queued in q to target and report whether the
        # queue held anything at all.
        sent = False
        while q:
            sent = True
            data = q.get()
            # Skip empty writes; there is nothing to deliver for them.
            if data:
                target.dataReceived(data)

        # A write buffer has now been emptied.  Give any producer on that side
        # an opportunity to produce more data.
        source.transport._pollProducer()

        return sent

    while 1:
        disconnect = clientSent = serverSent = False

        # Deliver the data which has been written.
        serverSent = pump(server, serverToClient, client)
        clientSent = pump(client, clientToServer, server)

        if not clientSent and not serverSent:
            # Neither side wrote any data.  Wait for some new data to be added
            # before trying to do anything further.
            d = clientToServer._notificationDeferred = serverToClient._notificationDeferred = defer.Deferred()
            d.addCallback(_loopbackAsyncContinue, server, serverToClient, client, clientToServer)
            return d
        if serverToClient.disconnect:
            # The server wants to drop the connection.  Flush any remaining
            # data it has.
            disconnect = True
            pump(server, serverToClient, client)
        elif clientToServer.disconnect:
            # The client wants to drop the connection.  Flush any remaining
            # data it has.
            disconnect = True
            pump(client, clientToServer, server)
        if disconnect:
            # Someone wanted to disconnect, so okay, the connection is gone.
            server.connectionLost(failure.Failure(main.CONNECTION_DONE))
            client.connectionLost(failure.Failure(main.CONNECTION_DONE))
            return defer.succeed(None)
185
def _loopbackAsyncContinue(ignored, server, serverToClient, client, clientToServer):
    """
    Resume moving data between the two protocols.

    @param ignored: Unused first argument.

    @param server: Passed through to L{_loopbackAsyncBody}.
    @param serverToClient: Queue holding the server's output.
    @param client: Passed through to L{_loopbackAsyncBody}.
    @param clientToServer: Queue holding the client's output.

    @return: Whatever L{_loopbackAsyncBody} returns.
    """
    # Forget the notification Deferred on both queues: it has already fired
    # and cannot be used again.
    for q in (clientToServer, serverToClient):
        q._notificationDeferred = None

    # Push some more bytes around.
    return _loopbackAsyncBody(server, serverToClient, client, clientToServer)
197
class LoopbackRelay:
    """
    Buffering transport which hands everything written to it to a target
    protocol each time C{clearBuffer} is called.

    @ivar target: The protocol which receives the buffered data.

    @ivar logFile: Optional file-like object; when set, every write and
        every delivery is logged to it.

    @ivar shouldLose: C{0} while the connection is open, C{1} once
        C{loseConnection} has been called, C{-1} once the target has been
        told the connection is lost.
    """

    implements(interfaces.ITransport, interfaces.IConsumer)

    buffer = ''
    shouldLose = 0
    # NOTE(review): conventional transport flag; reconstructed -- confirm.
    disconnecting = 0
    producer = None

    def __init__(self, target, logFile=None):
        self.target = target
        self.logFile = logFile

    def write(self, data):
        self.buffer = self.buffer + data
        if self.logFile:
            self.logFile.write("loopback writing %s\n" % repr(data))

    def writeSequence(self, iovec):
        self.write("".join(iovec))

    def clearBuffer(self):
        """
        Deliver any buffered data to the target, then complete a pending
        disconnect.  Does nothing once the connection has been lost.
        """
        if self.shouldLose == -1:
            return

        if self.producer:
            self.producer.resumeProducing()
        if self.buffer:
            if self.logFile:
                self.logFile.write("loopback receiving %s\n" % repr(self.buffer))
            # Empty the buffer before delivering so that data written from
            # inside dataReceived is kept for the next call.
            buffer = self.buffer
            self.buffer = ''
            self.target.dataReceived(buffer)
        if self.shouldLose == 1:
            self.shouldLose = -1
            self.target.connectionLost(failure.Failure(main.CONNECTION_DONE))

    def loseConnection(self):
        if self.shouldLose != -1:
            self.shouldLose = 1

    # NOTE(review): the address accessors are not visible in the extracted
    # source; names and return values are reconstructed -- confirm.
    def getHost(self):
        return 'loopback'

    def getPeer(self):
        return 'loopback'

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def unregisterProducer(self):
        self.producer = None

    # NOTE(review): method name reconstructed -- confirm.
    def logPrefix(self):
        return 'Loopback(%r)' % (self.target.__class__.__name__,)
251
def loopback(server, client, logFile=None):
    """
    Run session between server and client.
    DEPRECATED in Twisted 2.5. Use loopbackAsync instead.

    @param server: The protocol instance for the server side.

    @param client: The protocol instance for the client side.

    @param logFile: Optional file-like object passed to both
        L{LoopbackRelay} instances.
    """
    # Imported locally so this function does not depend on a module-level
    # import of warnings.
    import warnings
    warnings.warn('loopback() is deprecated (since Twisted 2.5). '
                  'Use loopbackAsync() instead.',
                  stacklevel=2, category=DeprecationWarning)
    from twisted.internet import reactor
    serverToClient = LoopbackRelay(client, logFile)
    clientToServer = LoopbackRelay(server, logFile)
    server.makeConnection(serverToClient)
    client.makeConnection(clientToServer)
    # Keep relaying until one side asks for the connection to be dropped.
    while 1:
        reactor.iterate(0.01) # this is to clear any deferreds
        serverToClient.clearBuffer()
        clientToServer.clearBuffer()
        if serverToClient.shouldLose:
            serverToClient.clearBuffer()
            server.connectionLost(failure.Failure(main.CONNECTION_DONE))
            break
        elif clientToServer.shouldLose:
            client.connectionLost(failure.Failure(main.CONNECTION_DONE))
            break
    reactor.iterate() # last gasp before I go away
278
class LoopbackClientFactory(protocol.ClientFactory):
    """
    Client factory which always hands out one pre-built protocol instance
    and reports when its connection has been lost.

    @ivar disconnected: C{0} until the connection is lost, then C{1}.

    @ivar deferred: A L{Deferred} fired with C{None} when the connection is
        lost.

    @ivar protocol: The protocol instance returned by C{buildProtocol}.
    """

    def __init__(self, protocol):
        self.disconnected = 0
        self.deferred = defer.Deferred()
        self.protocol = protocol

    def buildProtocol(self, addr):
        # Always the same instance, whatever the address.
        return self.protocol

    def clientConnectionLost(self, connector, reason):
        self.disconnected = 1
        self.deferred.callback(None)
293
class _FireOnClose(policies.ProtocolWrapper):
    """
    Protocol wrapper which fires a L{Deferred} once its connection is lost.

    @ivar deferred: Fired with C{None} after C{connectionLost} has been
        passed on to the base class.
    """
    def __init__(self, protocol, factory):
        # Both arguments go to the base class positionally and unchanged.
        # NOTE(review): the callers in this file pass the factory first and
        # the wrapped protocol second, so the parameter names look swapped
        # -- confirm against ProtocolWrapper.__init__.
        wrapperBase = policies.ProtocolWrapper
        wrapperBase.__init__(self, protocol, factory)
        closed = defer.Deferred()
        self.deferred = closed

    def connectionLost(self, reason):
        # Let the base class handle the loss first, then notify whoever is
        # waiting on the Deferred.
        wrapperBase = policies.ProtocolWrapper
        wrapperBase.connectionLost(self, reason)
        self.deferred.callback(None)
303
def loopbackTCP(server, client, port=0, noisy=True):
    """
    Run session between server and client protocol instances over TCP.

    @param server: The protocol instance for the server side.

    @param client: The protocol instance for the client side.

    @param port: TCP port number to listen on, on 127.0.0.1.

    @param noisy: Value assigned to the C{noisy} attribute of the factories.

    @return: A L{Deferred} chained to fire after the client connection is
        lost, the server side has closed, and the listening port has been
        stopped.
    """
    from twisted.internet import reactor
    f = policies.WrappingFactory(protocol.Factory())
    serverWrapper = _FireOnClose(f, server)
    # NOTE(review): reconstructed line, mirroring clientF.noisy -- confirm.
    f.noisy = noisy
    # Every incoming connection is handled by the single wrapped server.
    f.buildProtocol = lambda addr: serverWrapper
    serverPort = reactor.listenTCP(port, f, interface='127.0.0.1')
    clientF = LoopbackClientFactory(client)
    clientF.noisy = noisy
    reactor.connectTCP('127.0.0.1', serverPort.getHost().port, clientF)
    # Wait for the client to disconnect, then for the server side to close,
    # then shut the listening port down.
    d = clientF.deferred
    d.addCallback(lambda x: serverWrapper.deferred)
    d.addCallback(lambda x: serverPort.stopListening())
    return d
320
def loopbackUNIX(server, client, noisy=True):
    """
    Run session between server and client protocol instances over UNIX socket.

    @param server: The protocol instance for the server side.

    @param client: The protocol instance for the client side.

    @param noisy: Value assigned to the C{noisy} attribute of the factories.

    @return: A L{Deferred} chained to fire after the client connection is
        lost, the server side has closed, and the listening port has been
        stopped.
    """
    # Imported locally so this function does not depend on a module-level
    # import of tempfile.
    import tempfile
    # NOTE(review): mktemp() only picks an unused name, so another process
    # could claim the path before listenUNIX binds it.  Kept as-is here.
    path = tempfile.mktemp()
    from twisted.internet import reactor
    f = policies.WrappingFactory(protocol.Factory())
    serverWrapper = _FireOnClose(f, server)
    # NOTE(review): reconstructed line, mirroring clientF.noisy -- confirm.
    f.noisy = noisy
    # Every incoming connection is handled by the single wrapped server.
    f.buildProtocol = lambda addr: serverWrapper
    serverPort = reactor.listenUNIX(path, f)
    clientF = LoopbackClientFactory(client)
    clientF.noisy = noisy
    reactor.connectUNIX(path, clientF)
    # Wait for the client to disconnect, then for the server side to close,
    # then shut the listening port down.
    d = clientF.deferred
    d.addCallback(lambda x: serverWrapper.deferred)
    d.addCallback(lambda x: serverPort.stopListening())
    return d