~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/protocols/loopback.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
3
3
# See LICENSE for details.
4
4
 
5
5
 
6
 
# These class's names should have been based on Onanism, but were
7
 
# censored by the PSU
8
 
 
9
 
"""Testing support for protocols -- loopback between client and server."""
 
6
"""
 
7
Testing support for protocols -- loopback between client and server.
 
8
"""
10
9
 
11
10
# system imports
12
11
import tempfile
15
14
# Twisted Imports
16
15
from twisted.protocols import policies
17
16
from twisted.internet import interfaces, protocol, main, defer
18
 
from twisted.python import failure, components
 
17
from twisted.python import failure
 
18
from twisted.internet.interfaces import IAddress
 
19
 
 
20
 
 
21
class _LoopbackQueue(object):
 
22
    """
 
23
    Trivial wrapper around a list to give it an interface like a queue, which
 
24
    the addition of also sending notifications by way of a Deferred whenever
 
25
    the list has something added to it.
 
26
    """
 
27
 
 
28
    _notificationDeferred = None
 
29
    disconnect = False
 
30
 
 
31
    def __init__(self):
 
32
        self._queue = []
 
33
 
 
34
 
 
35
    def put(self, v):
 
36
        self._queue.append(v)
 
37
        if self._notificationDeferred is not None:
 
38
            d, self._notificationDeferred = self._notificationDeferred, None
 
39
            d.callback(None)
 
40
 
 
41
 
 
42
    def __nonzero__(self):
 
43
        return bool(self._queue)
 
44
 
 
45
 
 
46
    def get(self):
 
47
        return self._queue.pop(0)
 
48
 
 
49
 
 
50
 
 
51
class _LoopbackAddress(object):
 
52
    implements(IAddress)
 
53
 
 
54
 
 
55
class _LoopbackTransport(object):
 
56
    implements(interfaces.ITransport, interfaces.IConsumer)
 
57
 
 
58
    disconnecting = False
 
59
    producer = None
 
60
 
 
61
    # ITransport
 
62
    def __init__(self, q):
 
63
        self.q = q
 
64
 
 
65
    def write(self, bytes):
 
66
        self.q.put(bytes)
 
67
 
 
68
    def writeSequence(self, iovec):
 
69
        self.q.put(''.join(iovec))
 
70
 
 
71
    def loseConnection(self):
 
72
        self.q.disconnect = True
 
73
        self.q.put('')
 
74
 
 
75
    def getPeer(self):
 
76
        return _LoopbackAddress()
 
77
 
 
78
    def getHost(self):
 
79
        return _LoopbackAddress()
 
80
 
 
81
    # IConsumer
 
82
    def registerProducer(self, producer, streaming):
 
83
        assert self.producer is None
 
84
        self.producer = producer
 
85
        self.streamingProducer = streaming
 
86
        self._pollProducer()
 
87
 
 
88
    def unregisterProducer(self):
 
89
        assert self.producer is not None
 
90
        self.producer = None
 
91
 
 
92
    def _pollProducer(self):
 
93
        if self.producer is not None and not self.streamingProducer:
 
94
            self.producer.resumeProducing()
 
95
 
 
96
 
 
97
 
 
98
def loopbackAsync(server, client):
 
99
    """
 
100
    Establish a connection between C{server} and C{client} then transfer data
 
101
    between them until the connection is closed. This is often useful for
 
102
    testing a protocol.
 
103
 
 
104
    @param server: The protocol instance representing the server-side of this
 
105
    connection.
 
106
    
 
107
    @param client: The protocol instance representing the client-side of this
 
108
    connection.
 
109
 
 
110
    @return: A L{Deferred} which fires when the connection has been closed and
 
111
    both sides have received notification of this.
 
112
    """
 
113
    serverToClient = _LoopbackQueue()
 
114
    clientToServer = _LoopbackQueue()
 
115
 
 
116
    server.makeConnection(_LoopbackTransport(serverToClient))
 
117
    client.makeConnection(_LoopbackTransport(clientToServer))
 
118
 
 
119
    return _loopbackAsyncBody(server, serverToClient, client, clientToServer)
 
120
 
 
121
 
 
122
 
 
123
def _loopbackAsyncBody(server, serverToClient, client, clientToServer):
 
