1
"""Stream-related things."""
3
__all__ = ['StreamReader', 'StreamReaderProtocol',
4
'open_connection', 'start_server',
11
from . import protocols
15
_DEFAULT_LIMIT = 2**16
19
def open_connection(host=None, port=None, *,
20
loop=None, limit=_DEFAULT_LIMIT, **kwds):
21
"""A wrapper for create_connection() returning a (reader, writer) pair.
23
The reader returned is a StreamReader instance; the writer is a
26
The arguments are all the usual arguments to create_connection()
27
except protocol_factory; most common are positional host and port,
28
with various optional keyword arguments following.
30
Additional optional keyword arguments are loop (to set the event loop
31
instance to use) and limit (to set the buffer limit passed to the
34
(If you want to customize the StreamReader and/or
35
StreamReaderProtocol classes, just copy the code -- there's
36
really nothing special here except some convenience.)
39
loop = events.get_event_loop()
40
reader = StreamReader(limit=limit, loop=loop)
41
protocol = StreamReaderProtocol(reader)
42
transport, _ = yield from loop.create_connection(
43
lambda: protocol, host, port, **kwds)
44
writer = StreamWriter(transport, protocol, reader, loop)
49
def start_server(client_connected_cb, host=None, port=None, *,
50
loop=None, limit=_DEFAULT_LIMIT, **kwds):
51
"""Start a socket server, call back for each client connected.
53
The first parameter, `client_connected_cb`, takes two parameters:
54
client_reader, client_writer. client_reader is a StreamReader
55
object, while client_writer is a StreamWriter object. This
56
parameter can either be a plain callback function or a coroutine;
57
if it is a coroutine, it will be automatically converted into a
60
The rest of the arguments are all the usual arguments to
61
loop.create_server() except protocol_factory; most common are
62
positional host and port, with various optional keyword arguments
63
following. The return value is the same as loop.create_server().
65
Additional optional keyword arguments are loop (to set the event loop
66
instance to use) and limit (to set the buffer limit passed to the
69
The return value is the same as loop.create_server(), i.e. a
70
Server object which can be used to stop the service.
73
loop = events.get_event_loop()
76
reader = StreamReader(limit=limit, loop=loop)
77
protocol = StreamReaderProtocol(reader, client_connected_cb,
81
return (yield from loop.create_server(factory, host, port, **kwds))
84
class StreamReaderProtocol(protocols.Protocol):
85
"""Trivial helper class to adapt between Protocol and StreamReader.
87
(This is a helper class instead of making StreamReader itself a
88
Protocol subclass, because the StreamReader has other potential
89
uses, and to prevent the user of the StreamReader to accidentally
90
call inappropriate methods of the protocol.)
93
def __init__(self, stream_reader, client_connected_cb=None, loop=None):
94
self._stream_reader = stream_reader
95
self._stream_writer = None
96
self._drain_waiter = None
98
self._client_connected_cb = client_connected_cb
99
self._loop = loop # May be None; we may never need it.
101
def connection_made(self, transport):
102
self._stream_reader.set_transport(transport)
103
if self._client_connected_cb is not None:
104
self._stream_writer = StreamWriter(transport, self,
107
res = self._client_connected_cb(self._stream_reader,
109
if tasks.iscoroutine(res):
110
tasks.Task(res, loop=self._loop)
112
def connection_lost(self, exc):
114
self._stream_reader.feed_eof()
116
self._stream_reader.set_exception(exc)
117
# Also wake up the writing side.
119
waiter = self._drain_waiter
120
if waiter is not None:
121
self._drain_waiter = None
122
if not waiter.done():
124
waiter.set_result(None)
126
waiter.set_exception(exc)
128
def data_received(self, data):
    """Protocol callback: forward received data to the wrapped StreamReader.

    data is handed unchanged to the reader's feed_data(); nothing is
    returned.
    """
    self._stream_reader.feed_data(data)
131
def eof_received(self):
    """Protocol callback: signal end-of-file to the wrapped StreamReader.

    Calls the reader's feed_eof() and returns None.
    """
    self._stream_reader.feed_eof()
134
def pause_writing(self):
135
assert not self._paused
138
def resume_writing(self):
141
waiter = self._drain_waiter
142
if waiter is not None:
143
self._drain_waiter = None
144
if not waiter.done():
145
waiter.set_result(None)
149
"""Wraps a Transport.
151
This exposes write(), writelines(), [can_]write_eof(),
152
get_extra_info() and close(). It adds drain() which returns an
153
optional Future on which you can wait for flow control. It also
154
adds a transport attribute which references the Transport
158
def __init__(self, transport, protocol, reader, loop):
159
self._transport = transport
160
self._protocol = protocol
161
self._reader = reader
166
return self._transport
168
def write(self, data):
    """Pass data straight through to the wrapped transport's write().

    Nothing is returned.
    """
    self._transport.write(data)
171
def writelines(self, data):
    """Pass data straight through to the wrapped transport's writelines().

    data is forwarded as-is (it is not iterated here); nothing is
    returned.
    """
    self._transport.writelines(data)
175
return self._transport.write_eof()
177
def can_write_eof(self):
    """Return whatever the wrapped transport's can_write_eof() reports."""
    return self._transport.can_write_eof()
181
return self._transport.close()
183
def get_extra_info(self, name, default=None):
    """Look up extra transport information by name.

    Delegates to the wrapped transport's get_extra_info(), forwarding
    both name and default, and returns its result.
    """
    return self._transport.get_extra_info(name, default)
def drain(self):
    """This method has an unusual return value.

    The intended use is to write

      w.write(data)
      yield from w.drain()

    When there's nothing to wait for, drain() returns (), and the
    yield-from continues immediately.  When the transport buffer
    is full (the protocol is paused), drain() creates and returns
    a Future and the yield-from will block until that Future is
    completed, which will happen when the buffer is (partially)
    drained and the protocol is resumed.

    Raises the exception recorded on the reader, if any, and
    ConnectionResetError when the transport reports a lost connection.
    """
    if self._reader._exception is not None:
        # The error is stored on the reader; the writer has no
        # `_writer` attribute, so read it from `_reader`.
        raise self._reader._exception
    if self._transport._conn_lost:  # Uses private variable.
        raise ConnectionResetError('Connection lost')
    if not self._protocol._paused:
        # Nothing to wait for: () makes `yield from` finish at once.
        return ()
    waiter = self._protocol._drain_waiter
    # Only one drain() may be pending at a time.
    assert waiter is None or waiter.cancelled()
    waiter = futures.Future(loop=self._loop)
    self._protocol._drain_waiter = waiter
    return waiter
216
def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
217
# The line length limit is a security feature;
218
# it also doubles as half the buffer limit.
221
loop = events.get_event_loop()
223
self._buffer = collections.deque() # Deque of bytes objects.
224
self._byte_count = 0 # Bytes in buffer.
225
self._eof = False # Whether we're done.
226
self._waiter = None # A future.
227
self._exception = None
228
self._transport = None
232
return self._exception
234
def set_exception(self, exc):
235
self._exception = exc
237
waiter = self._waiter
238
if waiter is not None:
240
if not waiter.cancelled():
241
waiter.set_exception(exc)
243
def set_transport(self, transport):
    """Attach the transport to this reader.

    May only be called while no transport is attached; a second call
    trips the assertion ('Transport already set') and leaves the
    existing transport in place.
    """
    assert self._transport is None, 'Transport already set'
    self._transport = transport
247
def _maybe_resume_transport(self):
248
if self._paused and self._byte_count <= self._limit:
250
self._transport.resume_reading()
254
waiter = self._waiter
255
if waiter is not None:
257
if not waiter.cancelled():
258
waiter.set_result(True)
260
def feed_data(self, data):
264
self._buffer.append(data)
265
self._byte_count += len(data)
267
waiter = self._waiter
268
if waiter is not None:
270
if not waiter.cancelled():
271
waiter.set_result(False)
273
if (self._transport is not None and
275
self._byte_count > 2*self._limit):
277
self._transport.pause_reading()
278
except NotImplementedError:
279
# The transport can't be paused.
280
# We'll just have to buffer all data.
281
# Forget the transport so we don't keep trying.
282
self._transport = None
288
if self._exception is not None:
289
raise self._exception
296
while self._buffer and not_enough:
297
data = self._buffer.popleft()
298
ichar = data.find(b'\n')
301
parts_size += len(data)
304
head, tail = data[:ichar], data[ichar:]
306
self._buffer.appendleft(tail)
309
parts_size += len(head)
311
if parts_size > self._limit:
312
self._byte_count -= parts_size
313
self._maybe_resume_transport()
314
raise ValueError('Line is too long')
320
assert self._waiter is None
321
self._waiter = futures.Future(loop=self._loop)
323
yield from self._waiter
327
line = b''.join(parts)
328
self._byte_count -= parts_size
329
self._maybe_resume_transport()
334
def read(self, n=-1):
335
if self._exception is not None:
336
raise self._exception
343
assert not self._waiter
344
self._waiter = futures.Future(loop=self._loop)
346
yield from self._waiter
350
if not self._byte_count and not self._eof:
351
assert not self._waiter
352
self._waiter = futures.Future(loop=self._loop)
354
yield from self._waiter
358
if n < 0 or self._byte_count <= n:
359
data = b''.join(self._buffer)
362
self._maybe_resume_transport()
367
while self._buffer and parts_bytes < n:
368
data = self._buffer.popleft()
369
data_bytes = len(data)
370
if n < parts_bytes + data_bytes:
371
data_bytes = n - parts_bytes
372
data, rest = data[:data_bytes], data[data_bytes:]
373
self._buffer.appendleft(rest)
376
parts_bytes += data_bytes
377
self._byte_count -= data_bytes
378
self._maybe_resume_transport()
380
return b''.join(parts)
383
def readexactly(self, n):
384
if self._exception is not None:
385
raise self._exception
390
while self._byte_count < n and not self._eof:
391
assert not self._waiter
392
self._waiter = futures.Future(loop=self._loop)
394
yield from self._waiter
398
return (yield from self.read(n))