~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/tornado/tornado/ioloop.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
#
 
3
# Copyright 2009 Facebook
 
4
#
 
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
6
# not use this file except in compliance with the License. You may obtain
 
7
# a copy of the License at
 
8
#
 
9
#     http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
# Unless required by applicable law or agreed to in writing, software
 
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
14
# License for the specific language governing permissions and limitations
 
15
# under the License.
 
16
 
 
17
"""A level-triggered I/O loop for non-blocking sockets."""
 
18
 
 
19
import bisect
 
20
import errno
 
21
import os
 
22
import logging
 
23
import select
 
24
import time
 
25
 
 
26
try:
 
27
    import fcntl
 
28
except ImportError:
 
29
    if os.name == 'nt':
 
30
        import win32_support
 
31
        import win32_support as fcntl
 
32
    else:
 
33
        raise
 
34
 
 
35
_log = logging.getLogger("tornado.ioloop")
 
36
 
 
37
class IOLoop(object):
 
38
    """A level-triggered I/O loop.
 
39
 
 
40
    We use epoll if it is available, or else we fall back on select(). If
 
41
    you are implementing a system that needs to handle 1000s of simultaneous
 
42
    connections, you should use Linux and either compile our epoll module or
 
43
    use Python 2.6+ to get epoll support.
 
44
 
 
45
    Example usage for a simple TCP server:
 
46
 
 
47
        import errno
 
48
        import functools
 
49
        import ioloop
 
50
        import socket
 
51
 
 
52
        def connection_ready(sock, fd, events):
 
53
            while True:
 
54
                try:
 
55
                    connection, address = sock.accept()
 
56
                except socket.error, e:
 
57
                    if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
 
58
                        raise
 
59
                    return
 
60
                connection.setblocking(0)
 
61
                handle_connection(connection, address)
 
62
 
 
63
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
 
64
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
65
        sock.setblocking(0)
 
66
        sock.bind(("", port))
 
67
        sock.listen(128)
 
68
 
 
69
        io_loop = ioloop.IOLoop.instance()
 
70
        callback = functools.partial(connection_ready, sock)
 
71
        io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
 
72
        io_loop.start()
 
73
 
 
74
    """
 
75
    # Constants from the epoll module
 
76
    _EPOLLIN = 0x001
 
77
    _EPOLLPRI = 0x002
 
78
    _EPOLLOUT = 0x004
 
79
    _EPOLLERR = 0x008
 
80
    _EPOLLHUP = 0x010
 
81
    _EPOLLRDHUP = 0x2000
 
82
    _EPOLLONESHOT = (1 << 30)
 
83
    _EPOLLET = (1 << 31)
 
84
 
 
85
    # Our events map exactly to the epoll events
 
86
    NONE = 0
 
87
    READ = _EPOLLIN
 
88
    WRITE = _EPOLLOUT
 
89
    ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
 
90
 
 
91
    def __init__(self, impl=None):
 
92
        self._impl = impl or _poll()
 
93
        if hasattr(self._impl, 'fileno'):
 
94
            self._set_close_exec(self._impl.fileno())
 
95
        self._handlers = {}
 
96
        self._events = {}
 
97
        self._callbacks = set()
 
98
        self._timeouts = []
 
99
        self._running = False
 
100
        self._stopped = False
 
101
 
 
102
        # Create a pipe that we send bogus data to when we want to wake
 
103
        # the I/O loop when it is idle
 
104
        if os.name != 'nt':
 
105
            r, w = os.pipe()
 
106
            self._set_nonblocking(r)
 
107
            self._set_nonblocking(w)
 
108
            self._set_close_exec(r)
 
109
            self._set_close_exec(w)
 
110
            self._waker_reader = os.fdopen(r, "r", 0)
 
111
            self._waker_writer = os.fdopen(w, "w", 0)
 
112
        else:
 
113
            self._waker_reader = self._waker_writer = win32_support.Pipe()
 
114
            r = self._waker_writer.reader_fd
 
115
        self.add_handler(r, self._read_waker, self.READ)
 
116
 
 
117
    @classmethod
 
118
    def instance(cls):
 
119
        """Returns a global IOLoop instance.
 
120
 
 
121
        Most single-threaded applications have a single, global IOLoop.
 
122
        Use this method instead of passing around IOLoop instances
 
123
        throughout your code.
 
124
 
 
125
        A common pattern for classes that depend on IOLoops is to use
 
126
        a default argument to enable programs with multiple IOLoops
 
127
        but not require the argument for simpler applications:
 
128
 
 
129
            class MyClass(object):
 
130
                def __init__(self, io_loop=None):
 
131
                    self.io_loop = io_loop or IOLoop.instance()
 
132
        """
 
