~certify-web-dev/twisted/certify-staging

« back to all changes in this revision

Viewing changes to twisted/protocols/loopback.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2010-01-02 19:38:17 UTC
  • mfrom: (2.2.4 sid)
  • Revision ID: james.westby@ubuntu.com-20100102193817-jphp464ppwh7dulg
Tags: 9.0.0-1
* python-twisted: Depend on the python-twisted-* 9.0 packages.
* python-twisted: Depend on python-zope.interface only. Closes: #557781.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# -*- test-case-name: twisted.test.test_loopback -*-
2
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
3
# See LICENSE for details.
4
4
 
5
 
 
6
5
"""
7
6
Testing support for protocols -- loopback between client and server.
8
7
"""
71
70
 
72
71
    def loseConnection(self):
73
72
        self.q.disconnect = True
74
 
        self.q.put('')
 
73
        self.q.put(None)
75
74
 
76
75
    def getPeer(self):
77
76
        return _LoopbackAddress()
96
95
 
97
96
 
98
97
 
99
 
def loopbackAsync(server, client):
 
98
def identityPumpPolicy(queue, target):
 
99
    """
 
100
    L{identityPumpPolicy} is a policy which delivers each chunk of data written
 
101
    to the given queue as-is to the target.
 
102
 
 
103
    This isn't a particularly realistic policy.
 
104
 
 
105
    @see: L{loopbackAsync}
 
106
    """
 
107
    while queue:
 
108
        bytes = queue.get()
 
109
        if bytes is None:
 
110
            break
 
111
        target.dataReceived(bytes)
 
112
 
 
113
 
 
114
 
 
115
def collapsingPumpPolicy(queue, target):
 
116
    """
 
117
    L{collapsingPumpPolicy} is a policy which collapses all outstanding chunks
 
118
    into a single string and delivers it to the target.
 
119
 
 
120
    @see: L{loopbackAsync}
 
121
    """
 
122
    bytes = []
 
123
    while queue:
 
124
        chunk = queue.get()
 
125
        if chunk is None:
 
126
            break
 
127
        bytes.append(chunk)
 
128
    if bytes:
 
129
        target.dataReceived(''.join(bytes))
 
130
 
 
131
 
 
132
 
 
133
def loopbackAsync(server, client, pumpPolicy=identityPumpPolicy):
100
134
    """
101
135
    Establish a connection between C{server} and C{client} then transfer data
102
136
    between them until the connection is closed. This is often useful for
108
142
    @param client: The protocol instance representing the client-side of this
109
143
        connection.
110
144
 
 
145
    @param pumpPolicy: When either C{server} or C{client} writes to its
 
146
        transport, the string passed in is added to a queue of data for the
 
147
        other protocol.  Eventually, C{pumpPolicy} will be called with one such
 
148
        queue and the corresponding protocol object.  The pump policy callable
 
149
        is responsible for emptying the queue and passing the strings it
 
150
        contains to the given protocol's C{dataReceived} method.  The signature
 
151
        of C{pumpPolicy} is C{(queue, protocol)}.  C{queue} is an object with a
 
152
        C{get} method which will return the next string written to the
 
153
        transport, or C{None} if the transport has been disconnected, and which
 
154
        evaluates to C{True} if and only if there are more items to be
 
155
        retrieved via C{get}.
 
156
 
111
157
    @return: A L{Deferred} which fires when the connection has been closed and
112
158
        both sides have received notification of this.
113
159
    """
117
163
    server.makeConnection(_LoopbackTransport(serverToClient))
118
164
    client.makeConnection(_LoopbackTransport(clientToServer))
119
165
 
120
 
    return _loopbackAsyncBody(server, serverToClient, client, clientToServer)
121
 
 
122
 
 
123
 
 
124
 
def _loopbackAsyncBody(server, serverToClient, client, clientToServer):
 
166
    return _loopbackAsyncBody(
 
167
        server, serverToClient, client, clientToServer, pumpPolicy)
 
168
 
 
169
 
 
170
 
 
171
def _loopbackAsyncBody(server, serverToClient, client, clientToServer,
 
172
                       pumpPolicy):
125
173
    """
126
174
    Transfer bytes from the output queue of each protocol to the input of the other.
127
175
 
135
183
 
136
184
    @param clientToServer: The L{_LoopbackQueue} holding the client's output.
137
185
 
 
186
    @param pumpPolicy: See L{loopbackAsync}.
 
187
 
138
188
    @return: A L{Deferred} which fires when the connection has been closed and
139
 
    both sides have received notification of this.
 
189
        both sides have received notification of this.
140
190
    """
141
191
    def pump(source, q, target):
142
192
        sent = False
143
 
        while q:
 
193
        if q:
 
194
            pumpPolicy(q, target)
144
195
            sent = True
145
 
            bytes = q.get()
146
 
            if bytes:
147
 
                target.dataReceived(bytes)
148
 
 
149
 
        # A write buffer has now been emptied.  Give any producer on that side
150
 
        # an opportunity to produce more data.
151
 
        source.transport._pollProducer()
 
196
        if sent and not q:
 
197
            # A write buffer has now been emptied.  Give any producer on that
 
198
            # side an opportunity to produce more data.
 
199
            source.transport._pollProducer()
152
200
 
153
201
        return sent
154
202
 
162
210
        if not clientSent and not serverSent:
163
211
            # Neither side wrote any data.  Wait for some new data to be added
164
212
            # before trying to do anything further.
165
 
            d = clientToServer._notificationDeferred = serverToClient._notificationDeferred = defer.Deferred()
166
 
            d.addCallback(_loopbackAsyncContinue, server, serverToClient, client, clientToServer)
 
213
            d = defer.Deferred()
 
214
            clientToServer._notificationDeferred = d
 
215
            serverToClient._notificationDeferred = d
 
216
            d.addCallback(
 
217
                _loopbackAsyncContinue,
 
218
                server, serverToClient, client, clientToServer, pumpPolicy)
167
219
            return d
168
220
        if serverToClient.disconnect:
169
221
            # The server wants to drop the connection.  Flush any remaining
183
235
 
184
236
 
185
237
 
186
 
def _loopbackAsyncContinue(ignored, server, serverToClient, client, clientToServer):
 
238
def _loopbackAsyncContinue(ignored, server, serverToClient, client,
 
239
                           clientToServer, pumpPolicy):
187
240
    # Clear the Deferred from each message queue, since it has already fired
188
241
    # and cannot be used again.
189
 
    clientToServer._notificationDeferred = serverToClient._notificationDeferred = None
 
242
    clientToServer._notificationDeferred = None
 
243
    serverToClient._notificationDeferred = None
190
244
 
191
245
    # Schedule some more byte-pushing to happen.  This isn't done
192
246
    # synchronously because no actual transport can re-enter dataReceived as
195
249
    from twisted.internet import reactor
196
250
    return deferLater(
197
251
        reactor, 0,
198
 
        _loopbackAsyncBody, server, serverToClient, client, clientToServer)
 
252
        _loopbackAsyncBody,
 
253
        server, serverToClient, client, clientToServer, pumpPolicy)
199
254
 
200
255
 
201
256