1
"""Python bindings for 0MQ."""
4
# Copyright (c) 2010 Brian E. Granger
6
# This file is part of pyzmq.
8
# pyzmq is free software; you can redistribute it and/or modify it under
9
# the terms of the Lesser GNU General Public License as published by
10
# the Free Software Foundation; either version 3 of the License, or
11
# (at your option) any later version.
13
# pyzmq is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
# Lesser GNU General Public License for more details.
18
# You should have received a copy of the Lesser GNU General Public License
19
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
#-----------------------------------------------------------------------------
24
#-----------------------------------------------------------------------------
28
from python_string cimport PyString_FromStringAndSize
29
from python_string cimport PyString_AsStringAndSize
30
from python_string cimport PyString_AsString, PyString_Size
31
from python_ref cimport Py_DECREF, Py_INCREF
33
cdef extern from "Python.h":
34
ctypedef int Py_ssize_t
35
cdef void PyEval_InitThreads()
37
# For some reason we need to call this. My guess is that we are not doing
38
# any Python threading.
41
import copy as copy_mod
42
import cPickle as pickle
50
import simplejson as json
54
include "allocate.pxi"
56
#-----------------------------------------------------------------------------
57
# Import the C header files
58
#-----------------------------------------------------------------------------
60
cdef extern from "errno.h" nogil:
61
enum: ZMQ_EINVAL "EINVAL"
62
enum: ZMQ_EAGAIN "EAGAIN"
64
cdef extern from "string.h" nogil:
65
void *memcpy(void *dest, void *src, size_t n)
66
size_t strlen(char *s)
68
cdef extern from "zmq_compat.h":
69
ctypedef signed long long int64_t "pyzmq_int64_t"
71
cdef extern from "zmq.h" nogil:
73
enum: ZMQ_ENOTSUP "ENOTSUP"
74
enum: ZMQ_EPROTONOSUPPORT "EPROTONOSUPPORT"
75
enum: ZMQ_ENOBUFS "ENOBUFS"
76
enum: ZMQ_ENETDOWN "ENETDOWN"
77
enum: ZMQ_EADDRINUSE "EADDRINUSE"
78
enum: ZMQ_EADDRNOTAVAIL "EADDRNOTAVAIL"
79
enum: ZMQ_ECONNREFUSED "ECONNREFUSED"
80
enum: ZMQ_EINPROGRESS "EINPROGRESS"
81
enum: ZMQ_EMTHREAD "EMTHREAD"
83
enum: ZMQ_ENOCOMPATPROTO "ENOCOMPATPROTO"
84
enum: ZMQ_ETERM "ETERM"
87
char *zmq_strerror (int errnum)
90
enum: ZMQ_MAX_VSM_SIZE # 30
91
enum: ZMQ_DELIMITER # 31
94
ctypedef struct zmq_msg_t:
97
unsigned char vsm_size
98
unsigned char vsm_data [ZMQ_MAX_VSM_SIZE]
100
ctypedef void zmq_free_fn(void *data, void *hint)
102
int zmq_msg_init (zmq_msg_t *msg)
103
int zmq_msg_init_size (zmq_msg_t *msg, size_t size)
104
int zmq_msg_init_data (zmq_msg_t *msg, void *data,
105
size_t size, zmq_free_fn *ffn, void *hint)
106
int zmq_msg_close (zmq_msg_t *msg)
107
int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src)
108
int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src)
109
void *zmq_msg_data (zmq_msg_t *msg)
110
size_t zmq_msg_size (zmq_msg_t *msg)
112
void *zmq_init (int io_threads)
113
int zmq_term (void *context)
122
enum: ZMQ_UPSTREAM # 7
123
enum: ZMQ_DOWNSTREAM # 8
127
enum: ZMQ_AFFINITY # 4
128
enum: ZMQ_IDENTITY # 5
129
enum: ZMQ_SUBSCRIBE # 6
130
enum: ZMQ_UNSUBSCRIBE # 7
132
enum: ZMQ_RECOVERY_IVL # 9
133
enum: ZMQ_MCAST_LOOP # 10
134
enum: ZMQ_SNDBUF # 11
135
enum: ZMQ_RCVBUF # 12
136
enum: ZMQ_RCVMORE # 13
138
enum: ZMQ_NOBLOCK # 1
139
enum: ZMQ_SNDMORE # 2
141
void *zmq_socket (void *context, int type)
142
int zmq_close (void *s)
143
int zmq_setsockopt (void *s, int option, void *optval, size_t optvallen)
144
int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen)
145
int zmq_bind (void *s, char *addr)
146
int zmq_connect (void *s, char *addr)
147
int zmq_send (void *s, zmq_msg_t *msg, int flags)
148
int zmq_recv (void *s, zmq_msg_t *msg, int flags)
151
enum: ZMQ_POLLOUT # 2
152
enum: ZMQ_POLLERR # 4
154
ctypedef struct zmq_pollitem_t:
162
int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout)
164
# void *zmq_stopwatch_start ()
165
# unsigned long zmq_stopwatch_stop (void *watch_)
166
# void zmq_sleep (int seconds_)
168
#-----------------------------------------------------------------------------
169
# Python module level constants
170
#-----------------------------------------------------------------------------
172
NOBLOCK = ZMQ_NOBLOCK
180
UPSTREAM = ZMQ_UPSTREAM
181
DOWNSTREAM = ZMQ_DOWNSTREAM
184
AFFINITY = ZMQ_AFFINITY
185
IDENTITY = ZMQ_IDENTITY
186
SUBSCRIBE = ZMQ_SUBSCRIBE
187
UNSUBSCRIBE = ZMQ_UNSUBSCRIBE
189
RECOVERY_IVL = ZMQ_RECOVERY_IVL
190
MCAST_LOOP = ZMQ_MCAST_LOOP
193
RCVMORE = ZMQ_RCVMORE
194
SNDMORE = ZMQ_SNDMORE
196
POLLOUT = ZMQ_POLLOUT
197
POLLERR = ZMQ_POLLERR
199
#-----------------------------------------------------------------------------
201
#-----------------------------------------------------------------------------
203
# Often used (these are also in errno).
207
# For Windows compatibility
208
ENOTSUP = ZMQ_ENOTSUP
209
EPROTONOSUPPORT = ZMQ_EPROTONOSUPPORT
210
ENOBUFS = ZMQ_ENOBUFS
211
ENETDOWN = ZMQ_ENETDOWN
212
EADDRINUSE = ZMQ_EADDRINUSE
213
EADDRNOTAVAIL = ZMQ_EADDRNOTAVAIL
214
ECONNREFUSED = ZMQ_ECONNREFUSED
215
EINPROGRESS = ZMQ_EINPROGRESS
218
EMTHREAD = ZMQ_EMTHREAD
220
ENOCOMPATPROTO = ZMQ_ENOCOMPATPROTO
224
def strerror(errnum):
    """Look up the 0MQ error message that belongs to an error number.

    Parameters
    ----------
    errnum : int
        The errno-style error number to describe.

    Returns
    -------
    str
        The message reported by ``zmq_strerror`` for ``errnum``.
    """
    # zmq_strerror hands back a C string; binding it to a Python name
    # converts it to a Python string object before it is returned.
    description = zmq_strerror(errnum)
    return description