133
        if not hasattr(cls, "_instance"):
 
134
            cls._instance = cls()
 
135
        return cls._instance
 
136
 
 
137
    @classmethod
 
138
    def initialized(cls):
 
139
        return hasattr(cls, "_instance")
 
140
 
 
141
    def add_handler(self, fd, handler, events):
 
142
        """Registers the given handler to receive the given events for fd."""
 
143
        self._handlers[fd] = handler
 
144
        self._impl.register(fd, events | self.ERROR)
 
145
 
 
146
    def update_handler(self, fd, events):
 
147
        """Changes the events we listen for fd."""
 
148
        self._impl.modify(fd, events | self.ERROR)
 
149
 
 
150
    def remove_handler(self, fd):
 
151
        """Stop listening for events on fd."""
 
152
        self._handlers.pop(fd, None)
 
153
        self._events.pop(fd, None)
 
154
        try:
 
155
            self._impl.unregister(fd)
 
156
        except (OSError, IOError):
 
157
            _log.debug("Error deleting fd from IOLoop", exc_info=True)
 
158
 
 
159
    def start(self):
 
160
        """Starts the I/O loop.
 
161
 
 
162
        The loop will run until one of the I/O handlers calls stop(), which
 
163
        will make the loop stop after the current event iteration completes.
 
164
        """
 
165
        if self._stopped:
 
166
            self._stopped = False
 
167
            return
 
168
        self._running = True
 
169
        while True:
 
170
            # Never use an infinite timeout here - it can stall epoll
 
171
            poll_timeout = 0.2
 
172
 
 
173
            # Prevent IO event starvation by delaying new callbacks
 
174
            # to the next iteration of the event loop.
 
175
            callbacks = list(self._callbacks)
 
176
            for callback in callbacks:
 
177
                # A callback can add or remove other callbacks
 
178
                if callback in self._callbacks:
 
179
                    self._callbacks.remove(callback)
 
180
                    self._run_callback(callback)
 
181
 
 
182
            if self._callbacks:
 
183
                poll_timeout = 0.0
 
184
 
 
185
            if self._timeouts:
 
186
                now = time.time()
 
187
                while self._timeouts and self._timeouts[0].deadline <= now:
 
188
                    timeout = self._timeouts.pop(0)
 
189
                    self._run_callback(timeout.callback)
 
190
                if self._timeouts:
 
191
                    milliseconds = self._timeouts[0].deadline - now
 
192
                    poll_timeout = min(milliseconds, poll_timeout)
 
193
 
 
194
            if not self._running:
 
195
                break
 
196
 
 
197
            try:
 
198
                event_pairs = self._impl.poll(poll_timeout)
 
199
            except Exception, e:
 
200
                if hasattr(e, 'errno') and e.errno == errno.EINTR:
 
201
                    _log.warning("Interrupted system call", exc_info=1)
 
202
                    continue
 
203
                else:
 
204
                    raise
 
205
 
 
206
            # Pop one fd at a time from the set of pending fds and run
 
207
            # its handler. Since that handler may perform actions on
 
208
            # other file descriptors, there may be reentrant calls to
 
209
            # this IOLoop that update self._events
 
210
            self._events.update(event_pairs)
 
211
            while self._events:
 
212
                fd, events = self._events.popitem()
 
213
                try:
 
214
                    self._handlers[fd](fd, events)
 
215
                except (KeyboardInterrupt, SystemExit):
 
216
                    raise
 
217
                except (OSError, IOError), e:
 
218
                    if e[0] == errno.EPIPE:
 
219
                        # Happens when the client closes the connection
 
220
                        pass
 
221
                    else:
 
222
                        _log.error("Exception in I/O handler for fd %d",
 
223
                                      fd, exc_info=True)
 
224
                except:
 
225
                    _log.error("Exception in I/O handler for fd %d",
 
226
                                  fd, exc_info=True)
 
227
        # reset the stopped flag so another start/stop pair can be issued
 
228
        self._stopped = False
 
229
 
 
230
    def stop(self):
 
231
        """Stop the loop after the current event loop iteration is complete.
 
232
        If the event loop is not currently running, the next call to start()
 
233
        will return immediately.
 
234
 
 
235
        To use asynchronous methods from otherwise-synchronous code (such as
 
236
        unit tests), you can start and stop the event loop like this:
 
237
          ioloop = IOLoop()
 
238
          async_method(ioloop=ioloop, callback=ioloop.stop)
 
239
          ioloop.start()
 
240
        ioloop.start() will return after async_method has run its callback,
 
241
        whether that callback was invoked before or after ioloop.start.
 
242
        """
 
243
        self._running = False
 
244
        self._stopped = True
 
