16
15
from twisted.protocols import policies
17
16
from twisted.internet import interfaces, protocol, main, defer
18
from twisted.python import failure, components
17
from twisted.python import failure
18
from twisted.internet.interfaces import IAddress
21
class _LoopbackQueue(object):
23
Trivial wrapper around a list to give it an interface like a queue, which
24
the addition of also sending notifications by way of a Deferred whenever
25
the list has something added to it.
28
_notificationDeferred = None
37
if self._notificationDeferred is not None:
38
d, self._notificationDeferred = self._notificationDeferred, None
42
def __nonzero__(self):
43
return bool(self._queue)
47
return self._queue.pop(0)
51
class _LoopbackAddress(object):
55
class _LoopbackTransport(object):
56
implements(interfaces.ITransport, interfaces.IConsumer)
62
def __init__(self, q):
65
def write(self, bytes):
68
def writeSequence(self, iovec):
69
self.q.put(''.join(iovec))
71
def loseConnection(self):
72
self.q.disconnect = True
76
return _LoopbackAddress()
79
return _LoopbackAddress()
82
def registerProducer(self, producer, streaming):
83
assert self.producer is None
84
self.producer = producer
85
self.streamingProducer = streaming
88
def unregisterProducer(self):
89
assert self.producer is not None
92
def _pollProducer(self):
93
if self.producer is not None and not self.streamingProducer:
94
self.producer.resumeProducing()
98
def loopbackAsync(server, client):
100
Establish a connection between C{server} and C{client} then transfer data
101
between them until the connection is closed. This is often useful for
104
@param server: The protocol instance representing the server-side of this
107
@param client: The protocol instance representing the client-side of this
110
@return: A L{Deferred} which fires when the connection has been closed and
111
both sides have received notification of this.
113
serverToClient = _LoopbackQueue()
114
clientToServer = _LoopbackQueue()
116
server.makeConnection(_LoopbackTransport(serverToClient))
117
client.makeConnection(_LoopbackTransport(clientToServer))
119
return _loopbackAsyncBody(server, serverToClient, client, clientToServer)
123
def _loopbackAsyncBody(server, serverToClient, client, clientToServer):
125
Transfer bytes from the output queue of each protocol to the input of the other.
127
@param server: The protocol instance representing the server-side of this
130
@param serverToClient: The L{_LoopbackQueue} holding the server's output.
132
@param client: The protocol instance representing the client-side of this
135
@param clientToServer: The L{_LoopbackQueue} holding the client's output.
137
@return: A L{Deferred} which fires when the connection has been closed and
138
both sides have received notification of this.
140
def pump(source, q, target):
146
target.dataReceived(bytes)
148
# A write buffer has now been emptied. Give any producer on that side
149
# an opportunity to produce more data.
150
source.transport._pollProducer()
155
disconnect = clientSent = serverSent = False
157
# Deliver the data which has been written.
158
serverSent = pump(server, serverToClient, client)
159
clientSent = pump(client, clientToServer, server)
161
if not clientSent and not serverSent:
162
# Neither side wrote any data. Wait for some new data to be added
163
# before trying to do anything further.
164
d = clientToServer._notificationDeferred = serverToClient._notificationDeferred = defer.Deferred()
165
d.addCallback(_loopbackAsyncContinue, server, serverToClient, client, clientToServer)
167
if serverToClient.disconnect:
168
# The server wants to drop the connection. Flush any remaining
171
pump(server, serverToClient, client)
172
elif clientToServer.disconnect:
173
# The client wants to drop the connection. Flush any remaining
176
pump(client, clientToServer, server)
178
# Someone wanted to disconnect, so okay, the connection is gone.
179
server.connectionLost(failure.Failure(main.CONNECTION_DONE))
180
client.connectionLost(failure.Failure(main.CONNECTION_DONE))
181
return defer.succeed(None)
185
def _loopbackAsyncContinue(ignored, server, serverToClient, client, clientToServer):
186
# Clear the Deferred from each message queue, since it has already fired
187
# and cannot be used again.
188
clientToServer._notificationDeferred = serverToClient._notificationDeferred = None
190
# Push some more bytes around.
191
return _loopbackAsyncBody(server, serverToClient, client, clientToServer)
20
195
class LoopbackRelay: