1
# -*- test-case-name: twisted.test.test_loopback -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Testing support for protocols -- loopback between client and server.
11
from zope.interface import implements
14
from twisted.protocols import policies
15
from twisted.internet import interfaces, protocol, main, defer
16
from twisted.internet.task import deferLater
17
from twisted.python import failure
18
from twisted.internet.interfaces import IAddress
21
class _LoopbackQueue(object):
23
Trivial wrapper around a list to give it an interface like a queue, which
24
the addition of also sending notifications by way of a Deferred whenever
25
the list has something added to it.
28
_notificationDeferred = None
37
if self._notificationDeferred is not None:
38
d, self._notificationDeferred = self._notificationDeferred, None
42
def __nonzero__(self):
43
return bool(self._queue)
47
return self._queue.pop(0)
51
class _LoopbackAddress(object):
55
class _LoopbackTransport(object):
56
implements(interfaces.ITransport, interfaces.IConsumer)
62
def __init__(self, q):
65
def write(self, bytes):
68
def writeSequence(self, iovec):
69
self.q.put(''.join(iovec))
71
def loseConnection(self):
72
self.q.disconnect = True
76
return _LoopbackAddress()
79
return _LoopbackAddress()
82
def registerProducer(self, producer, streaming):
83
assert self.producer is None
84
self.producer = producer
85
self.streamingProducer = streaming
88
def unregisterProducer(self):
89
assert self.producer is not None
92
def _pollProducer(self):
93
if self.producer is not None and not self.streamingProducer:
94
self.producer.resumeProducing()
98
def identityPumpPolicy(queue, target):
100
L{identityPumpPolicy} is a policy which delivers each chunk of data written
101
to the given queue as-is to the target.
103
This isn't a particularly realistic policy.
105
@see: L{loopbackAsync}
111
target.dataReceived(bytes)
115
def collapsingPumpPolicy(queue, target):
117
L{collapsingPumpPolicy} is a policy which collapses all outstanding chunks
118
into a single string and delivers it to the target.
120
@see: L{loopbackAsync}
129
target.dataReceived(''.join(bytes))
133
def loopbackAsync(server, client, pumpPolicy=identityPumpPolicy):
135
Establish a connection between C{server} and C{client} then transfer data
136
between them until the connection is closed. This is often useful for
139
@param server: The protocol instance representing the server-side of this
142
@param client: The protocol instance representing the client-side of this
145
@param pumpPolicy: When either C{server} or C{client} writes to its
146
transport, the string passed in is added to a queue of data for the
147
other protocol. Eventually, C{pumpPolicy} will be called with one such
148
queue and the corresponding protocol object. The pump policy callable
149
is responsible for emptying the queue and passing the strings it
150
contains to the given protocol's C{dataReceived} method. The signature
151
of C{pumpPolicy} is C{(queue, protocol)}. C{queue} is an object with a
152
C{get} method which will return the next string written to the
153
transport, or C{None} if the transport has been disconnected, and which
154
evaluates to C{True} if and only if there are more items to be
155
retrieved via C{get}.
157
@return: A L{Deferred} which fires when the connection has been closed and
158
both sides have received notification of this.
160
serverToClient = _LoopbackQueue()
161
clientToServer = _LoopbackQueue()
163
server.makeConnection(_LoopbackTransport(serverToClient))
164
client.makeConnection(_LoopbackTransport(clientToServer))
166
return _loopbackAsyncBody(
167
server, serverToClient, client, clientToServer, pumpPolicy)
171
def _loopbackAsyncBody(server, serverToClient, client, clientToServer,
174
Transfer bytes from the output queue of each protocol to the input of the other.
176
@param server: The protocol instance representing the server-side of this
179
@param serverToClient: The L{_LoopbackQueue} holding the server's output.
181
@param client: The protocol instance representing the client-side of this
184
@param clientToServer: The L{_LoopbackQueue} holding the client's output.
186
@param pumpPolicy: See L{loopbackAsync}.
188
@return: A L{Deferred} which fires when the connection has been closed and
189
both sides have received notification of this.
191
def pump(source, q, target):
194
pumpPolicy(q, target)
197
# A write buffer has now been emptied. Give any producer on that
198
# side an opportunity to produce more data.
199
source.transport._pollProducer()
204
disconnect = clientSent = serverSent = False
206
# Deliver the data which has been written.
207
serverSent = pump(server, serverToClient, client)
208
clientSent = pump(client, clientToServer, server)
210
if not clientSent and not serverSent:
211
# Neither side wrote any data. Wait for some new data to be added
212
# before trying to do anything further.
214
clientToServer._notificationDeferred = d
215
serverToClient._notificationDeferred = d
217
_loopbackAsyncContinue,
218
server, serverToClient, client, clientToServer, pumpPolicy)
220
if serverToClient.disconnect:
221
# The server wants to drop the connection. Flush any remaining
224
pump(server, serverToClient, client)
225
elif clientToServer.disconnect:
226
# The client wants to drop the connection. Flush any remaining
229
pump(client, clientToServer, server)
231
# Someone wanted to disconnect, so okay, the connection is gone.
232
server.connectionLost(failure.Failure(main.CONNECTION_DONE))
233
client.connectionLost(failure.Failure(main.CONNECTION_DONE))
234
return defer.succeed(None)
238
def _loopbackAsyncContinue(ignored, server, serverToClient, client,
239
clientToServer, pumpPolicy):
240
# Clear the Deferred from each message queue, since it has already fired
241
# and cannot be used again.
242
clientToServer._notificationDeferred = None
243
serverToClient._notificationDeferred = None
245
# Schedule some more byte-pushing to happen. This isn't done
246
# synchronously because no actual transport can re-enter dataReceived as
247
# a result of calling write, and doing this synchronously could result
249
from twisted.internet import reactor
253
server, serverToClient, client, clientToServer, pumpPolicy)
259
implements(interfaces.ITransport, interfaces.IConsumer)
266
def __init__(self, target, logFile=None):
268
self.logFile = logFile
270
def write(self, data):
271
self.buffer = self.buffer + data
273
self.logFile.write("loopback writing %s\n" % repr(data))
275
def writeSequence(self, iovec):
276
self.write("".join(iovec))
278
def clearBuffer(self):
279
if self.shouldLose == -1:
283
self.producer.resumeProducing()
286
self.logFile.write("loopback receiving %s\n" % repr(self.buffer))
289
self.target.dataReceived(buffer)
290
if self.shouldLose == 1:
292
self.target.connectionLost(failure.Failure(main.CONNECTION_DONE))
294
def loseConnection(self):
295
if self.shouldLose != -1:
304
def registerProducer(self, producer, streaming):
305
self.producer = producer
307
def unregisterProducer(self):
311
return 'Loopback(%r)' % (self.target.__class__.__name__,)
313
def loopback(server, client, logFile=None):
314
"""Run session between server and client.
315
DEPRECATED in Twisted 2.5. Use loopbackAsync instead.
318
warnings.warn('loopback() is deprecated (since Twisted 2.5). '
319
'Use loopbackAsync() instead.',
320
stacklevel=2, category=DeprecationWarning)
321
from twisted.internet import reactor
322
serverToClient = LoopbackRelay(client, logFile)
323
clientToServer = LoopbackRelay(server, logFile)
324
server.makeConnection(serverToClient)
325
client.makeConnection(clientToServer)
327
reactor.iterate(0.01) # this is to clear any deferreds
328
serverToClient.clearBuffer()
329
clientToServer.clearBuffer()
330
if serverToClient.shouldLose:
331
serverToClient.clearBuffer()
332
server.connectionLost(failure.Failure(main.CONNECTION_DONE))
334
elif clientToServer.shouldLose:
335
client.connectionLost(failure.Failure(main.CONNECTION_DONE))
337
reactor.iterate() # last gasp before I go away
340
class LoopbackClientFactory(protocol.ClientFactory):
342
def __init__(self, protocol):
343
self.disconnected = 0
344
self.deferred = defer.Deferred()
345
self.protocol = protocol
347
def buildProtocol(self, addr):
350
def clientConnectionLost(self, connector, reason):
351
self.disconnected = 1
352
self.deferred.callback(None)
355
class _FireOnClose(policies.ProtocolWrapper):
356
def __init__(self, protocol, factory):
357
policies.ProtocolWrapper.__init__(self, protocol, factory)
358
self.deferred = defer.Deferred()
360
def connectionLost(self, reason):
361
policies.ProtocolWrapper.connectionLost(self, reason)
362
self.deferred.callback(None)
365
def loopbackTCP(server, client, port=0, noisy=True):
366
"""Run session between server and client protocol instances over TCP."""
367
from twisted.internet import reactor
368
f = policies.WrappingFactory(protocol.Factory())
369
serverWrapper = _FireOnClose(f, server)
371
f.buildProtocol = lambda addr: serverWrapper
372
serverPort = reactor.listenTCP(port, f, interface='127.0.0.1')
373
clientF = LoopbackClientFactory(client)
374
clientF.noisy = noisy
375
reactor.connectTCP('127.0.0.1', serverPort.getHost().port, clientF)
377
d.addCallback(lambda x: serverWrapper.deferred)
378
d.addCallback(lambda x: serverPort.stopListening())
382
def loopbackUNIX(server, client, noisy=True):
383
"""Run session between server and client protocol instances over UNIX socket."""
384
path = tempfile.mktemp()
385
from twisted.internet import reactor
386
f = policies.WrappingFactory(protocol.Factory())
387
serverWrapper = _FireOnClose(f, server)
389
f.buildProtocol = lambda addr: serverWrapper
390
serverPort = reactor.listenUNIX(path, f)
391
clientF = LoopbackClientFactory(client)
392
clientF.noisy = noisy
393
reactor.connectUNIX(path, clientF)
395
d.addCallback(lambda x: serverWrapper.deferred)
396
d.addCallback(lambda x: serverPort.stopListening())