1
.. PyZMQ eventloop doc, by Min Ragan-Kelley, 2011
12
Facebook's `Tornado`_ includes an eventloop for handing poll events on filedescriptors and
13
native sockets. We have included a small part of Tornado (specifically its
14
:mod:`.ioloop`), and adapted its :class:`IOStream` class into :class:`.ZMQStream` for
15
handling poll events on ØMQ sockets. A ZMQStream object works much like a Socket object,
16
but instead of calling :meth:`~.Socket.recv` directly, you register a callback with
17
:meth:`~.ZMQStream.on_recv`. callbacks can also be registered for send events
18
with :meth:`~.ZMQStream.on_send`.
24
With PyZMQ's ioloop, you can use zmq sockets in any tornado application. You must first
25
install PyZMQ's :class:`.IOLoop`, with the :func:`.ioloop.install` function:
27
.. sourcecode:: python
29
from zmq.eventloop import ioloop
32
This sets the global instance of :class:`tornado.ioloop.IOLoop` with the global instance of
33
our IOLoop class. The reason this must happen is that tornado objects avoid having to pass
34
the active IOLoop instance around by having a staticmethod :meth:`.IOLoop.instance`, which
35
always returns the active instance. If PyZMQ's IOLoop is installed after the first call to
36
:meth:`.IOLoop.instance()` (called in almost every tornado object constructor), then it will
37
raise an :exc:`AssertionError`, because the global IOLoop instance has already been
38
created, and proceeding would result in not all objects being associated with the right
41
It is possible to use PyZMQ sockets with tornado *without* calling :func:`.ioloop.install`,
42
but it is less convenient. First, you must instruct the tornado IOLoop to use the zmq poller:
44
.. sourcecode:: python
46
from tornado.ioloop import IOLoop
47
from zmq.eventloop.ioloop import ZMQPoller
49
loop = IOLoop(ZMQPoller())
51
Then, when you instantiate tornado and ZMQStream objects, you must pass the `io_loop`
52
argument to ensure that they use this loop, instead of the global instance. You can
53
install this IOLoop as the global tornado instance, with:
55
.. sourcecode:: python
59
but it will **NOT** be the global *pyzmq* IOLoop instance, so it must still be passed to
60
your ZMQStream constructors.
66
ZMQStream objects do have :meth:`~.ZMQStream.send` and :meth:`~.ZMQStream.send_multipart`
67
methods, which behaves the same way as :meth:`.Socket.send`, but instead of sending right
68
away, the :class:`.IOLoop` will wait until socket is able to send (for instance if ``HWM``
69
is met, or a ``REQ/REP`` pattern prohibits sending at a certain point). Messages sent via
70
send will also be passed to the callback registered with :meth:`~.ZMQStream.on_send` after
76
:meth:`.ZMQStream.on_recv` is the primary method for using a ZMQStream. It registers a
77
callback to fire with messages as they are received, which will *always* be multipart,
78
even if its length is 1. You can easily use this to build things like an echo socket:
80
.. sourcecode:: python
82
s = ctx.socket(zmq.REP)
83
s.bind('tcp://localhost:12345')
86
stream.send_multipart(msg)
88
ioloop.IOLoop.instance().start()
90
on_recv can also take a `copy` flag, just like :meth:`.Socket.recv`. If `copy=False`, then
91
callbacks registered with on_recv will receive tracked :class:`.Frame` objects instead of
94
:meth:`on_recv_stream`
95
----------------------
97
:meth:`.ZMQStream.on_recv_stream` is just like on_recv above, but the callback will be
98
passed both the message and the stream, rather than just the message. This is meant to make
99
it easier to use a single callback with multiple streams.
101
.. sourcecode:: python
103
s1 = ctx.socket(zmq.REP)
104
s1.bind('tcp://localhost:12345')
105
stream1 = ZMQStream(s1)
107
s2 = ctx.socket(zmq.REP)
108
s2.bind('tcp://localhost:54321')
109
stream2 = ZMQStream(s2)
111
def echo(msg, stream):
112
stream.send_multipart(msg)
114
stream1.on_recv_stream(echo)
115
stream2.on_recv_stream(echo)
117
ioloop.IOLoop.instance().start()
123
Sometimes with an eventloop, there can be multiple events ready on a single iteration of
124
the loop. The :meth:`~.ZMQStream.flush` method allows developers to pull messages off of
125
the queue to enforce some priority over the event loop ordering. flush pulls any pending
126
events off of the queue. You can specify to flush only recv events, only send events, or
127
any events, and you can specify a limit for how many events to flush in order to prevent
130
.. _Tornado: https://github.com/facebook/tornado
137
PyZMQ ≥ 2.2.0.1 ships with a `gevent <http://www.gevent.org/>`_ compatible API as :mod:`zmq.green`.
140
.. sourcecode:: python
142
import zmq.green as zmq
144
Then write your code as normal.
146
Currently, Socket.send/recv methods and zmq.Poller are gevent-aware.
147
The tornado-based ZMQStream/IOLoop *are not* compatible with gevent.
151
There is a `known issue <https://github.com/zeromq/pyzmq/issues/229>`_ in gevent ≤ 1.0 or libevent,
152
which can cause zeromq socket events to be missed.
153
PyZMQ works around this by adding a timeout so it will not wait forever for gevent to notice events.
154
The only known solution for this is to use gevent ≥ 1.0, which is currently at 1.0b3,
155
and does not exhibit this behavior.
159
zmq.green examples `on GitHub <https://github.com/zeromq/pyzmq/tree/master/examples/gevent>`_.
161
:mod:`zmq.green` is simply `gevent_zeromq <https://github.com/traviscline/gevent_zeromq>`_,
162
merged into the pyzmq project.