import sys

from eventlet import patcher
from eventlet.green import zmq
from eventlet.hubs import _threadlocal
from eventlet.hubs.hub import BaseHub, READ, WRITE, noop
from eventlet.support import clear_sys_exc_info

# Modules fetched through ``patcher.original`` rather than a plain import
# (presumably so the hub sees the unpatched stdlib versions -- confirm
# against the patcher documentation).
time = patcher.original('time')
select = patcher.original('select')

# zmq poll event flags under the names used by the hub's dispatch loop.
EXC_MASK = zmq.POLLERR
READ_MASK = zmq.POLLIN
WRITE_MASK = zmq.POLLOUT


class Hub(BaseHub):
    """Hub that multiplexes its listeners through a ``zmq.Poller``."""

    def __init__(self, clock=time.time):
        """Initialise the base hub and create the poller.

        :param clock: callable returning the current time, forwarded to
            ``BaseHub.__init__``.
        """
        BaseHub.__init__(self, clock)
        self.poll = zmq.Poller()

    def get_context(self, io_threads=1):
        """zmq's Context must be unique within a hub.

        The zeromq API documentation states:

            All zmq sockets passed to the zmq_poll() function must share
            the same zmq context and must belong to the thread calling
            zmq_poll()

        As zmq_poll is what's eventually being called, we need to ensure
        that all sockets that are going to be passed to zmq_poll (via
        hub.do_poll) are in the same context.

        The context is cached on ``_threadlocal``; ``io_threads`` is only
        used the first time, when the context is created.

        :param io_threads: passed to ``zmq._Context`` on first creation.
        :returns: the context stored on ``_threadlocal``.
        """
        try:
            return _threadlocal.context
        except AttributeError:
            # First request: create the context and cache it.
            _threadlocal.context = zmq._Context(io_threads)
            return _threadlocal.context

    def add(self, evtype, fileno, cb):
        """Add a listener and (re)register ``fileno`` with the poller.

        :param evtype: event type, forwarded to ``BaseHub.add``.
        :param fileno: descriptor (or zmq socket) to watch.
        :param cb: callback, forwarded to ``BaseHub.add``.
        :returns: the listener created by ``BaseHub.add`` so the caller
            can hand it back to :meth:`remove`.
        """
        listener = super(Hub, self).add(evtype, fileno, cb)
        self.register(fileno, new=True)
        return listener

    def remove(self, listener):
        """Remove ``listener`` and refresh the poller mask for its fileno."""
        super(Hub, self).remove(listener)
        self.register(listener.fileno)

    def register(self, fileno, new=False):
        """Synchronise the poller with the listeners held for ``fileno``.

        The mask is rebuilt from ``self.listeners``: ``READ_MASK`` if a
        READ listener exists, ``WRITE_MASK`` if a WRITE listener exists.
        A non-zero mask is registered with the poller; a zero mask means
        nobody is listening any more, so the fileno is unregistered.

        :param fileno: descriptor (or zmq socket) to update.
        :param new: not consulted here; kept so existing callers that
            pass it keep working.
        """
        mask = 0
        if self.listeners[READ].get(fileno):
            mask |= READ_MASK
        if self.listeners[WRITE].get(fileno):
            mask |= WRITE_MASK
        if mask:
            self.poll.register(fileno, mask)
        else:
            self.poll.unregister(fileno)

    def remove_descriptor(self, fileno):
        """Drop every listener for ``fileno`` and unregister it.

        Unregistering is best effort: errors from the poller are ignored.
        """
        super(Hub, self).remove_descriptor(fileno)
        try:
            self.poll.unregister(fileno)
        except (KeyError, ValueError, IOError, OSError):
            # raised if we try to remove a fileno that was
            # already removed/invalid
            pass

    def do_poll(self, seconds):
        """Poll once and return the poller's result.

        :param seconds: timeout in seconds, or ``None`` for no timeout.
        :returns: whatever ``self.poll.poll`` returns; :meth:`wait`
            iterates it as ``(fileno, event)`` pairs.
        """
        if seconds is None:
            # ``None * 1000.0`` would raise TypeError; pass None through
            # so the poller applies its own "no timeout" handling.
            return self.poll.poll(None)
        # zmq.Poller.poll expects milliseconds
        return self.poll.poll(seconds * 1000.0)

    def wait(self, seconds=None):
        """Wait up to ``seconds`` for events and run the matching callbacks.

        With no listeners at all this simply sleeps for ``seconds`` (if
        truthy) and returns.  Otherwise each ``(fileno, event)`` pair
        from :meth:`do_poll` is dispatched to the READ and/or WRITE
        listener for that fileno, falling back to ``noop`` when none is
        registered.  Exceptions listed in ``self.SYSTEM_EXCEPTIONS``
        propagate; any other exception raised by a callback is passed to
        ``self.squelch_exception``.

        :param seconds: timeout in seconds, or ``None``.
        """
        readers = self.listeners[READ]
        writers = self.listeners[WRITE]

        if not readers and not writers:
            # Nothing to poll: just honour the timeout.
            if seconds:
                time.sleep(seconds)
            return

        try:
            presult = self.do_poll(seconds)
        except zmq.ZMQError:
            # In the poll hub this part exists to special case some exceptions
            # from socket. There may be some error numbers that wider use of
            # this hub will throw up as needing special treatment so leaving
            # this block and this comment as a reminder
            raise
        SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS

        if self.debug_blocking:
            self.block_detect_pre()

        for fileno, event in presult:
            try:
                if event & READ_MASK:
                    readers.get(fileno, noop).cb(fileno)
                if event & WRITE_MASK:
                    writers.get(fileno, noop).cb(fileno)
                if event & EXC_MASK:
                    # zmq.POLLERR is returned for any error condition in the
                    # underlying fd (as passed through to poll/epoll)
                    readers.get(fileno, noop).cb(fileno)
                    writers.get(fileno, noop).cb(fileno)
            except SYSTEM_EXCEPTIONS:
                raise
            except:
                # Deliberately broad: one failing callback must not take
                # the whole hub down.
                self.squelch_exception(fileno, sys.exc_info())
                clear_sys_exc_info()

        if self.debug_blocking:
            self.block_detect_post()