124
    """
 
125
    Transfer bytes from the output queue of each protocol to the input of the other.
 
126
 
 
127
    @param server: The protocol instance representing the server-side of this
 
128
    connection.
 
129
 
 
130
    @param serverToClient: The L{_LoopbackQueue} holding the server's output.
 
131
 
 
132
    @param client: The protocol instance representing the client-side of this
 
133
    connection.
 
134
 
 
135
    @param clientToServer: The L{_LoopbackQueue} holding the client's output.
 
136
 
 
137
    @return: A L{Deferred} which fires when the connection has been closed and
 
138
    both sides have received notification of this.
 
139
    """
 
140
    def pump(source, q, target):
 
141
        sent = False
 
142
        while q:
 
143
            sent = True
 
144
            bytes = q.get()
 
145
            if bytes:
 
146
                target.dataReceived(bytes)
 
147
 
 
148
        # A write buffer has now been emptied.  Give any producer on that side
 
149
        # an opportunity to produce more data.
 
150
        source.transport._pollProducer()
 
151
 
 
152
        return sent
 
153
 
 
154
    while 1:
 
155
        disconnect = clientSent = serverSent = False
 
156
 
 
157
        # Deliver the data which has been written.
 
158
        serverSent = pump(server, serverToClient, client)
 
159
        clientSent = pump(client, clientToServer, server)
 
160
 
 
161
        if not clientSent and not serverSent:
 
162
            # Neither side wrote any data.  Wait for some new data to be added
 
163
            # before trying to do anything further.
 
164
            d = clientToServer._notificationDeferred = serverToClient._notificationDeferred = defer.Deferred()
 
165
            d.addCallback(_loopbackAsyncContinue, server, serverToClient, client, clientToServer)
 
166
            return d
 
167
        if serverToClient.disconnect:
 
168
            # The server wants to drop the connection.  Flush any remaining
 
169
            # data it has.
 
170
            disconnect = True
 
171
            pump(server, serverToClient, client)
 
172
        elif clientToServer.disconnect:
 
173
            # The client wants to drop the connection.  Flush any remaining
 
174
            # data it has.
 
175
            disconnect = True
 
176
            pump(client, clientToServer, server)
 
177
        if disconnect:
 
178
            # Someone wanted to disconnect, so okay, the connection is gone.
 
179
            server.connectionLost(failure.Failure(main.CONNECTION_DONE))
 
180
            client.connectionLost(failure.Failure(main.CONNECTION_DONE))
 
181
            return defer.succeed(None)
 
182
 
 
183
 
 
184
 
 
185
def _loopbackAsyncContinue(ignored, server, serverToClient, client, clientToServer):
 
186
    # Clear the Deferred from each message queue, since it has already fired
 
187
    # and cannot be used again.
 
188
    clientToServer._notificationDeferred = serverToClient._notificationDeferred = None
 
189
 
 
190
    # Push some more bytes around.
 
191
    return _loopbackAsyncBody(server, serverToClient, client, clientToServer)
 
192
 
 
193
 
19
194
 
20
195
class LoopbackRelay:
21
196
 
75
250
 
76
251
def loopback(server, client, logFile=None):
77
252
    """Run session between server and client.
 
253
    DEPRECATED in Twisted 2.5. Use loopbackAsync instead.
78
254
    """
 
255
    import warnings
 
256
    warnings.warn('loopback() is deprecated (since Twisted 2.5). '
 
257
                  'Use loopbackAsync() instead.',
 
258
                  stacklevel=2, category=DeprecationWarning)
79
259
    from twisted.internet import reactor
80
260
    serverToClient = LoopbackRelay(client, logFile)
81
261
    clientToServer = LoopbackRelay(server, logFile)
87
267
        clientToServer.clearBuffer()
88
268
        if serverToClient.shouldLose:
89
269
            serverToClient.clearBuffer()
 
270
            server.connectionLost(failure.Failure(main.CONNECTION_DONE))
90
271
            break
91
272
        elif clientToServer.shouldLose:
 
273
            client.connectionLost(failure.Failure(main.CONNECTION_DONE))
92
274
            break
93
 
    client.connectionLost(failure.Failure(main.CONNECTION_DONE))
94
 
    server.connectionLost(failure.Failure(main.CONNECTION_DONE))
95
275
    reactor.iterate() # last gasp before I go away
96
276
 
97
277