~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/tornado/tornado/iostream.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
#
 
3
# Copyright 2009 Facebook
 
4
#
 
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
6
# not use this file except in compliance with the License. You may obtain
 
7
# a copy of the License at
 
8
#
 
9
#     http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
# Unless required by applicable law or agreed to in writing, software
 
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
14
# License for the specific language governing permissions and limitations
 
15
# under the License.
 
16
 
 
17
"""A utility class to write to and read from a non-blocking socket."""
 
18
 
 
19
import errno
 
20
import ioloop
 
21
import logging
 
22
import socket
 
23
 
 
24
_log = logging.getLogger('tornado.iostream')
 
25
 
 
26
class IOStream(object):
 
27
    """A utility class to write to and read from a non-blocking socket.
 
28
 
 
29
    We support three methods: write(), read_until(), and read_bytes().
 
30
    All of the methods take callbacks (since writing and reading are
 
31
    non-blocking and asynchronous). read_until() reads the socket until
 
32
    a given delimiter, and read_bytes() reads until a specified number
 
33
    of bytes have been read from the socket.
 
34
 
 
35
    A very simple (and broken) HTTP client using this class:
 
36
 
 
37
        import ioloop
 
38
        import iostream
 
39
        import socket
 
40
 
 
41
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
 
42
        s.connect(("friendfeed.com", 80))
 
43
        stream = IOStream(s)
 
44
 
 
45
        def on_headers(data):
 
46
            headers = {}
 
47
            for line in data.split("\r\n"):
 
48
               parts = line.split(":")
 
49
               if len(parts) == 2:
 
50
                   headers[parts[0].strip()] = parts[1].strip()
 
51
            stream.read_bytes(int(headers["Content-Length"]), on_body)
 
52
 
 
53
        def on_body(data):
 
54
            print data
 
55
            stream.close()
 
56
            ioloop.IOLoop.instance().stop()
 
57
 
 
58
        stream.write("GET / HTTP/1.0\r\n\r\n")
 
59
        stream.read_until("\r\n\r\n", on_headers)
 
60
        ioloop.IOLoop.instance().start()
 
61
 
 
62
    """
 
63
    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
 
64
                 read_chunk_size=4096):
 
65
        self.socket = socket
 
66
        self.socket.setblocking(False)
 
67
        self.io_loop = io_loop or ioloop.IOLoop.instance()
 
68
        self.max_buffer_size = max_buffer_size
 
69
        self.read_chunk_size = read_chunk_size
 
70
        self._read_buffer = ""
 
71
        self._write_buffer = ""
 
72
        self._read_delimiter = None
 
73
        self._read_bytes = None
 
74
        self._read_callback = None
 
75
        self._write_callback = None
 
76
        self._close_callback = None
 
77
        self._state = self.io_loop.ERROR
 
78
        self.io_loop.add_handler(
 
79
            self.socket.fileno(), self._handle_events, self._state)
 
80
 
 
81
    def read_until(self, delimiter, callback):
 
82
        """Call callback when we read the given delimiter."""
 
83
        assert not self._read_callback, "Already reading"
 
84
        loc = self._read_buffer.find(delimiter)
 
85
        if loc != -1:
 
86
            callback(self._consume(loc + len(delimiter)))
 
87
            return
 
88
        self._check_closed()
 
89
        self._read_delimiter = delimiter
 
90
        self._read_callback = callback
 
91
        self._add_io_state(self.io_loop.READ)
 
92
 
 
93
    def read_bytes(self, num_bytes, callback):
 
94
        """Call callback when we read the given number of bytes."""
 
95
        assert not self._read_callback, "Already reading"
 
96
        if len(self._read_buffer) >= num_bytes:
 
97
            callback(self._consume(num_bytes))
 
98
            return
 
99
        self._check_closed()
 
100
        self._read_bytes = num_bytes
 
101
        self._read_callback = callback
 
102
        self._add_io_state(self.io_loop.READ)
 
103
 
 
104
    def write(self, data, callback=None):
 
105
        """Write the given data to this stream.
 
106
 
 
107
        If callback is given, we call it when all of the buffered write
 
108
        data has been successfully written to the stream. If there was
 
109
        previously buffered write data and an old write callback, that
 
110
        callback is simply overwritten with this new callback.
 
111
        """
 
112
        self._check_closed()
 
113
        self._write_buffer += data
 
114
        self._add_io_state(self.io_loop.WRITE)
 
115
        self._write_callback = callback
 
116
 
 
117
    def set_close_callback(self, callback):
 
118
        """Call the given callback when the stream is closed."""
 
119
        self._close_callback = callback
 
120
 
 
121
    def close(self):
 
122
        """Close this stream."""
 
123
        if self.socket is not None:
 
124
            self.io_loop.remove_handler(self.socket.fileno())
 
125
            self.socket.close()
 
126
            self.socket = None
 
127
            if self._close_callback: self._close_callback()
 
128
 
 
129
    def reading(self):
 
130
        """Returns true if we are currently reading from the stream."""
 
131
        return self._read_callback is not None
 
132
 
 
133
    def writing(self):
 
134
        """Returns true if we are currently writing to the stream."""
 
135
        return len(self._write_buffer) > 0
 
136
 
 
137
    def closed(self):
 
138
        return self.socket is None
 
139
 
 
140
    def _handle_events(self, fd, events):
 
141
        if not self.socket:
 
142
            _log.warning("Got events for closed stream %d", fd)
 
143
            return
 
144
        if events & self.io_loop.READ:
 
145
            self._handle_read()
 
146
        if not self.socket:
 
147
            return
 
148
        if events & self.io_loop.WRITE:
 
149
            self._handle_write()
 
150
        if not self.socket:
 
151
            return
 
152
        if events & self.io_loop.ERROR:
 
153
            self.close()
 
154
            return
 
155
        state = self.io_loop.ERROR
 
156
        if self._read_delimiter or self._read_bytes:
 
157
            state |= self.io_loop.READ
 
158
        if self._write_buffer:
 
159
            state |= self.io_loop.WRITE
 
160
        if state != self._state:
 
161
            self._state = state
 
162
            self.io_loop.update_handler(self.socket.fileno(), self._state)
 
163
 
 
164
    def _handle_read(self):
 
165
        try:
 
166
            chunk = self.socket.recv(self.read_chunk_size)
 
167
        except socket.error, e:
 
168
            if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
 
169
                return
 
170
            else:
 
171
                _log.warning("Read error on %d: %s",
 
172
                                self.socket.fileno(), e)
 
173
                self.close()
 
174
                return
 
175
        if not chunk:
 
176
            self.close()
 
177
            return
 
178
        self._read_buffer += chunk
 
179
        if len(self._read_buffer) >= self.max_buffer_size:
 
180
            _log.error("Reached maximum read buffer size")
 
181
            self.close()
 
182
            return
 
183
        if self._read_bytes:
 
184
            if len(self._read_buffer) >= self._read_bytes:
 
185
                num_bytes = self._read_bytes
 
186
                callback = self._read_callback
 
187
                self._read_callback = None
 
188
                self._read_bytes = None
 
189
                callback(self._consume(num_bytes))
 
190
        elif self._read_delimiter:
 
191
            loc = self._read_buffer.find(self._read_delimiter)
 
192
            if loc != -1:
 
193
                callback = self._read_callback
 
194
                delimiter_len = len(self._read_delimiter)
 
195
                self._read_callback = None
 
196
                self._read_delimiter = None
 
197
                callback(self._consume(loc + delimiter_len))
 
198
 
 
199
    def _handle_write(self):
 
200
        while self._write_buffer:
 
201
            try:
 
202
                num_bytes = self.socket.send(self._write_buffer)
 
203
                self._write_buffer = self._write_buffer[num_bytes:]
 
204
            except socket.error, e:
 
205
                if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
 
206
                    break
 
207
                else:
 
208
                    _log.warning("Write error on %d: %s",
 
209
                                    self.socket.fileno(), e)
 
210
                    self.close()
 
211
                    return
 
212
        if not self._write_buffer and self._write_callback:
 
213
            callback = self._write_callback
 
214
            self._write_callback = None
 
215
            callback()
 
216
 
 
217
    def _consume(self, loc):
 
218
        result = self._read_buffer[:loc]
 
219
        self._read_buffer = self._read_buffer[loc:]
 
220
        return result
 
221
 
 
222
    def _check_closed(self):
 
223
        if not self.socket:
 
224
            raise IOError("Stream is closed")
 
225
 
 
226
    def _add_io_state(self, state):
 
227
        if not self._state & state:
 
228
            self._state = self._state | state
 
229
            self.io_loop.update_handler(self.socket.fileno(), self._state)