245
        self._wake()
 
246
 
 
247
    def running(self):
 
248
        """Returns true if this IOLoop is currently running."""
 
249
        return self._running
 
250
 
 
251
    def add_timeout(self, deadline, callback):
 
252
        """Calls the given callback at the time deadline from the I/O loop."""
 
253
        timeout = _Timeout(deadline, callback)
 
254
        bisect.insort(self._timeouts, timeout)
 
255
        return timeout
 
256
 
 
257
    def remove_timeout(self, timeout):
 
258
        self._timeouts.remove(timeout)
 
259
 
 
260
    def add_callback(self, callback):
 
261
        """Calls the given callback on the next I/O loop iteration."""
 
262
        self._callbacks.add(callback)
 
263
        self._wake()
 
264
 
 
265
    def remove_callback(self, callback):
 
266
        """Removes the given callback from the next I/O loop iteration."""
 
267
        self._callbacks.remove(callback)
 
268
 
 
269
    def _wake(self):
 
270
        try:
 
271
            self._waker_writer.write("x")
 
272
        except IOError:
 
273
            pass
 
274
 
 
275
    def _run_callback(self, callback):
 
276
        try:
 
277
            callback()
 
278
        except (KeyboardInterrupt, SystemExit):
 
279
            raise
 
280
        except:
 
281
            self.handle_callback_exception(callback)
 
282
 
 
283
    def handle_callback_exception(self, callback):
 
284
        """This method is called whenever a callback run by the IOLoop
 
285
        throws an exception.
 
286
 
 
287
        By default simply logs the exception as an error.  Subclasses
 
288
        may override this method to customize reporting of exceptions.
 
289
 
 
290
        The exception itself is not passed explicitly, but is available
 
291
        in sys.exc_info.
 
292
        """
 
293
        _log.error("Exception in callback %r", callback, exc_info=True)
 
294
 
 
295
    def _read_waker(self, fd, events):
 
296
        try:
 
297
            while True:
 
298
                self._waker_reader.read()
 
299
        except IOError:
 
300
            pass
 
301
 
 
302
    def _set_nonblocking(self, fd):
 
303
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
 
304
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
 
305
 
 
306
    def _set_close_exec(self, fd):
 
307
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
 
308
        fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
 
309
 
 
310
 
 
311
class _Timeout(object):
 
312
    """An IOLoop timeout, a UNIX timestamp and a callback"""
 
313
 
 
314
    # Reduce memory overhead when there are lots of pending callbacks
 
315
    __slots__ = ['deadline', 'callback']
 
316
 
 
317
    def __init__(self, deadline, callback):
 
318
        self.deadline = deadline
 
319
        self.callback = callback
 
320
 
 
321
    def __cmp__(self, other):
 
322
        return cmp((self.deadline, id(self.callback)),
 
323
                   (other.deadline, id(other.callback)))
 
324
 
 
325
 
 
326
class PeriodicCallback(object):
 
327
    """Schedules the given callback to be called periodically.
 
328
 
 
329
    The callback is called every callback_time milliseconds.
 
330
    """
 
331
    def __init__(self, callback, callback_time, io_loop=None):
 
332
        self.callback = callback
 
333
        self.callback_time = callback_time
 
334
        self.io_loop = io_loop or IOLoop.instance()
 
335
        self._running = True
 
336
 
 
337
    def start(self):
 
338
        timeout = time.time() + self.callback_time / 1000.0
 
339
        self.io_loop.add_timeout(timeout, self._run)
 
340
 
 
341
    def stop(self):
 
342
        self._running = False
 
343
 
 
344
    def _run(self):
 
345
        if not self._running: return
 
346
        try:
 
347
            self.callback()
 
348
        except (KeyboardInterrupt, SystemExit):
 
349
            raise
 
350
        except:
 
351
            _log.error("Error in periodic callback", exc_info=True)
 
352
        self.start()
 
353
 
 
354
 
 
355
class _EPoll(object):
 
356
    """An epoll-based event loop using our C module for Python 2.5 systems"""
 
357
    _EPOLL_CTL_ADD = 1
 
358
    _EPOLL_CTL_DEL = 2
 
359
    _EPOLL_CTL_MOD = 3
 
360
 
 
361
    def __init__(self):
 
362
        self._epoll_fd = epoll.epoll_create()
 
363
 
 
364
    def fileno(self):
 
365
        return self._epoll_fd
 
366
 
 
367
    def register(self, fd, events):
 
368
        epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)
 
369
 
 
370
    def modify(self, fd, events):
 
371
        epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)
 
372
 
 
373
    def unregister(self, fd):
 
374
        epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)
 
375
 
 
376
    def poll(self, timeout):
 
377
        return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
 
378
 
 
379
 
 
380
class _KQueue(object):
 
381
    """A kqueue-based event loop for BSD/Mac systems."""
 
382
    def __init__(self):
 
383
        self._kqueue = select.kqueue()
 
384
        self._active = {}
 
385
 
 
386
    def fileno(self):
 
387
        return self._kqueue.fileno()
 
388
 
 
389
    def register(self, fd, events):
 
390
        self._control(fd, events, select.KQ_EV_ADD)
 
391
        self._active[fd] = events
 
392
 
 
393
    def modify(self, fd, events):
 
394
        self.unregister(fd)
 
395
        self.register(fd, events)
 
396
 
 
397
    def unregister(self, fd):
 
398
        events = self._active.pop(fd)
 
399
        self._control(fd, events, select.KQ_EV_DELETE)
 
400
 
 
401
    def _control(self, fd, events, flags):
 
402
        kevents = []
 
403
        if events & IOLoop.WRITE:
 
404
            kevents.append(select.kevent(
 
405
                    fd, filter=select.KQ_FILTER_WRITE, flags=flags))
 
406
        if events & IOLoop.READ or not kevents:
 
407
            # Always read when there is not a write
 
408
            kevents.append(select.kevent(
 
409
                    fd, filter=select.KQ_FILTER_READ, flags=flags))
 
410
        # Even though control() takes a list, it seems to return EINVAL
 
411
        # on Mac OS X (10.6) when there is more than one event in the list.
 
412
        for kevent in kevents:
 
413
            self._kqueue.control([kevent], 0)
 
414
 
 
415
    def poll(self, timeout):
 
416
        kevents = self._kqueue.control(None, 1000, timeout)
 
417
        events = {}
 
418
        for kevent in kevents:
 
419
            fd = kevent.ident
 
420
            flags = 0
 
421
            if kevent.filter == select.KQ_FILTER_READ:
 
422
                events[fd] = events.get(fd, 0) | IOLoop.READ
 
423
            if kevent.filter == select.KQ_FILTER_WRITE:
 
424
                events[fd] = events.get(fd, 0) | IOLoop.WRITE
 
425
            if kevent.flags & select.KQ_EV_ERROR:
 
426
                events[fd] = events.get(fd, 0) | IOLoop.ERROR
 
427
        return events.items()
 
428
 
 
429
 
 
430
class _Select(object):
 
431
    """A simple, select()-based IOLoop implementation for non-Linux systems"""
 
432
    def __init__(self):
 
433
        self.read_fds = set()
 
434
        self.write_fds = set()
 
435
        self.error_fds = set()
 
436
        self.fd_sets = (self.read_fds, self.write_fds, self.error_fds)
 
437
 
 
438
    def register(self, fd, events):
 
439
        if events & IOLoop.READ: self.read_fds.add(fd)
 
440
        if events & IOLoop.WRITE: self.write_fds.add(fd)
 
441
        if events & IOLoop.ERROR: self.error_fds.add(fd)
 
442
 
 
443
    def modify(self, fd, events):
 
444
        self.unregister(fd)
 
445
        self.register(fd, events)
 
446
 
 
447
    def unregister(self, fd):
 
448
        self.read_fds.discard(fd)
 
449
        self.write_fds.discard(fd)
 
450
        self.error_fds.discard(fd)
 
451
 
 
452
    def poll(self, timeout):
 
453
        readable, writeable, errors = select.select(
 
454
            self.read_fds, self.write_fds, self.error_fds, timeout)
 
455
        events = {}
 
456
        for fd in readable:
 
457
            events[fd] = events.get(fd, 0) | IOLoop.READ
 
458
        for fd in writeable:
 
459
            events[fd] = events.get(fd, 0) | IOLoop.WRITE
 
460
        for fd in errors:
 
461
            events[fd] = events.get(fd, 0) | IOLoop.ERROR
 
462
        return events.items()
 
463
 
 
464
 
 
465
# Choose a poll implementation. Use epoll if it is available, fall back to
 
466
# select() for non-Linux platforms
 
467
if hasattr(select, "epoll"):
 
468
    # Python 2.6+ on Linux
 
469
    _poll = select.epoll
 
470
elif hasattr(select, "kqueue"):
 
471
    # Python 2.6+ on BSD or Mac
 
472
    _poll = _KQueue
 
473
else:
 
474
    try:
 
475
        # Linux systems with our C module installed
 
476
        import epoll
 
477
        _poll = _EPoll
 
478
    except:
 
479
        # All other systems
 
480
        import sys
 
481
        if "linux" in sys.platform:
 
482
            _log.warning("epoll module not found; using select()")
 
483
        _poll = _Select