1
#-----------------------------------------------------------------------------
2
# Copyright (c) 2011-2012 Travis Cline
4
# This file is part of pyzmq
5
# It is adapted from upstream project zeromq_gevent under the New BSD License
7
# Distributed under the terms of the New BSD License. The full license is in
8
# the file COPYING.BSD, distributed as part of this software.
9
#-----------------------------------------------------------------------------
11
"""This module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking
14
from __future__ import print_function
21
from zmq import Context as _original_Context
22
from zmq import Socket as _original_Socket
25
from gevent.event import AsyncResult
26
from gevent.hub import get_hub
30
"""simple wrapper for stopping an Event, allowing for method rename in gevent 1.0"""
33
except AttributeError as e:
37
class _Socket(_original_Socket):
38
"""Green version of :class:`zmq.core.socket.Socket`
40
The following methods are overridden:
45
To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or recieving
46
is deferred to the hub if a ``zmq.EAGAIN`` (retry) error is raised.
48
The `__state_changed` method is triggered when the zmq.FD for the socket is
49
marked as readable and triggers the necessary read and write events (which
50
are waited for in the recv and send methods).
52
Some double underscore prefixes are used to minimize pollution of
53
:class:`zmq.core.socket.Socket`'s namespace.
56
def __init__(self, context, socket_type):
57
self.__in_send_multipart = False
58
self.__in_recv_multipart = False
64
def close(self, linger=None):
65
super(_Socket, self).close(linger)
66
self.__cleanup_events()
68
def __cleanup_events(self):
69
# close the _state_event event, keeps the number of active file descriptors down
70
if getattr(self, '_state_event', None):
71
_stop(self._state_event)
72
self._state_event = None
73
# if the socket has entered a close state resume any waiting greenlets
77
def __setup_events(self):
78
self.__readable = AsyncResult()
79
self.__writable = AsyncResult()
84
self._state_event = get_hub().loop.io(self.getsockopt(zmq.FD), 1) # read state watcher
85
self._state_event.start(self.__state_changed)
86
except AttributeError:
87
# for gevent<1.0 compatibility
88
from gevent.core import read_event
89
self._state_event = read_event(self.getsockopt(zmq.FD), self.__state_changed, persist=True)
91
def __state_changed(self, event=None, _evtype=None):
93
self.__cleanup_events()
96
# avoid triggering __state_changed from inside __state_changed
97
events = super(_Socket, self).getsockopt(zmq.EVENTS)
98
except zmq.ZMQError as exc:
99
self.__writable.set_exception(exc)
100
self.__readable.set_exception(exc)
102
if events & zmq.POLLOUT:
103
self.__writable.set()
104
if events & zmq.POLLIN:
105
self.__readable.set()
107
def _wait_write(self):
108
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
109
self.__writable = AsyncResult()
110
# timeout is because libzmq cannot be trusted to properly signal a new send event:
111
# this is effectively a maximum poll interval of 1s
113
timeout = gevent.Timeout(seconds=1)
116
self.__writable.get(block=True)
117
except gevent.Timeout, t:
121
# gevent bug: get can raise timeout even on clean return
122
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
123
if toc-tic > 0.9 and self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
124
print("BUG: gevent missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
126
self.__writable.set()
129
def _wait_read(self):
130
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
131
self.__readable = AsyncResult()
132
# timeout is because libzmq cannot always be trusted to play nice with libevent.
133
# I can only confirm that this actually happens for send, but lets be symmetrical
134
# with our dirty hacks.
135
# this is effectively a maximum poll interval of 1s
137
timeout = gevent.Timeout(seconds=1)
140
self.__readable.get(block=True)
141
except gevent.Timeout, t:
145
# gevent bug: get can raise timeout even on clean return
146
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
147
if toc-tic > 0.9 and self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
148
print("BUG: gevent missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
150
self.__readable.set()
153
def send(self, data, flags=0, copy=True, track=False):
154
"""send, which will only block current greenlet
156
state_changed always fires exactly once (success or fail) at the
160
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
161
if flags & zmq.NOBLOCK:
163
msg = super(_Socket, self).send(data, flags, copy, track)
165
if not self.__in_send_multipart:
166
self.__state_changed()
168
# ensure the zmq.NOBLOCK flag is part of flags
170
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
172
# attempt the actual call
173
msg = super(_Socket, self).send(data, flags, copy, track)
174
except zmq.ZMQError as e:
175
# if the raised ZMQError is not EAGAIN, reraise
176
if e.errno != zmq.EAGAIN:
177
if not self.__in_send_multipart:
178
self.__state_changed()
181
if not self.__in_send_multipart:
182
self.__state_changed()
184
# defer to the event loop until we're notified the socket is writable
187
def recv(self, flags=0, copy=True, track=False):
188
"""recv, which will only block current greenlet
190
state_changed always fires exactly once (success or fail) at the
193
if flags & zmq.NOBLOCK:
195
msg = super(_Socket, self).recv(flags, copy, track)
197
if not self.__in_recv_multipart:
198
self.__state_changed()
204
msg = super(_Socket, self).recv(flags, copy, track)
205
except zmq.ZMQError as e:
206
if e.errno != zmq.EAGAIN:
207
if not self.__in_recv_multipart:
208
self.__state_changed()
211
if not self.__in_recv_multipart:
212
self.__state_changed()
216
def send_multipart(self, *args, **kwargs):
217
"""wrap send_multipart to prevent state_changed on each partial send"""
218
self.__in_send_multipart = True
220
msg = super(_Socket, self).send_multipart(*args, **kwargs)
222
self.__in_send_multipart = False
223
self.__state_changed()
226
def recv_multipart(self, *args, **kwargs):
227
"""wrap recv_multipart to prevent state_changed on each partial recv"""
228
self.__in_recv_multipart = True
230
msg = super(_Socket, self).recv_multipart(*args, **kwargs)
232
self.__in_recv_multipart = False
233
self.__state_changed()
236
def getsockopt(self, opt):
237
"""trigger state_changed on getsockopt(EVENTS)"""
238
optval = super(_Socket, self).getsockopt(opt)
239
if opt == zmq.EVENTS:
240
self.__state_changed()
244
class _Context(_original_Context):
245
"""Replacement for :class:`zmq.core.context.Context`
247
Ensures that the greened Socket above is used in calls to `socket`.
249
_socket_class = _Socket