3
# Copyright 2009 Facebook
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6
# not use this file except in compliance with the License. You may obtain
7
# a copy of the License at
9
# http://www.apache.org/licenses/LICENSE-2.0
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
# License for the specific language governing permissions and limitations
17
"""A level-triggered I/O loop for non-blocking sockets."""
31
import win32_support as fcntl
35
_log = logging.getLogger("tornado.ioloop")
38
"""A level-triggered I/O loop.
40
We use epoll if it is available, or else we fall back on select(). If
41
you are implementing a system that needs to handle 1000s of simultaneous
42
connections, you should use Linux and either compile our epoll module or
43
use Python 2.6+ to get epoll support.
45
Example usage for a simple TCP server:
52
def connection_ready(sock, fd, events):
55
connection, address = sock.accept()
56
except socket.error, e:
57
if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
60
connection.setblocking(0)
61
handle_connection(connection, address)
63
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
64
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
69
io_loop = ioloop.IOLoop.instance()
70
callback = functools.partial(connection_ready, sock)
71
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
75
# Constants from the epoll module
82
_EPOLLONESHOT = (1 << 30)
85
# Our events map exactly to the epoll events
89
ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
91
def __init__(self, impl=None):
92
self._impl = impl or _poll()
93
if hasattr(self._impl, 'fileno'):
94
self._set_close_exec(self._impl.fileno())
97
self._callbacks = set()
100
self._stopped = False
102
# Create a pipe that we send bogus data to when we want to wake
103
# the I/O loop when it is idle
106
self._set_nonblocking(r)
107
self._set_nonblocking(w)
108
self._set_close_exec(r)
109
self._set_close_exec(w)
110
self._waker_reader = os.fdopen(r, "r", 0)
111
self._waker_writer = os.fdopen(w, "w", 0)
113
self._waker_reader = self._waker_writer = win32_support.Pipe()
114
r = self._waker_writer.reader_fd
115
self.add_handler(r, self._read_waker, self.READ)
119
"""Returns a global IOLoop instance.
121
Most single-threaded applications have a single, global IOLoop.
122
Use this method instead of passing around IOLoop instances
123
throughout your code.
125
A common pattern for classes that depend on IOLoops is to use
126
a default argument to enable programs with multiple IOLoops
127
but not require the argument for simpler applications:
129
class MyClass(object):
130
def __init__(self, io_loop=None):
131
self.io_loop = io_loop or IOLoop.instance()
133
if not hasattr(cls, "_instance"):
134
cls._instance = cls()
138
def initialized(cls):
139
return hasattr(cls, "_instance")
141
def add_handler(self, fd, handler, events):
142
"""Registers the given handler to receive the given events for fd."""
143
self._handlers[fd] = handler
144
self._impl.register(fd, events | self.ERROR)
146
def update_handler(self, fd, events):
147
"""Changes the events we listen for fd."""
148
self._impl.modify(fd, events | self.ERROR)
150
def remove_handler(self, fd):
151
"""Stop listening for events on fd."""
152
self._handlers.pop(fd, None)
153
self._events.pop(fd, None)
155
self._impl.unregister(fd)
156
except (OSError, IOError):
157
_log.debug("Error deleting fd from IOLoop", exc_info=True)
160
"""Starts the I/O loop.
162
The loop will run until one of the I/O handlers calls stop(), which
163
will make the loop stop after the current event iteration completes.
166
self._stopped = False
170
# Never use an infinite timeout here - it can stall epoll
173
# Prevent IO event starvation by delaying new callbacks
174
# to the next iteration of the event loop.
175
callbacks = list(self._callbacks)
176
for callback in callbacks:
177
# A callback can add or remove other callbacks
178
if callback in self._callbacks:
179
self._callbacks.remove(callback)
180
self._run_callback(callback)
187
while self._timeouts and self._timeouts[0].deadline <= now:
188
timeout = self._timeouts.pop(0)
189
self._run_callback(timeout.callback)
191
milliseconds = self._timeouts[0].deadline - now
192
poll_timeout = min(milliseconds, poll_timeout)
194
if not self._running:
198
event_pairs = self._impl.poll(poll_timeout)
200
if hasattr(e, 'errno') and e.errno == errno.EINTR:
201
_log.warning("Interrupted system call", exc_info=1)
206
# Pop one fd at a time from the set of pending fds and run
207
# its handler. Since that handler may perform actions on
208
# other file descriptors, there may be reentrant calls to
209
# this IOLoop that update self._events
210
self._events.update(event_pairs)
212
fd, events = self._events.popitem()
214
self._handlers[fd](fd, events)
215
except (KeyboardInterrupt, SystemExit):
217
except (OSError, IOError), e:
218
if e[0] == errno.EPIPE:
219
# Happens when the client closes the connection
222
_log.error("Exception in I/O handler for fd %d",
225
_log.error("Exception in I/O handler for fd %d",
227
# reset the stopped flag so another start/stop pair can be issued
228
self._stopped = False
231
"""Stop the loop after the current event loop iteration is complete.
232
If the event loop is not currently running, the next call to start()
233
will return immediately.
235
To use asynchronous methods from otherwise-synchronous code (such as
236
unit tests), you can start and stop the event loop like this:
238
async_method(ioloop=ioloop, callback=ioloop.stop)
240
ioloop.start() will return after async_method has run its callback,
241
whether that callback was invoked before or after ioloop.start.
243
self._running = False
248
"""Returns true if this IOLoop is currently running."""
251
def add_timeout(self, deadline, callback):
252
"""Calls the given callback at the time deadline from the I/O loop."""
253
timeout = _Timeout(deadline, callback)
254
bisect.insort(self._timeouts, timeout)
257
def remove_timeout(self, timeout):
258
self._timeouts.remove(timeout)
260
def add_callback(self, callback):
261
"""Calls the given callback on the next I/O loop iteration."""
262
self._callbacks.add(callback)
265
def remove_callback(self, callback):
266
"""Removes the given callback from the next I/O loop iteration."""
267
self._callbacks.remove(callback)
271
self._waker_writer.write("x")
275
def _run_callback(self, callback):
278
except (KeyboardInterrupt, SystemExit):
281
self.handle_callback_exception(callback)
283
def handle_callback_exception(self, callback):
284
"""This method is called whenever a callback run by the IOLoop
287
By default simply logs the exception as an error. Subclasses
288
may override this method to customize reporting of exceptions.
290
The exception itself is not passed explicitly, but is available
293
_log.error("Exception in callback %r", callback, exc_info=True)
295
def _read_waker(self, fd, events):
298
self._waker_reader.read()
302
def _set_nonblocking(self, fd):
303
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
304
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
306
def _set_close_exec(self, fd):
307
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
308
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
311
class _Timeout(object):
312
"""An IOLoop timeout, a UNIX timestamp and a callback"""
314
# Reduce memory overhead when there are lots of pending callbacks
315
__slots__ = ['deadline', 'callback']
317
def __init__(self, deadline, callback):
318
self.deadline = deadline
319
self.callback = callback
321
def __cmp__(self, other):
322
return cmp((self.deadline, id(self.callback)),
323
(other.deadline, id(other.callback)))
326
class PeriodicCallback(object):
327
"""Schedules the given callback to be called periodically.
329
The callback is called every callback_time milliseconds.
331
def __init__(self, callback, callback_time, io_loop=None):
332
self.callback = callback
333
self.callback_time = callback_time
334
self.io_loop = io_loop or IOLoop.instance()
338
timeout = time.time() + self.callback_time / 1000.0
339
self.io_loop.add_timeout(timeout, self._run)
342
self._running = False
345
if not self._running: return
348
except (KeyboardInterrupt, SystemExit):
351
_log.error("Error in periodic callback", exc_info=True)
355
class _EPoll(object):
356
"""An epoll-based event loop using our C module for Python 2.5 systems"""
362
self._epoll_fd = epoll.epoll_create()
365
return self._epoll_fd
367
def register(self, fd, events):
368
epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)
370
def modify(self, fd, events):
371
epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)
373
def unregister(self, fd):
374
epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)
376
def poll(self, timeout):
377
return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
380
class _KQueue(object):
381
"""A kqueue-based event loop for BSD/Mac systems."""
383
self._kqueue = select.kqueue()
387
return self._kqueue.fileno()
389
def register(self, fd, events):
390
self._control(fd, events, select.KQ_EV_ADD)
391
self._active[fd] = events
393
def modify(self, fd, events):
395
self.register(fd, events)
397
def unregister(self, fd):
398
events = self._active.pop(fd)
399
self._control(fd, events, select.KQ_EV_DELETE)
401
def _control(self, fd, events, flags):
403
if events & IOLoop.WRITE:
404
kevents.append(select.kevent(
405
fd, filter=select.KQ_FILTER_WRITE, flags=flags))
406
if events & IOLoop.READ or not kevents:
407
# Always read when there is not a write
408
kevents.append(select.kevent(
409
fd, filter=select.KQ_FILTER_READ, flags=flags))
410
# Even though control() takes a list, it seems to return EINVAL
411
# on Mac OS X (10.6) when there is more than one event in the list.
412
for kevent in kevents:
413
self._kqueue.control([kevent], 0)
415
def poll(self, timeout):
416
kevents = self._kqueue.control(None, 1000, timeout)
418
for kevent in kevents:
421
if kevent.filter == select.KQ_FILTER_READ:
422
events[fd] = events.get(fd, 0) | IOLoop.READ
423
if kevent.filter == select.KQ_FILTER_WRITE:
424
events[fd] = events.get(fd, 0) | IOLoop.WRITE
425
if kevent.flags & select.KQ_EV_ERROR:
426
events[fd] = events.get(fd, 0) | IOLoop.ERROR
427
return events.items()
430
class _Select(object):
431
"""A simple, select()-based IOLoop implementation for non-Linux systems"""
433
self.read_fds = set()
434
self.write_fds = set()
435
self.error_fds = set()
436
self.fd_sets = (self.read_fds, self.write_fds, self.error_fds)
438
def register(self, fd, events):
439
if events & IOLoop.READ: self.read_fds.add(fd)
440
if events & IOLoop.WRITE: self.write_fds.add(fd)
441
if events & IOLoop.ERROR: self.error_fds.add(fd)
443
def modify(self, fd, events):
445
self.register(fd, events)
447
def unregister(self, fd):
448
self.read_fds.discard(fd)
449
self.write_fds.discard(fd)
450
self.error_fds.discard(fd)
452
def poll(self, timeout):
453
readable, writeable, errors = select.select(
454
self.read_fds, self.write_fds, self.error_fds, timeout)
457
events[fd] = events.get(fd, 0) | IOLoop.READ
459
events[fd] = events.get(fd, 0) | IOLoop.WRITE
461
events[fd] = events.get(fd, 0) | IOLoop.ERROR
462
return events.items()
465
# Choose a poll implementation. Use epoll if it is available, fall back to
466
# select() for non-Linux platforms
467
if hasattr(select, "epoll"):
468
# Python 2.6+ on Linux
470
elif hasattr(select, "kqueue"):
471
# Python 2.6+ on BSD or Mac
475
# Linux systems with our C module installed
481
if "linux" in sys.platform:
482
_log.warning("epoll module not found; using select()")