228
class ZMQBaseError(Exception):
231
class ZMQError(ZMQBaseError):
232
"""Base exception class for 0MQ errors in Python."""
234
def __init__(self, error=None):
235
"""Wrap an errno style error.
240
The ZMQ errno or None. If None, then zmq_errno() is called and
245
if type(error) == int:
246
self.errstr = strerror(error)
249
self.errstr = str(error)
255
class ZMQBindError(ZMQBaseError):
    """Signal that ``bind_to_random_port`` could not bind the socket.

    This exception derives from ZMQBaseError and carries only the message
    it is raised with; it adds no attributes of its own.
    """
259
#-----------------------------------------------------------------------------
261
#-----------------------------------------------------------------------------
264
cdef void free_python_msg(void *data, void *hint) with gil:
265
"""A function for DECREF'ing Python based messages."""
267
Py_DECREF(<object>hint)
271
"""A Message class for non-copy send/recvs.
273
This class is only needed if you want to do non-copying send and recvs.
274
When you pass a string to this class, like ``Message(s)``, the
275
ref-count of s is increased by two: once because Message saves s as
276
an instance attribute and another because a ZMQ message is created that
277
points to the buffer of s. This second ref-count increase makes sure
278
that s lives until all messages that use it have been sent. Once 0MQ
279
sends all the messages and it doesn't need the buffer of s, 0MQ will call
283
cdef zmq_msg_t zmq_msg
286
def __cinit__(self, object data=None):
288
# Save the data object in case the user wants the data as a str.
290
cdef char *data_c = NULL
291
cdef Py_ssize_t data_len_c
295
rc = zmq_msg_init(&self.zmq_msg)
299
PyString_AsStringAndSize(data, &data_c, &data_len_c)
300
# We INCREF the *original* Python object (not self) and pass it
301
# as the hint below. This allows other copies of this Message
302
# object to take over the ref counting of data properly.
305
rc = zmq_msg_init_data(
306
&self.zmq_msg, <void *>data_c, data_len_c,
307
<zmq_free_fn *>free_python_msg, <void *>data
313
def __dealloc__(self):
315
# This simply decreases the 0MQ ref-count of zmq_msg.
316
rc = zmq_msg_close(&self.zmq_msg)
321
"""Create a shallow copy of the message.
323
This does not copy the contents of the Message, just the pointer.
324
This will increment the 0MQ ref count of the message, but not
325
the ref count of the Python object. That is only done once when
326
the Python object is first turned into a 0MQ message.
328
return self.fast_copy()
330
cdef Message fast_copy(self):
331
"""Fast, cdef'd version of shallow copy of the message."""
334
# This does not copy the contents, but just increases the ref-count
335
# of the zmq_msg by one.
336
zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg)
337
# Copy the ref to data so the copy won't create a copy when str is
339
if self.data is not None:
340
new_msg.data = self.data
344
"""Return the length of the message in bytes."""
345
return <int>zmq_msg_size(&self.zmq_msg)
348
"""Return the str form of the message."""
349
cdef char *data_c = NULL
350
cdef Py_ssize_t data_len_c
351
if self.data is None:
352
data_c = <char *>zmq_msg_data(&self.zmq_msg)
353
data_len_c = zmq_msg_size(&self.zmq_msg)
354
return PyString_FromStringAndSize(data_c, data_len_c)
360
"""Manage the lifecycle of a 0MQ context.
362
This class no longer takes any flags or the number of application
368
The number of IO threads.
373
def __cinit__(self, int io_threads=1):
375
if not io_threads > 0:
376
raise ZMQError(EINVAL)
377
self.handle = zmq_init(io_threads)
378
if self.handle == NULL:
381
def __dealloc__(self):
383
if self.handle != NULL:
384
rc = zmq_term(self.handle)
388
def socket(self, int socket_type):
389
"""Create a Socket associated with this Context.
394
The socket type, which can be any of the 0MQ socket types:
395
REQ, REP, PUB, SUB, PAIR, XREQ, XREP, UPSTREAM, DOWNSTREAM.
397
return Socket(self, socket_type)
403
Socket(context, socket_type)
408
The 0MQ Context this Socket belongs to.
410
The socket type, which can be any of the 0MQ socket types:
411
REQ, REP, PUB, SUB, PAIR, XREQ, XREP, UPSTREAM, DOWNSTREAM.
415
cdef public int socket_type
416
# Hold on to a reference to the context to make sure it is not garbage
417
# collected until the socket is done with it.
418
cdef public Context context
419
cdef public object closed
421
def __cinit__(self, Context context, int socket_type):
423
self.context = context
424
self.socket_type = socket_type
425
self.handle = zmq_socket(context.handle, socket_type)
426
if self.handle == NULL:
430
def __dealloc__(self):
436
This can be called to close the socket by hand. If this is not
437
called, the socket will automatically be closed when it is
441
if self.handle != NULL and not self.closed:
442
rc = zmq_close(self.handle)
448
def _check_closed(self):
450
raise ZMQError(ENOTSUP)
452
def setsockopt(self, int option, optval):
453
"""Set socket options.
455
See the 0MQ documentation for details on specific options.
460
The name of the option to set. Can be any of: SUBSCRIBE,
461
UNSUBSCRIBE, IDENTITY, HWM, SWAP, AFFINITY, RATE,
462
RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF.
464
The value of the option to set.
466
cdef int64_t optval_int_c
471
if option in [SUBSCRIBE, UNSUBSCRIBE, IDENTITY]:
472
if not isinstance(optval, str):
473
raise TypeError('expected str, got: %r' % optval)
476
PyString_AsString(optval), PyString_Size(optval)
478
elif option in [HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL,
479
MCAST_LOOP, SNDBUF, RCVBUF]:
480
if not isinstance(optval, int):
481
raise TypeError('expected int, got: %r' % optval)
482
optval_int_c = optval
485
&optval_int_c, sizeof(int64_t)
488
raise ZMQError(EINVAL)
493
def getsockopt(self, int option):
494
"""Get the value of a socket option.
496
See the 0MQ documentation for details on specific options.
501
The name of the option to set. Can be any of:
502
IDENTITY, HWM, SWAP, AFFINITY, RATE,
503
RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF, RCVMORE.
507
The value of the option as a string or int.
509
cdef int64_t optval_int_c
510
cdef char identity_str_c [255]
516
if option in [IDENTITY]:
518
rc = zmq_getsockopt(self.handle, option, <void *>identity_str_c, &sz)
521
result = PyString_FromStringAndSize(<char *>identity_str_c, sz)
522
elif option in [HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL,
523
MCAST_LOOP, SNDBUF, RCVBUF, RCVMORE]:
525
rc = zmq_getsockopt(self.handle, option, <void *>&optval_int_c, &sz)
528
result = optval_int_c
534
def bind(self, addr):
535
"""Bind the socket to an address.
537
This causes the socket to listen on a network port. Sockets on the
538
other side of this connection will use :meth:`Socket.connect` to
539
connect to this socket.
544
The address string. This has the form 'protocol://interface:port',
545
for example 'tcp://127.0.0.1:5555'. Protocols supported are
546
tcp, udp, pgm, inproc and ipc.
552
if not isinstance(addr, str):
553
raise TypeError('expected str, got: %r' % addr)
554
rc = zmq_bind(self.handle, addr)
558
def bind_to_random_port(self, addr, min_port=2000, max_port=20000, max_tries=100):
559
"""Bind this socket to a random port in a range.
564
The address string without the port to pass to :meth:`Socket.bind`.
566
The minimum port in the range of ports to try.
568
The maximum port in the range of ports to try.
570
The number of attempts to bind.
575
The port the socket was bound to.
577
for i in range(max_tries):
579
port = random.randrange(min_port, max_port)
580
self.bind('%s:%s' % (addr, port))
585
raise ZMQBindError("Could not bind socket to random port.")
587
def connect(self, addr):
588
"""Connect to a remote 0MQ socket.
593
The address string. This has the form 'protocol://interface:port',
594
for example 'tcp://127.0.0.1:5555'. Protocols supported are
595
tcp, udp, pgm, inproc and ipc.
601
if not isinstance(addr, str):
602
raise TypeError('expected str, got: %r' % addr)
603
rc = zmq_connect(self.handle, addr)
607
#-------------------------------------------------------------------------
608
# Sending and receiving messages
609
#-------------------------------------------------------------------------
611
def send(self, object data, int flags=0, bool copy=True):
612
"""Send a message on this socket.
614
This queues the message to be sent by the IO thread at a later time.
618
data : object, str, Message
619
The content of the message.
621
Any supported flag: NOBLOCK, SNDMORE.
623
Should the message be sent in a copying or non-copying manner.
627
None if message was sent, raises an exception otherwise.
630
if isinstance(data, Message):
631
return self._send_message(data, flags)
633
return self._send_copy(data, flags)
635
# I am not sure which non-copy implementation to use here.
636
# It probably doesn't matter though.
638
return self._send_message(msg, flags)
639
# return self._send_nocopy(msg, flags)
641
def _send_message(self, Message msg, int flags=0):
642
"""Send a Message on this socket in a non-copy manner."""
644
cdef Message msg_copy
646
# Always copy so the original message isn't garbage collected.
647
# This doesn't do a real copy, just a reference.
648
msg_copy = msg.fast_copy()
649
# msg_copy = copy_mod.copy(msg)
651
rc = zmq_send(self.handle, &msg.zmq_msg, flags)
656
def _send_copy(self, object msg, int flags=0):
657
"""Send a message on this socket by copying its content."""
661
cdef Py_ssize_t msg_c_len
663
if not isinstance(msg, str):
664
raise TypeError('expected str, got: %r' % msg)
666
PyString_AsStringAndSize(msg, &msg_c, &msg_c_len)
667
# Copy the msg before sending. This avoids any complications with
669
# If zmq_msg_init_* fails do we need to call zmq_msg_close?
670
rc = zmq_msg_init_size(&data, msg_c_len)
671
memcpy(zmq_msg_data(&data), msg_c, zmq_msg_size(&data))
677
rc = zmq_send(self.handle, &data, flags)
678
rc2 = zmq_msg_close(&data)
680
# Shouldn't the error handling for zmq_msg_close come after that
688
def _send_nocopy(self, object msg, int flags=0):
689
"""Send a Python string on this socket in a non-copy manner.
691
This method is not being used currently, as the same functionality
692
is provided by self._send_message(Message(data)). This may eventually
698
cdef Py_ssize_t msg_c_len
700
if not isinstance(msg, str):
701
raise TypeError('expected str, got: %r' % msg)
703
PyString_AsStringAndSize(msg, &msg_c, &msg_c_len)
704
Py_INCREF(msg) # We INCREF to prevent Python from gc'ing msg
705
rc = zmq_msg_init_data(
706
&data, <void *>msg_c, msg_c_len,
707
<zmq_free_fn *>free_python_msg, <void *>msg
711
# If zmq_msg_init_data fails it does not call zmq_free_fn,
717
rc = zmq_send(self.handle, &data, flags)
720
# If zmq_send fails it does not call zmq_free_fn, so we Py_DECREF.
725
rc = zmq_msg_close(&data)
729
def recv(self, int flags=0, copy=True):
730
"""Receive a message.
735
Any supported flag: NOBLOCK. If NOBLOCK is set, this method
736
will return None if a message is not ready. If NOBLOCK is not
737
set, then this method will block until a message arrives.
739
Should the message be received in a copying or non-copying manner.
740
If False a Message object is returned, if True a string copy of
745
The returned message, or raises ZMQError otherwise.
749
# This could be implemented by simply calling _recv_message and
750
# then casting to a str.
751
return self._recv_copy(flags)
753
return self._recv_message(flags)
755
def _recv_message(self, int flags=0):
756
"""Receive a message in a non-copying manner and return a Message."""
762
rc = zmq_recv(self.handle, &msg.zmq_msg, flags)
768
def _recv_copy(self, int flags=0):
769
"""Receive a message in a copying manner as a string."""
773
rc = zmq_msg_init(&data)
778
rc = zmq_recv(self.handle, &data, flags)
784
msg = PyString_FromStringAndSize(
785
<char *>zmq_msg_data(&data),
789
rc = zmq_msg_close(&data)
795
def send_multipart(self, msg_parts, int flags=0, copy=True):
796
"""Send a sequence of messages as a multipart message.
801
A sequence of messages to send as a multipart message.
803
Only the NOBLOCK flag is supported, SNDMORE is handled
806
for msg in msg_parts[:-1]:
807
self.send(msg, SNDMORE|flags, copy=copy)
808
# Send the last part without the SNDMORE flag.
809
self.send(msg_parts[-1], flags)
811
def recv_multipart(self, int flags=0, copy=True):
812
"""Receive a multipart message as a list of messages.
817
Any supported flag: NOBLOCK. If NOBLOCK is set, this method
818
will return None if a message is not ready. If NOBLOCK is not
819
set, then this method will block until a message arrives.
824
A list of messages in the multipart message.
828
part = self.recv(flags, copy=copy)
837
"""Are there more parts to a multipart message."""
838
more = self.getsockopt(RCVMORE)
841
def send_pyobj(self, obj, flags=0, protocol=-1):
842
"""Send a Python object as a message using pickle to serialize.
847
The Python object to send.
851
The pickle protocol number to use. Default of -1 will select
852
the highest supported number. Use 0 for multiple platform
855
msg = pickle.dumps(obj, protocol)
856
return self.send(msg, flags)
858
def recv_pyobj(self, flags=0):
859
"""Receive a Python object as a message using pickle to serialize.
869
The Python object that arrives as a message.
872
return pickle.loads(s)
874
def send_json(self, obj, flags=0):
875
"""Send a Python object as a message using json to serialize.
880
The Python object to send.
885
raise ImportError('json or simplejson library is required.')
887
msg = json.dumps(obj, separators=(',',':'))
888
return self.send(msg, flags)
890
def recv_json(self, flags=0):
891
"""Receive a Python object as a message using json to serialize.
901
The Python object that arrives as a message.
904
raise ImportError('json or simplejson library is required.')
906
msg = self.recv(flags)
907
return json.loads(msg)
910
# cdef class Stopwatch:
911
# """A simple stopwatch based on zmq_stopwatch_start/stop."""
915
# def __cinit__(self):
918
# def __dealloc__(self):
925
# if self.watch == NULL:
926
# self.watch = zmq_stopwatch_start()
928
# raise ZMQError('Stopwatch is already runing.')
931
# if self.watch == NULL:
932
# raise ZMQError('Must start the Stopwatch before calling stop.')
934
# time = zmq_stopwatch_stop(self.watch)
938
# def sleep(self, int seconds):
942
def _poll(sockets, long timeout=-1):
943
"""Poll a set of 0MQ sockets, native file descs. or sockets.
947
sockets : list of tuples of (socket, flags)
948
Each element of this list is a two-tuple containing a socket
949
and a flags. The socket may be a 0MQ socket or any object with
950
a :meth:`fileno` method. The flags can be zmq.POLLIN (for detecting
951
for incoming messages), zmq.POLLOUT (for detecting that send is OK)
952
or zmq.POLLIN|zmq.POLLOUT for detecting both.
954
The number of microseconds to poll for. Negative means no timeout.
957
cdef zmq_pollitem_t *pollitems = NULL
958
cdef int nsockets = len(sockets)
959
cdef Socket current_socket
960
pollitems_o = allocate(nsockets*sizeof(zmq_pollitem_t),<void**>&pollitems)
962
for i in range(nsockets):
964
events = sockets[i][1]
965
if isinstance(s, Socket):
967
pollitems[i].socket = current_socket.handle
968
pollitems[i].events = events
969
pollitems[i].revents = 0
970
elif isinstance(s, int):
971
pollitems[i].socket = NULL
973
pollitems[i].events = events
974
pollitems[i].revents = 0
975
elif hasattr(s, 'fileno'):
977
fileno = int(s.fileno())
979
raise ValueError('fileno() must return an valid integer fd')
981
pollitems[i].socket = NULL
982
pollitems[i].fd = fileno
983
pollitems[i].events = events
984
pollitems[i].revents = 0
987
"Socket must be a 0MQ socket, an integer fd or have "
988
"a fileno() method: %r" % s
991
# int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout)
993
rc = zmq_poll(pollitems, nsockets, timeout)
998
for i in range(nsockets):
1000
# Return the fd for sockets, for compat. with select.poll.
1001
if hasattr(s, 'fileno'):
1003
revents = pollitems[i].revents
1004
# Only return sockets with non-zero status for compat. with select.poll.
1006
results.append((s, revents))
1011
class Poller(object):
1012
"""An stateful poll interface that mirrors Python's built-in poll."""
1017
def register(self, socket, flags=POLLIN|POLLOUT):
1018
"""Register a 0MQ socket or native fd for I/O monitoring.
1022
socket : zmq.Socket or native socket
1023
A zmq.Socket or any Python object having a :meth:`fileno`
1024
method that returns a valid file descriptor.
1026
The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
1028
self.sockets[socket] = flags
1030
def modify(self, socket, flags=POLLIN|POLLOUT):
    """Change the event flags of a 0MQ socket or native fd.

    This simply forwards to :meth:`register` with the same arguments, so
    the flags stored for ``socket`` are replaced by ``flags``.

    Parameters
    ----------
    socket : zmq.Socket or native socket
        The socket whose flags should be changed.
    flags : int
        The events to watch for. Defaults to POLLIN|POLLOUT.
    """
    # Re-registering overwrites the entry kept for this socket.
    self.register(socket, flags)
1034
def unregister(self, socket):
1035
"""Remove a 0MQ socket or native fd for I/O monitoring.
1040
The socket instance to stop polling.
1042
del self.sockets[socket]
1044
def poll(self, timeout=None):
1045
"""Poll the registered 0MQ or native fds for I/O.
1049
timeout : float, int
1050
The timeout in milliseconds. If None, no timeout (infinite). This
1051
is in milliseconds to be compatible with :func:`select.poll`. The
1052
underlying zmq_poll uses microseconds and we convert to that in
1057
# Convert from ms -> us for zmq_poll.
1058
timeout = int(timeout*1000.0)
1061
return _poll(self.sockets.items(), timeout=timeout)
1064
def select(rlist, wlist, xlist, timeout=None):
1065
"""Return the result of poll as a lists of sockets ready for r/w.
1067
This has the same interface as Python's built-in :func:`select` function.
1071
timeout : float, int
1072
The timeout in seconds. This is in seconds to be compatible with
1073
:func:`select.select`. The underlying zmq_poll uses microseconds and
1074
we convert to that in this function.
1078
# Convert from sec -> us for zmq_poll.
1079
timeout = int(timeout*1000000.0)
1083
for s in set(rlist + wlist + xlist):
1091
sockets.append((s, flags))
1092
return_sockets = _poll(sockets, timeout)
1093
rlist, wlist, xlist = [], [], []
1094
for s, flags in return_sockets:
1101
return rlist, wlist, xlist