3
3
__zmq__ = __import__('zmq')
4
4
from eventlet import sleep
5
from eventlet.hubs import trampoline, get_hub
5
from eventlet.hubs import trampoline, _threadlocal
6
from eventlet.patcher import slurp_properties
7
8
__patched__ = ['Context', 'Socket']
8
globals().update(dict([(var, getattr(__zmq__, var))
9
for var in __zmq__.__all__
10
if not (var.startswith('__')
16
def get_hub_name_from_instance(hub):
17
"""Get the string name the eventlet uses to refer to hub
19
:param hub: An eventlet hub
21
return hub.__class__.__module__.rsplit('.',1)[-1]
9
slurp_properties(__zmq__, globals(), ignore=__patched__)
23
12
def Context(io_threads=1):
24
13
"""Factory function replacement for :class:`zmq.core.context.Context`
31
20
instance per thread. This is due to the way :class:`zmq.core.poll.Poller`
35
hub_name = get_hub_name_from_instance(hub)
36
if hub_name != 'zeromq':
37
raise RuntimeError("Hub must be 'zeromq', got '%s'" % hub_name)
38
return hub.get_context(io_threads)
24
return _threadlocal.context
25
except AttributeError:
26
_threadlocal.context = _Context(io_threads)
27
return _threadlocal.context
40
29
class _Context(__zmq__.Context):
41
30
"""Internal subclass of :class:`zmq.core.context.Context`
68
57
``zmq.EAGAIN`` (retry) error is raised
72
def _send_message(self, msg, flags=0):
73
flags |= __zmq__.NOBLOCK
76
super(Socket, self)._send_message(msg, flags)
78
except __zmq__.ZMQError, e:
81
trampoline(self, write=True)
83
def _send_copy(self, msg, flags=0):
84
flags |= __zmq__.NOBLOCK
87
super(Socket, self)._send_copy(msg, flags)
89
except __zmq__.ZMQError, e:
92
trampoline(self, write=True)
94
def _recv_message(self, flags=0, track=False):
96
flags |= __zmq__.NOBLOCK
99
m = super(Socket, self)._recv_message(flags, track)
102
except __zmq__.ZMQError, e:
103
if e.errno != EAGAIN:
105
trampoline(self, read=True)
107
def _recv_copy(self, flags=0):
108
flags |= __zmq__.NOBLOCK
111
m = super(Socket, self)._recv_copy(flags)
114
except __zmq__.ZMQError, e:
115
if e.errno != EAGAIN:
117
trampoline(self, read=True)
60
def _sock_wait(self, read=False, write=False):
62
First checks if there are events in the socket, to avoid
63
edge trigger problems with race conditions. Then if there
64
are none it will trampoline and when coming back check
67
events = self.getsockopt(__zmq__.EVENTS)
69
if read and (events & __zmq__.POLLIN):
71
elif write and (events & __zmq__.POLLOUT):
74
# ONLY trampoline on read events for the zmq FD
75
trampoline(self.getsockopt(__zmq__.FD), read=True)
76
return self.getsockopt(__zmq__.EVENTS)
78
def send(self, msg, flags=0, copy=True, track=False):
80
Override this instead of the internal _send_* methods
81
since those change and it's not clear when/how they're
84
if flags & __zmq__.NOBLOCK:
85
super(Socket, self).send(msg, flags=flags, track=track, copy=copy)
88
flags |= __zmq__.NOBLOCK
92
self._sock_wait(write=True)
93
super(Socket, self).send(msg, flags=flags, track=track,
96
except __zmq__.ZMQError, e:
100
def recv(self, flags=0, copy=True, track=False):
102
Override this instead of the internal _recv_* methods
103
since those change and it's not clear when/how they're
106
if flags & __zmq__.NOBLOCK:
107
return super(Socket, self).recv(flags=flags, track=track, copy=copy)
109
flags |= __zmq__.NOBLOCK
113
self._sock_wait(read=True)
114
m = super(Socket, self).recv(flags=flags, track=track, copy=copy)
117
except __zmq__.ZMQError, e:
118
if e.errno != EAGAIN: