#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
17
"""A utility class to write to and read from a non-blocking socket."""
24
# Module-level logger shared by the code in this file; records are emitted
# under the 'tornado.iostream' logger name.
_log = logging.getLogger('tornado.iostream')
26
class IOStream(object):
    r"""A utility class to write to and read from a non-blocking socket.

    We support three methods: write(), read_until(), and read_bytes().
    All of the methods take callbacks (since writing and reading are
    non-blocking and asynchronous). read_until() reads the socket until
    a given delimiter, and read_bytes() reads until a specified number
    of bytes have been read from the socket.

    Buffered data is kept as byte strings, so the data passed to write()
    and the delimiter passed to read_until() must be byte strings as well
    (on Python 2 a plain ``str`` is a byte string).

    A very simple (and broken) HTTP client using this class:

        import ioloop
        import iostream
        import socket

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        s.connect(("friendfeed.com", 80))
        stream = iostream.IOStream(s)

        def on_headers(data):
            headers = {}
            for line in data.split(b"\r\n"):
                parts = line.split(b":")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
            stream.read_bytes(int(headers[b"Content-Length"]), on_body)

        def on_body(data):
            print(data)
            stream.close()
            ioloop.IOLoop.instance().stop()

        stream.write(b"GET / HTTP/1.0\r\n\r\n")
        stream.read_until(b"\r\n\r\n", on_headers)
        ioloop.IOLoop.instance().start()

    """

    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        """Wrap ``socket`` and register it with the I/O loop.

        socket: a connected socket object; it is switched to non-blocking
            mode here.
        io_loop: the loop to register with; when omitted (or falsy),
            ``ioloop.IOLoop.instance()`` is used.
        max_buffer_size: the stream is closed once the read buffer holds
            at least this many bytes.
        read_chunk_size: the maximum number of bytes requested per recv().
        """
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size
        # Byte-string buffers: recv() returns bytes on Python 3, and b""
        # is the same object type as "" on Python 2.
        self._read_buffer = b""
        self._write_buffer = b""
        self._read_delimiter = None
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        # Only error events are watched until a read or write is requested.
        self._state = self.io_loop.ERROR
        self.io_loop.add_handler(
            self.socket.fileno(), self._handle_events, self._state)

    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter.

        The callback receives everything up to and including the
        delimiter. If the delimiter is already buffered the callback runs
        immediately; otherwise IOError is raised if the stream is closed.
        """
        assert not self._read_callback, "Already reading"
        loc = self._read_buffer.find(delimiter)
        if loc != -1:
            callback(self._consume(loc + len(delimiter)))
            return
        self._check_closed()
        self._read_delimiter = delimiter
        self._read_callback = callback
        self._add_io_state(self.io_loop.READ)

    def read_bytes(self, num_bytes, callback):
        """Call callback when we read the given number of bytes.

        If enough data is already buffered the callback runs immediately;
        otherwise IOError is raised if the stream is closed.
        """
        assert not self._read_callback, "Already reading"
        if len(self._read_buffer) >= num_bytes:
            callback(self._consume(num_bytes))
            return
        self._check_closed()
        self._read_bytes = num_bytes
        self._read_callback = callback
        self._add_io_state(self.io_loop.READ)

    def write(self, data, callback=None):
        """Write the given data to this stream.

        If callback is given, we call it when all of the buffered write
        data has been successfully written to the stream. If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.

        Raises IOError if the stream is closed.
        """
        self._check_closed()
        self._write_buffer += data
        self._add_io_state(self.io_loop.WRITE)
        self._write_callback = callback

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = callback

    def close(self):
        """Close this stream.

        Unregisters the socket from the I/O loop, closes it, and runs the
        close callback if one was set. Calling close() on an already
        closed stream does nothing.
        """
        if self.socket is not None:
            self.io_loop.remove_handler(self.socket.fileno())
            self.socket.close()
            self.socket = None
            if self._close_callback:
                self._close_callback()

    def reading(self):
        """Returns true if we are currently reading from the stream."""
        return self._read_callback is not None

    def writing(self):
        """Returns true if we are currently writing to the stream."""
        return len(self._write_buffer) > 0

    def closed(self):
        """Returns true if the stream has been closed."""
        return self.socket is None

    def _handle_events(self, fd, events):
        """I/O loop handler: dispatch events, then refresh the watch mask."""
        if self.socket is None:
            _log.warning("Got events for closed stream %d", fd)
            return
        if events & self.io_loop.READ:
            self._handle_read()
        # Any handler (or a user callback it ran) may have closed the
        # stream, so re-check before touching the socket again.
        if self.socket is None:
            return
        if events & self.io_loop.WRITE:
            self._handle_write()
        if self.socket is None:
            return
        if events & self.io_loop.ERROR:
            self.close()
            return
        # Recompute which events are still of interest.
        state = self.io_loop.ERROR
        if self._read_delimiter or self._read_bytes:
            state |= self.io_loop.READ
        if self._write_buffer:
            state |= self.io_loop.WRITE
        if state != self._state:
            self._state = state
            self.io_loop.update_handler(self.socket.fileno(), self._state)

    def _handle_read(self):
        """Read one chunk and fire the pending read callback if satisfied."""
        try:
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error as e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                # Nothing available right now; wait for the next event.
                return
            _log.warning("Read error on %d: %s",
                         self.socket.fileno(), e)
            self.close()
            return
        if not chunk:
            # An empty read means the peer closed the connection.
            self.close()
            return
        self._read_buffer += chunk
        if len(self._read_buffer) >= self.max_buffer_size:
            _log.error("Reached maximum read buffer size")
            self.close()
            return
        if self._read_bytes:
            if len(self._read_buffer) >= self._read_bytes:
                num_bytes = self._read_bytes
                callback = self._read_callback
                # Clear the pending request before invoking the callback
                # so the callback may start a new read.
                self._read_callback = None
                self._read_bytes = None
                callback(self._consume(num_bytes))
        elif self._read_delimiter:
            loc = self._read_buffer.find(self._read_delimiter)
            if loc != -1:
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)
                self._read_callback = None
                self._read_delimiter = None
                callback(self._consume(loc + delimiter_len))

    def _handle_write(self):
        """Send as much buffered data as the socket accepts."""
        while self._write_buffer:
            try:
                num_bytes = self.socket.send(self._write_buffer)
                self._write_buffer = self._write_buffer[num_bytes:]
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    # Socket buffer is full; resume on the next event.
                    break
                _log.warning("Write error on %d: %s",
                             self.socket.fileno(), e)
                self.close()
                return
        if not self._write_buffer and self._write_callback:
            callback = self._write_callback
            # Clear first so the callback may queue another write.
            self._write_callback = None
            callback()

    def _consume(self, loc):
        """Remove and return the first ``loc`` bytes of the read buffer."""
        result = self._read_buffer[:loc]
        self._read_buffer = self._read_buffer[loc:]
        return result

    def _check_closed(self):
        """Raise IOError if the stream has been closed."""
        if self.socket is None:
            raise IOError("Stream is closed")

    def _add_io_state(self, state):
        """Start watching for ``state`` events if not already doing so."""
        if not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.socket.fileno(), self._state)