~ubuntu-branches/ubuntu/precise/pyzmq/precise

« back to all changes in this revision

Viewing changes to zmq/_zmq.pyx

  • Committer: Bazaar Package Importer
  • Author(s): Piotr Ożarowski
  • Date: 2011-02-15 09:08:36 UTC
  • mfrom: (2.1.2 experimental)
  • Revision ID: james.westby@ubuntu.com-20110215090836-phh4slym1g6muucn
Tags: 2.0.10.1-2
* Team upload.
* Upload to unstable
* Add Breaks: ${python:Breaks}

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
"""Python bindings for 0MQ."""
2
 
 
3
 
#
4
 
#    Copyright (c) 2010 Brian E. Granger
5
 
#
6
 
#    This file is part of pyzmq.
7
 
#
8
 
#    pyzmq is free software; you can redistribute it and/or modify it under
9
 
#    the terms of the Lesser GNU General Public License as published by
10
 
#    the Free Software Foundation; either version 3 of the License, or
11
 
#    (at your option) any later version.
12
 
#
13
 
#    pyzmq is distributed in the hope that it will be useful,
14
 
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 
#    Lesser GNU General Public License for more details.
17
 
#
18
 
#    You should have received a copy of the Lesser GNU General Public License
19
 
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
 
#
21
 
 
22
 
#-----------------------------------------------------------------------------
23
 
# Imports
24
 
#-----------------------------------------------------------------------------
25
 
 
26
 
 
27
 
from stdlib cimport *
28
 
from python_string cimport PyString_FromStringAndSize
29
 
from python_string cimport PyString_AsStringAndSize
30
 
from python_string cimport PyString_AsString, PyString_Size
31
 
from python_ref cimport Py_DECREF, Py_INCREF
32
 
 
33
 
cdef extern from "Python.h":
34
 
    ctypedef int Py_ssize_t
35
 
    cdef void PyEval_InitThreads()
36
 
 
37
 
# For some reason we need to call this.  My guess is that we are not doing
38
 
# any Python treading.
39
 
PyEval_InitThreads()
40
 
 
41
 
import copy as copy_mod
42
 
import cPickle as pickle
43
 
import random
44
 
import struct
45
 
 
46
 
try:
47
 
    import json
48
 
except ImportError:
49
 
    try:
50
 
        import simplejson as json
51
 
    except ImportError:
52
 
        json = None
53
 
 
54
 
include "allocate.pxi"
55
 
 
56
 
#-----------------------------------------------------------------------------
57
 
# Import the C header files
58
 
#-----------------------------------------------------------------------------
59
 
 
60
 
cdef extern from "errno.h" nogil:
61
 
    enum: ZMQ_EINVAL "EINVAL"
62
 
    enum: ZMQ_EAGAIN "EAGAIN"
63
 
 
64
 
cdef extern from "string.h" nogil:
65
 
    void *memcpy(void *dest, void *src, size_t n)
66
 
    size_t strlen(char *s)
67
 
 
68
 
cdef extern from "zmq_compat.h":
69
 
    ctypedef signed long long int64_t "pyzmq_int64_t"
70
 
 
71
 
cdef extern from "zmq.h" nogil:
72
 
    enum: ZMQ_HAUSNUMERO
73
 
    enum: ZMQ_ENOTSUP "ENOTSUP"
74
 
    enum: ZMQ_EPROTONOSUPPORT "EPROTONOSUPPORT"
75
 
    enum: ZMQ_ENOBUFS "ENOBUFS"
76
 
    enum: ZMQ_ENETDOWN "ENETDOWN"
77
 
    enum: ZMQ_EADDRINUSE "EADDRINUSE"
78
 
    enum: ZMQ_EADDRNOTAVAIL "EADDRNOTAVAIL"
79
 
    enum: ZMQ_ECONNREFUSED "ECONNREFUSED"
80
 
    enum: ZMQ_EINPROGRESS "EINPROGRESS"
81
 
    enum: ZMQ_EMTHREAD "EMTHREAD"
82
 
    enum: ZMQ_EFSM "EFSM"
83
 
    enum: ZMQ_ENOCOMPATPROTO "ENOCOMPATPROTO"
84
 
    enum: ZMQ_ETERM "ETERM"
85
 
 
86
 
    enum: errno
87
 
    char *zmq_strerror (int errnum)
88
 
    int zmq_errno()
89
 
 
90
 
    enum: ZMQ_MAX_VSM_SIZE # 30
91
 
    enum: ZMQ_DELIMITER # 31
92
 
    enum: ZMQ_VSM # 32
93
 
 
94
 
    ctypedef struct zmq_msg_t:
95
 
        void *content
96
 
        unsigned char shared
97
 
        unsigned char vsm_size
98
 
        unsigned char vsm_data [ZMQ_MAX_VSM_SIZE]
99
 
    
100
 
    ctypedef void zmq_free_fn(void *data, void *hint)
101
 
    
102
 
    int zmq_msg_init (zmq_msg_t *msg)
103
 
    int zmq_msg_init_size (zmq_msg_t *msg, size_t size)
104
 
    int zmq_msg_init_data (zmq_msg_t *msg, void *data,
105
 
        size_t size, zmq_free_fn *ffn, void *hint)
106
 
    int zmq_msg_close (zmq_msg_t *msg)
107
 
    int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src)
108
 
    int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src)
109
 
    void *zmq_msg_data (zmq_msg_t *msg)
110
 
    size_t zmq_msg_size (zmq_msg_t *msg)
111
 
 
112
 
    void *zmq_init (int io_threads)
113
 
    int zmq_term (void *context)
114
 
 
115
 
    enum: ZMQ_PAIR # 0
116
 
    enum: ZMQ_PUB # 1
117
 
    enum: ZMQ_SUB # 2
118
 
    enum: ZMQ_REQ # 3
119
 
    enum: ZMQ_REP # 4
120
 
    enum: ZMQ_XREQ # 5
121
 
    enum: ZMQ_XREP # 6
122
 
    enum: ZMQ_UPSTREAM # 7
123
 
    enum: ZMQ_DOWNSTREAM # 8
124
 
 
125
 
    enum: ZMQ_HWM # 1
126
 
    enum: ZMQ_SWAP # 3
127
 
    enum: ZMQ_AFFINITY # 4
128
 
    enum: ZMQ_IDENTITY # 5
129
 
    enum: ZMQ_SUBSCRIBE # 6
130
 
    enum: ZMQ_UNSUBSCRIBE # 7
131
 
    enum: ZMQ_RATE # 8
132
 
    enum: ZMQ_RECOVERY_IVL # 9
133
 
    enum: ZMQ_MCAST_LOOP # 10
134
 
    enum: ZMQ_SNDBUF # 11
135
 
    enum: ZMQ_RCVBUF # 12
136
 
    enum: ZMQ_RCVMORE # 13
137
 
 
138
 
    enum: ZMQ_NOBLOCK # 1
139
 
    enum: ZMQ_SNDMORE # 2
140
 
 
141
 
    void *zmq_socket (void *context, int type)
142
 
    int zmq_close (void *s)
143
 
    int zmq_setsockopt (void *s, int option, void *optval, size_t optvallen)
144
 
    int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen)
145
 
    int zmq_bind (void *s, char *addr)
146
 
    int zmq_connect (void *s, char *addr)
147
 
    int zmq_send (void *s, zmq_msg_t *msg, int flags)
148
 
    int zmq_recv (void *s, zmq_msg_t *msg, int flags)
149
 
    
150
 
    enum: ZMQ_POLLIN # 1
151
 
    enum: ZMQ_POLLOUT # 2
152
 
    enum: ZMQ_POLLERR # 4
153
 
 
154
 
    ctypedef struct zmq_pollitem_t:
155
 
        void *socket
156
 
        int fd
157
 
        # #if defined _WIN32
158
 
        #     SOCKET fd;
159
 
        short events
160
 
        short revents
161
 
 
162
 
    int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout)
163
 
 
164
 
    # void *zmq_stopwatch_start ()
165
 
    # unsigned long zmq_stopwatch_stop (void *watch_)
166
 
    # void zmq_sleep (int seconds_)
167
 
 
168
 
#-----------------------------------------------------------------------------
169
 
# Python module level constants
170
 
#-----------------------------------------------------------------------------
171
 
 
172
 
NOBLOCK = ZMQ_NOBLOCK
173
 
PAIR = ZMQ_PAIR
174
 
PUB = ZMQ_PUB
175
 
SUB = ZMQ_SUB
176
 
REQ = ZMQ_REQ
177
 
REP = ZMQ_REP
178
 
XREQ = ZMQ_XREQ
179
 
XREP = ZMQ_XREP
180
 
UPSTREAM = ZMQ_UPSTREAM
181
 
DOWNSTREAM = ZMQ_DOWNSTREAM
182
 
HWM = ZMQ_HWM
183
 
SWAP = ZMQ_SWAP
184
 
AFFINITY = ZMQ_AFFINITY
185
 
IDENTITY = ZMQ_IDENTITY
186
 
SUBSCRIBE = ZMQ_SUBSCRIBE
187
 
UNSUBSCRIBE = ZMQ_UNSUBSCRIBE
188
 
RATE = ZMQ_RATE
189
 
RECOVERY_IVL = ZMQ_RECOVERY_IVL
190
 
MCAST_LOOP = ZMQ_MCAST_LOOP
191
 
SNDBUF = ZMQ_SNDBUF
192
 
RCVBUF = ZMQ_RCVBUF
193
 
RCVMORE = ZMQ_RCVMORE
194
 
SNDMORE = ZMQ_SNDMORE
195
 
POLLIN = ZMQ_POLLIN
196
 
POLLOUT = ZMQ_POLLOUT
197
 
POLLERR = ZMQ_POLLERR
198
 
 
199
 
#-----------------------------------------------------------------------------
200
 
# Error handling
201
 
#-----------------------------------------------------------------------------
202
 
 
203
 
# Often used (these are alse in errno.)
204
 
EAGAIN = ZMQ_EAGAIN
205
 
EINVAL = ZMQ_EINVAL
206
 
 
207
 
# For Windows compatability
208
 
ENOTSUP = ZMQ_ENOTSUP
209
 
EPROTONOSUPPORT = ZMQ_EPROTONOSUPPORT
210
 
ENOBUFS = ZMQ_ENOBUFS
211
 
ENETDOWN = ZMQ_ENETDOWN
212
 
EADDRINUSE = ZMQ_EADDRINUSE
213
 
EADDRNOTAVAIL = ZMQ_EADDRNOTAVAIL
214
 
ECONNREFUSED = ZMQ_ECONNREFUSED
215
 
EINPROGRESS = ZMQ_EINPROGRESS
216
 
 
217
 
# 0MQ Native
218
 
EMTHREAD = ZMQ_EMTHREAD
219
 
EFSM = ZMQ_EFSM
220
 
ENOCOMPATPROTO = ZMQ_ENOCOMPATPROTO
221
 
ETERM = ZMQ_ETERM
222
 
 
223
 
 
224
 
def strerror(errnum):
225
 
    """Return the error string given the error number."""
226
 
    return zmq_strerror(errnum)
227
 
 
228
 
class ZMQBaseError(Exception):
229
 
    pass
230
 
 
231
 
class ZMQError(ZMQBaseError):
232
 
    """Base exception class for 0MQ errors in Python."""
233
 
 
234
 
    def __init__(self, error=None):
235
 
        """Wrap an errno style error.
236
 
 
237
 
        Parameters
238
 
        ----------
239
 
        error : int
240
 
            The ZMQ errno or None.  If None, then zmq_errno() is called and
241
 
            used.
242
 
        """
243
 
        if error is None:
244
 
            error = zmq_errno()
245
 
        if type(error) == int:
246
 
            self.errstr = strerror(error)
247
 
            self.errno = error
248
 
        else:
249
 
            self.errstr = str(error)
250
 
            self.errno = None 
251
 
 
252
 
    def __str__(self):
253
 
        return self.errstr
254
 
 
255
 
class ZMQBindError(ZMQBaseError):
256
 
    """An error for bind_to_random_port."""
257
 
    pass
258
 
 
259
 
#-----------------------------------------------------------------------------
260
 
# Code
261
 
#-----------------------------------------------------------------------------
262
 
 
263
 
 
264
 
cdef void free_python_msg(void *data, void *hint) with gil:
265
 
    """A function for DECREF'ing Python based messages."""
266
 
    if hint != NULL:
267
 
        Py_DECREF(<object>hint)
268
 
 
269
 
 
270
 
cdef class Message:
271
 
    """A Message class for non-copy send/recvs.
272
 
 
273
 
    This class is only needed if you want to do non-copying send and recvs.
274
 
    When you pass a string to this class, like ``Message(s)``, the 
275
 
    ref-count of s is increased by two: once because Message saves s as 
276
 
    an instance attribute and another because a ZMQ message is created that
277
 
    points to the buffer of s. This second ref-count increase makes sure
278
 
    that s lives until all messages that use it have been sent. Once 0MQ
279
 
    sends all the messages and it doesn't need the buffer of s, 0MQ will call
280
 
    Py_DECREF(s).
281
 
    """
282
 
 
283
 
    cdef zmq_msg_t zmq_msg
284
 
    cdef object data
285
 
    
286
 
    def __cinit__(self, object data=None):
287
 
        cdef int rc
288
 
        # Save the data object in case the user wants the the data as a str.
289
 
        self.data = data
290
 
        cdef char *data_c = NULL
291
 
        cdef Py_ssize_t data_len_c
292
 
 
293
 
        if data is None:
294
 
            with nogil:
295
 
                rc = zmq_msg_init(&self.zmq_msg)
296
 
            if rc != 0:
297
 
                raise ZMQError()
298
 
        else:
299
 
            PyString_AsStringAndSize(data, &data_c, &data_len_c)
300
 
            # We INCREF the *original* Python object (not self) and pass it
301
 
            # as the hint below. This allows other copies of this Message
302
 
            # object to take over the ref counting of data properly.
303
 
            Py_INCREF(data)
304
 
            with nogil:
305
 
                rc = zmq_msg_init_data(
306
 
                    &self.zmq_msg, <void *>data_c, data_len_c, 
307
 
                    <zmq_free_fn *>free_python_msg, <void *>data
308
 
                )
309
 
            if rc != 0:
310
 
                Py_DECREF(data)
311
 
                raise ZMQError()
312
 
 
313
 
    def __dealloc__(self):
314
 
        cdef int rc
315
 
        # This simply decreases the 0MQ ref-count of zmq_msg.
316
 
        rc = zmq_msg_close(&self.zmq_msg)
317
 
        if rc != 0:
318
 
            raise ZMQError()
319
 
 
320
 
    def __copy__(self):
321
 
        """Create a shallow copy of the message.
322
 
 
323
 
        This does not copy the contents of the Message, just the pointer.
324
 
        This will increment the 0MQ ref count of the message, but not
325
 
        the ref count of the Python object. That is only done once when
326
 
        the Python is first turned into a 0MQ message.
327
 
        """
328
 
        return self.fast_copy()
329
 
 
330
 
    cdef Message fast_copy(self):
331
 
        """Fast, cdef'd version of shallow copy of the message."""
332
 
        cdef Message new_msg
333
 
        new_msg = Message()
334
 
        # This does not copy the contents, but just increases the ref-count 
335
 
        # of the zmq_msg by one.
336
 
        zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg)
337
 
        # Copy the ref to data so the copy won't create a copy when str is
338
 
        # called.
339
 
        if self.data is not None:
340
 
            new_msg.data = self.data
341
 
        return new_msg
342
 
 
343
 
    def __len__(self):
344
 
        """Return the length of the message in bytes."""
345
 
        return <int>zmq_msg_size(&self.zmq_msg)
346
 
 
347
 
    def __str__(self):
348
 
        """Return the str form of the message."""
349
 
        cdef char *data_c = NULL
350
 
        cdef Py_ssize_t data_len_c
351
 
        if self.data is None:
352
 
            data_c = <char *>zmq_msg_data(&self.zmq_msg)
353
 
            data_len_c = zmq_msg_size(&self.zmq_msg)
354
 
            return PyString_FromStringAndSize(data_c, data_len_c)
355
 
        else:
356
 
            return self.data
357
 
 
358
 
 
359
 
cdef class Context:
360
 
    """Manage the lifecycle of a 0MQ context.
361
 
 
362
 
    This class no longer takes any flags or the number of application
363
 
    threads.
364
 
 
365
 
    Parameters
366
 
    ----------
367
 
    io_threads : int
368
 
        The number of IO threads.
369
 
    """
370
 
 
371
 
    cdef void *handle
372
 
 
373
 
    def __cinit__(self, int io_threads=1):
374
 
        self.handle = NULL
375
 
        if not io_threads > 0:
376
 
            raise ZMQError(EINVAL)
377
 
        self.handle = zmq_init(io_threads)
378
 
        if self.handle == NULL:
379
 
            raise ZMQError()
380
 
 
381
 
    def __dealloc__(self):
382
 
        cdef int rc
383
 
        if self.handle != NULL:
384
 
            rc = zmq_term(self.handle)
385
 
            if rc != 0:
386
 
                raise ZMQError()
387
 
 
388
 
    def socket(self, int socket_type):
389
 
        """Create a Socket associated with this Context.
390
 
 
391
 
        Parameters
392
 
        ----------
393
 
        socket_type : int
394
 
            The socket type, which can be any of the 0MQ socket types: 
395
 
            REQ, REP, PUB, SUB, PAIR, XREQ, XREP, UPSTREAM, DOWNSTREAM.
396
 
        """
397
 
        return Socket(self, socket_type)
398
 
 
399
 
 
400
 
cdef class Socket:
401
 
    """A 0MQ socket.
402
 
 
403
 
    Socket(context, socket_type)
404
 
 
405
 
    Parameters
406
 
    ----------
407
 
    context : Context
408
 
        The 0MQ Context this Socket belongs to.
409
 
    socket_type : int
410
 
        The socket type, which can be any of the 0MQ socket types: 
411
 
        REQ, REP, PUB, SUB, PAIR, XREQ, XREP, UPSTREAM, DOWNSTREAM.
412
 
    """
413
 
 
414
 
    cdef void *handle
415
 
    cdef public int socket_type
416
 
    # Hold on to a reference to the context to make sure it is not garbage
417
 
    # collected until the socket it done with it.
418
 
    cdef public Context context
419
 
    cdef public object closed
420
 
 
421
 
    def __cinit__(self, Context context, int socket_type):
422
 
        self.handle = NULL
423
 
        self.context = context
424
 
        self.socket_type = socket_type
425
 
        self.handle = zmq_socket(context.handle, socket_type)
426
 
        if self.handle == NULL:
427
 
            raise ZMQError()
428
 
        self.closed = False
429
 
 
430
 
    def __dealloc__(self):
431
 
        self.close()
432
 
 
433
 
    def close(self):
434
 
        """Close the socket.
435
 
 
436
 
        This can be called to close the socket by hand. If this is not
437
 
        called, the socket will automatically be closed when it is
438
 
        garbage collected.
439
 
        """
440
 
        cdef int rc
441
 
        if self.handle != NULL and not self.closed:
442
 
            rc = zmq_close(self.handle)
443
 
            if rc != 0:
444
 
                raise ZMQError()
445
 
            self.handle = NULL
446
 
            self.closed = True
447
 
 
448
 
    def _check_closed(self):
449
 
        if self.closed:
450
 
            raise ZMQError(ENOTSUP)
451
 
 
452
 
    def setsockopt(self, int option, optval):
453
 
        """Set socket options.
454
 
 
455
 
        See the 0MQ documentation for details on specific options.
456
 
 
457
 
        Parameters
458
 
        ----------
459
 
        option : str
460
 
            The name of the option to set. Can be any of: SUBSCRIBE, 
461
 
            UNSUBSCRIBE, IDENTITY, HWM, SWAP, AFFINITY, RATE, 
462
 
            RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF.
463
 
        optval : int or str
464
 
            The value of the option to set.
465
 
        """
466
 
        cdef int64_t optval_int_c
467
 
        cdef int rc
468
 
 
469
 
        self._check_closed()
470
 
 
471
 
        if option in [SUBSCRIBE, UNSUBSCRIBE, IDENTITY]:
472
 
            if not isinstance(optval, str):
473
 
                raise TypeError('expected str, got: %r' % optval)
474
 
            rc = zmq_setsockopt(
475
 
                self.handle, option,
476
 
                PyString_AsString(optval), PyString_Size(optval)
477
 
            )
478
 
        elif option in [HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL,
479
 
                        MCAST_LOOP, SNDBUF, RCVBUF]:
480
 
            if not isinstance(optval, int):
481
 
                raise TypeError('expected int, got: %r' % optval)
482
 
            optval_int_c = optval
483
 
            rc = zmq_setsockopt(
484
 
                self.handle, option,
485
 
                &optval_int_c, sizeof(int64_t)
486
 
            )
487
 
        else:
488
 
            raise ZMQError(EINVAL)
489
 
 
490
 
        if rc != 0:
491
 
            raise ZMQError()
492
 
 
493
 
    def getsockopt(self, int option):
494
 
        """Get the value of a socket option.
495
 
 
496
 
        See the 0MQ documentation for details on specific options.
497
 
 
498
 
        Parameters
499
 
        ----------
500
 
        option : str
501
 
            The name of the option to set. Can be any of: 
502
 
            IDENTITY, HWM, SWAP, AFFINITY, RATE, 
503
 
            RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF, RCVMORE.
504
 
 
505
 
        Returns
506
 
        -------
507
 
        The value of the option as a string or int.
508
 
        """
509
 
        cdef int64_t optval_int_c
510
 
        cdef char identity_str_c [255]
511
 
        cdef size_t sz
512
 
        cdef int rc
513
 
 
514
 
        self._check_closed()
515
 
 
516
 
        if option in [IDENTITY]:
517
 
            sz = 255
518
 
            rc = zmq_getsockopt(self.handle, option, <void *>identity_str_c, &sz)
519
 
            if rc != 0:
520
 
                raise ZMQError()
521
 
            result = PyString_FromStringAndSize(<char *>identity_str_c, sz)
522
 
        elif option in [HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL,
523
 
                        MCAST_LOOP, SNDBUF, RCVBUF, RCVMORE]:
524
 
            sz = sizeof(int64_t)
525
 
            rc = zmq_getsockopt(self.handle, option, <void *>&optval_int_c, &sz)
526
 
            if rc != 0:
527
 
                raise ZMQError()
528
 
            result = optval_int_c
529
 
        else:
530
 
            raise ZMQError()
531
 
 
532
 
        return result
533
 
 
534
 
    def bind(self, addr):
535
 
        """Bind the socket to an address.
536
 
 
537
 
        This causes the socket to listen on a network port. Sockets on the
538
 
        other side of this connection will use :meth:`Sockiet.connect` to
539
 
        connect to this socket.
540
 
 
541
 
        Parameters
542
 
        ----------
543
 
        addr : str
544
 
            The address string. This has the form 'protocol://interface:port',
545
 
            for example 'tcp://127.0.0.1:5555'. Protocols supported are
546
 
            tcp, upd, pgm, inproc and ipc.
547
 
        """
548
 
        cdef int rc
549
 
 
550
 
        self._check_closed()
551
 
 
552
 
        if not isinstance(addr, str):
553
 
            raise TypeError('expected str, got: %r' % addr)
554
 
        rc = zmq_bind(self.handle, addr)
555
 
        if rc != 0:
556
 
            raise ZMQError()
557
 
 
558
 
    def bind_to_random_port(self, addr, min_port=2000, max_port=20000, max_tries=100):
559
 
        """Bind this socket to a random port in a range.
560
 
 
561
 
        Parameters
562
 
        ----------
563
 
        addr : str
564
 
            The address string without the port to pass to :meth:`Socket.bind`.
565
 
        min_port : int
566
 
            The minimum port in the range of ports to try.
567
 
        max_port : int
568
 
            The maximum port in the range of ports to try.
569
 
        max_tries : int
570
 
            The number of attempt to bind.
571
 
 
572
 
        Returns
573
 
        -------
574
 
        port : int
575
 
            The port the socket was bound to.
576
 
        """
577
 
        for i in range(max_tries):
578
 
            try:
579
 
                port = random.randrange(min_port, max_port)
580
 
                self.bind('%s:%s' % (addr, port))
581
 
            except ZMQError:
582
 
                pass
583
 
            else:
584
 
                return port
585
 
        raise ZMQBindError("Could not bind socket to random port.")
586
 
 
587
 
    def connect(self, addr):
588
 
        """Connect to a remote 0MQ socket.
589
 
 
590
 
        Parameters
591
 
        ----------
592
 
        addr : str
593
 
            The address string. This has the form 'protocol://interface:port',
594
 
            for example 'tcp://127.0.0.1:5555'. Protocols supported are
595
 
            tcp, upd, pgm, inproc and ipc.
596
 
        """
597
 
        cdef int rc
598
 
 
599
 
        self._check_closed()
600
 
 
601
 
        if not isinstance(addr, str):
602
 
            raise TypeError('expected str, got: %r' % addr)
603
 
        rc = zmq_connect(self.handle, addr)
604
 
        if rc != 0:
605
 
            raise ZMQError()
606
 
 
607
 
    #-------------------------------------------------------------------------
608
 
    # Sending and receiving messages
609
 
    #-------------------------------------------------------------------------
610
 
 
611
 
    def send(self, object data, int flags=0, bool copy=True):
612
 
        """Send a message on this socket.
613
 
 
614
 
        This queues the message to be sent by the IO thread at a later time.
615
 
 
616
 
        Parameters
617
 
        ----------
618
 
        data : object, str, Message
619
 
            The content of the message.
620
 
        flags : int
621
 
            Any supported flag: NOBLOCK, SNDMORE.
622
 
        copy : bool
623
 
            Should the message be sent in a copying or non-copying manner.
624
 
 
625
 
        Returns
626
 
        -------
627
 
        None if message was sent, raises an exception otherwise.
628
 
        """
629
 
        self._check_closed()
630
 
        if isinstance(data, Message):
631
 
            return self._send_message(data, flags)
632
 
        elif copy:
633
 
            return self._send_copy(data, flags)
634
 
        else:
635
 
            # I am not sure which non-copy implemntation to use here.
636
 
            # It probably doesn't matter though.
637
 
            msg = Message(data)
638
 
            return self._send_message(msg, flags)
639
 
            # return self._send_nocopy(msg, flags)
640
 
 
641
 
    def _send_message(self, Message msg, int flags=0):
642
 
        """Send a Message on this socket in a non-copy manner."""
643
 
        cdef int rc
644
 
        cdef Message msg_copy
645
 
 
646
 
        # Always copy so the original message isn't garbage collected.
647
 
        # This doesn't do a real copy, just a reference.
648
 
        msg_copy = msg.fast_copy()
649
 
        # msg_copy = copy_mod.copy(msg)
650
 
        with nogil:
651
 
            rc = zmq_send(self.handle, &msg.zmq_msg, flags)
652
 
 
653
 
        if rc != 0:
654
 
            raise ZMQError()
655
 
 
656
 
    def _send_copy(self, object msg, int flags=0):
657
 
        """Send a message on this socket by copying its content."""
658
 
        cdef int rc, rc2
659
 
        cdef zmq_msg_t data
660
 
        cdef char *msg_c
661
 
        cdef Py_ssize_t msg_c_len
662
 
 
663
 
        if not isinstance(msg, str):
664
 
            raise TypeError('expected str, got: %r' % msg)
665
 
 
666
 
        PyString_AsStringAndSize(msg, &msg_c, &msg_c_len)
667
 
        # Copy the msg before sending. This avoids any complications with
668
 
        # the GIL, etc.
669
 
        # If zmq_msg_init_* fails do we need to call zmq_msg_close?
670
 
        rc = zmq_msg_init_size(&data, msg_c_len)
671
 
        memcpy(zmq_msg_data(&data), msg_c, zmq_msg_size(&data))
672
 
 
673
 
        if rc != 0:
674
 
            raise ZMQError()
675
 
 
676
 
        with nogil:
677
 
            rc = zmq_send(self.handle, &data, flags)
678
 
        rc2 = zmq_msg_close(&data)
679
 
 
680
 
        # Shouldn't the error handling for zmq_msg_close come after that
681
 
        # of zmq_send?
682
 
        if rc2 != 0:
683
 
            raise ZMQError()
684
 
 
685
 
        if rc != 0:
686
 
            raise ZMQError()
687
 
 
688
 
    def _send_nocopy(self, object msg, int flags=0):
689
 
        """Send a Python string on this socket in a non-copy manner.
690
 
 
691
 
        This method is not being used currently, as the same functionality
692
 
        is provided by self._send_message(Message(data)). This may eventually
693
 
        be removed.
694
 
        """
695
 
        cdef int rc
696
 
        cdef zmq_msg_t data
697
 
        cdef char *msg_c
698
 
        cdef Py_ssize_t msg_c_len
699
 
 
700
 
        if not isinstance(msg, str):
701
 
            raise TypeError('expected str, got: %r' % msg)
702
 
 
703
 
        PyString_AsStringAndSize(msg, &msg_c, &msg_c_len)
704
 
        Py_INCREF(msg) # We INCREF to prevent Python from gc'ing msg
705
 
        rc = zmq_msg_init_data(
706
 
            &data, <void *>msg_c, msg_c_len,
707
 
            <zmq_free_fn *>free_python_msg, <void *>msg
708
 
        )
709
 
 
710
 
        if rc != 0:
711
 
            # If zmq_msg_init_data fails it does not call zmq_free_fn, 
712
 
            # so we Py_DECREF.
713
 
            Py_DECREF(msg)
714
 
            raise ZMQError()
715
 
 
716
 
        with nogil:
717
 
            rc = zmq_send(self.handle, &data, flags)
718
 
 
719
 
        if rc != 0:
720
 
            # If zmq_send fails it does not call zmq_free_fn, so we Py_DECREF.
721
 
            Py_DECREF(msg)
722
 
            zmq_msg_close(&data)
723
 
            raise ZMQError()
724
 
 
725
 
        rc = zmq_msg_close(&data)
726
 
        if rc != 0:
727
 
            raise ZMQError()
728
 
 
729
 
    def recv(self, int flags=0, copy=True):
730
 
        """Receive a message.
731
 
 
732
 
        Parameters
733
 
        ----------
734
 
        flags : int
735
 
            Any supported flag: NOBLOCK. If NOBLOCK is set, this method
736
 
            will return None if a message is not ready. If NOBLOCK is not
737
 
            set, then this method will block until a message arrives.
738
 
        copy : bool
739
 
            Should the message be received in a copying or non-copying manner.
740
 
            If True a Message object is returned, if False a string copy of 
741
 
            message is returned.
742
 
        Returns
743
 
        -------
744
 
        msg : str
745
 
            The returned message, or raises ZMQError otherwise.
746
 
        """
747
 
        self._check_closed()
748
 
        if copy:
749
 
            # This could be implemented by simple calling _recv_message and
750
 
            # then casting to a str.
751
 
            return self._recv_copy(flags)
752
 
        else:
753
 
            return self._recv_message(flags)
754
 
 
755
 
    def _recv_message(self, int flags=0):
756
 
        """Receive a message in a non-copying manner and return a Message."""
757
 
        cdef int rc
758
 
        cdef Message msg
759
 
        msg = Message()
760
 
 
761
 
        with nogil:
762
 
            rc = zmq_recv(self.handle, &msg.zmq_msg, flags)
763
 
 
764
 
        if rc != 0:
765
 
            raise ZMQError()
766
 
        return msg
767
 
 
768
 
    def _recv_copy(self, int flags=0):
769
 
        """Receive a message in a copying manner as a string."""
770
 
        cdef int rc
771
 
        cdef zmq_msg_t data
772
 
 
773
 
        rc = zmq_msg_init(&data)
774
 
        if rc != 0:
775
 
            raise ZMQError()
776
 
 
777
 
        with nogil:
778
 
            rc = zmq_recv(self.handle, &data, flags)
779
 
 
780
 
        if rc != 0:
781
 
            raise ZMQError()
782
 
 
783
 
        try:
784
 
            msg = PyString_FromStringAndSize(
785
 
                <char *>zmq_msg_data(&data), 
786
 
                zmq_msg_size(&data)
787
 
            )
788
 
        finally:
789
 
            rc = zmq_msg_close(&data)
790
 
 
791
 
        if rc != 0:
792
 
            raise ZMQError()
793
 
        return msg
794
 
 
795
 
    def send_multipart(self, msg_parts, int flags=0, copy=True):
796
 
        """Send a sequence of messages as a multipart message.
797
 
 
798
 
        Parameters
799
 
        ----------
800
 
        msg_parts : iterable
801
 
            A sequence of messages to send as a multipart message.
802
 
        flags : int
803
 
            Only the NOBLOCK flagis supported, SNDMORE is handled
804
 
            automatically.
805
 
        """
806
 
        for msg in msg_parts[:-1]:
807
 
            self.send(msg, SNDMORE|flags, copy=copy)
808
 
        # Send the last part without the SNDMORE flag.
809
 
        self.send(msg_parts[-1], flags)
810
 
 
811
 
    def recv_multipart(self, int flags=0, copy=True):
812
 
        """Receive a multipart message as a list of messages.
813
 
 
814
 
        Parameters
815
 
        ----------
816
 
        flags : int
817
 
            Any supported flag: NOBLOCK. If NOBLOCK is set, this method
818
 
            will return None if a message is not ready. If NOBLOCK is not
819
 
            set, then this method will block until a message arrives.
820
 
 
821
 
        Returns
822
 
        -------
823
 
        msg_parts : list
824
 
            A list of messages in the multipart message.
825
 
        """
826
 
        parts = []
827
 
        while True:
828
 
            part = self.recv(flags, copy=copy)
829
 
            parts.append(part)
830
 
            if self.rcvmore():
831
 
                continue
832
 
            else:
833
 
                break
834
 
        return parts
835
 
 
836
 
    def rcvmore(self):
837
 
        """Are there more parts to a multipart message."""
838
 
        more = self.getsockopt(RCVMORE)
839
 
        return bool(more)
840
 
 
841
 
    def send_pyobj(self, obj, flags=0, protocol=-1):
842
 
        """Send a Python object as a message using pickle to serialize.
843
 
 
844
 
        Parameters
845
 
        ----------
846
 
        obj : Python object
847
 
            The Python object to send.
848
 
        flags : int
849
 
            Any valid send flag.
850
 
        protocol : int
851
 
            The pickle protocol number to use. Default of -1 will select
852
 
            the highest supported number. Use 0 for multiple platform
853
 
            support.
854
 
        """
855
 
        msg = pickle.dumps(obj, protocol)
856
 
        return self.send(msg, flags)
857
 
 
858
 
    def recv_pyobj(self, flags=0):
859
 
        """Receive a Python object as a message using pickle to serialize.
860
 
 
861
 
        Parameters
862
 
        ----------
863
 
        flags : int
864
 
            Any valid recv flag.
865
 
 
866
 
        Returns
867
 
        -------
868
 
        obj : Python object
869
 
            The Python object that arrives as a message.
870
 
        """
871
 
        s = self.recv(flags)
872
 
        return pickle.loads(s)
873
 
 
874
 
    def send_json(self, obj, flags=0):
875
 
        """Send a Python object as a message using json to serialize.
876
 
 
877
 
        Parameters
878
 
        ----------
879
 
        obj : Python object
880
 
            The Python object to send.
881
 
        flags : int
882
 
            Any valid send flag.
883
 
        """
884
 
        if json is None:
885
 
            raise ImportError('json or simplejson library is required.')
886
 
        else:
887
 
            msg = json.dumps(obj, separators=(',',':'))
888
 
            return self.send(msg, flags)
889
 
 
890
 
    def recv_json(self, flags=0):
891
 
        """Receive a Python object as a message using json to serialize.
892
 
 
893
 
        Parameters
894
 
        ----------
895
 
        flags : int
896
 
            Any valid recv flag.
897
 
 
898
 
        Returns
899
 
        -------
900
 
        obj : Python object
901
 
            The Python object that arrives as a message.
902
 
        """
903
 
        if json is None:
904
 
            raise ImportError('json or simplejson library is required.')
905
 
        else:
906
 
            msg = self.recv(flags)
907
 
            return json.loads(msg)
908
 
 
909
 
 
910
 
# cdef class Stopwatch:
911
 
#     """A simple stopwatch based on zmq_stopwatch_start/stop."""
912
 
913
 
#     cdef void *watch
914
 
915
 
#     def __cinit__(self):
916
 
#         self.watch = NULL
917
 
918
 
#     def __dealloc__(self):
919
 
#         try:
920
 
#             self.stop()
921
 
#         except ZMQError:
922
 
#             pass
923
 
924
 
#     def start(self):
925
 
#         if self.watch == NULL:
926
 
#             self.watch = zmq_stopwatch_start()
927
 
#         else:
928
 
#             raise ZMQError('Stopwatch is already runing.')
929
 
930
 
#     def stop(self):
931
 
#         if self.watch == NULL:
932
 
#             raise ZMQError('Must start the Stopwatch before calling stop.')
933
 
#         else:
934
 
#             time = zmq_stopwatch_stop(self.watch)
935
 
#             self.watch = NULL
936
 
#             return time
937
 
938
 
#     def sleep(self, int seconds):
939
 
#         zmq_sleep(seconds)
940
 
 
941
 
 
942
 
def _poll(sockets, long timeout=-1):
943
 
    """Poll a set of 0MQ sockets, native file descs. or sockets.
944
 
 
945
 
    Parameters
946
 
    ----------
947
 
    sockets : list of tuples of (socket, flags)
948
 
        Each element of this list is a two-tuple containing a socket
949
 
        and a flags. The socket may be a 0MQ socket or any object with
950
 
        a :meth:`fileno` method. The flags can be zmq.POLLIN (for detecting
951
 
        for incoming messages), zmq.POLLOUT (for detecting that send is OK)
952
 
        or zmq.POLLIN|zmq.POLLOUT for detecting both.
953
 
    timeout : int
954
 
        The number of microseconds to poll for. Negative means no timeout.
955
 
    """
956
 
    cdef int rc, i
957
 
    cdef zmq_pollitem_t *pollitems = NULL
958
 
    cdef int nsockets = len(sockets)
959
 
    cdef Socket current_socket
960
 
    pollitems_o = allocate(nsockets*sizeof(zmq_pollitem_t),<void**>&pollitems)
961
 
 
962
 
    for i in range(nsockets):
963
 
        s = sockets[i][0]
964
 
        events = sockets[i][1]
965
 
        if isinstance(s, Socket):
966
 
            current_socket = s
967
 
            pollitems[i].socket = current_socket.handle
968
 
            pollitems[i].events = events
969
 
            pollitems[i].revents = 0
970
 
        elif isinstance(s, int):
971
 
            pollitems[i].socket = NULL
972
 
            pollitems[i].fd = s
973
 
            pollitems[i].events = events
974
 
            pollitems[i].revents = 0
975
 
        elif hasattr(s, 'fileno'):
976
 
            try:
977
 
                fileno = int(s.fileno())
978
 
            except:
979
 
                raise ValueError('fileno() must return an valid integer fd')
980
 
            else:
981
 
                pollitems[i].socket = NULL
982
 
                pollitems[i].fd = fileno
983
 
                pollitems[i].events = events
984
 
                pollitems[i].revents = 0
985
 
        else:
986
 
            raise TypeError(
987
 
                "Socket must be a 0MQ socket, an integer fd or have "
988
 
                "a fileno() method: %r" % s
989
 
            )
990
 
 
991
 
    # int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout)
992
 
    with nogil:
993
 
        rc = zmq_poll(pollitems, nsockets, timeout)
994
 
    if rc == -1:
995
 
        raise ZMQError()
996
 
    
997
 
    results = []
998
 
    for i in range(nsockets):
999
 
        s = sockets[i][0]
1000
 
        # Return the fd for sockets, for compat. with select.poll.
1001
 
        if hasattr(s, 'fileno'):
1002
 
            s = s.fileno()
1003
 
        revents = pollitems[i].revents
1004
 
        # Only return sockets with non-zero status for compat. with select.poll.
1005
 
        if revents > 0:
1006
 
            results.append((s, revents))
1007
 
 
1008
 
    return results
1009
 
 
1010
 
 
1011
 
class Poller(object):
1012
 
    """An stateful poll interface that mirrors Python's built-in poll."""
1013
 
 
1014
 
    def __init__(self):
1015
 
        self.sockets = {}
1016
 
 
1017
 
    def register(self, socket, flags=POLLIN|POLLOUT):
1018
 
        """Register a 0MQ socket or native fd for I/O monitoring.
1019
 
 
1020
 
        Parameters
1021
 
        ----------
1022
 
        socket : zmq.Socket or native socket
1023
 
            A zmq.Socket or any Python object having a :meth:`fileno` 
1024
 
            method that returns a valid file descriptor.
1025
 
        flags : int
1026
 
            The events to watch for.  Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
1027
 
        """
1028
 
        self.sockets[socket] = flags
1029
 
 
1030
 
    def modify(self, socket, flags=POLLIN|POLLOUT):
1031
 
        """Modify the flags for an already registered 0MQ socket or native fd."""
1032
 
        self.register(socket, flags)
1033
 
 
1034
 
    def unregister(self, socket):
1035
 
        """Remove a 0MQ socket or native fd for I/O monitoring.
1036
 
 
1037
 
        Parameters
1038
 
        ----------
1039
 
        socket : Socket
1040
 
            The socket instance to stop polling.
1041
 
        """
1042
 
        del self.sockets[socket]
1043
 
 
1044
 
    def poll(self, timeout=None):
1045
 
        """Poll the registered 0MQ or native fds for I/O.
1046
 
 
1047
 
        Parameters
1048
 
        ----------
1049
 
        timeout : float, int
1050
 
            The timeout in milliseconds. If None, no timeout (infinite). This
1051
 
            is in milliseconds to be compatible with :func:`select.poll`. The
1052
 
            underlying zmq_poll uses microseconds and we convert to that in
1053
 
            this function.
1054
 
        """
1055
 
        if timeout is None:
1056
 
            timeout = -1
1057
 
        # Convert from ms -> us for zmq_poll.
1058
 
        timeout = int(timeout*1000.0)
1059
 
        if timeout < 0:
1060
 
            timeout = -1
1061
 
        return _poll(self.sockets.items(), timeout=timeout)
1062
 
 
1063
 
 
1064
 
def select(rlist, wlist, xlist, timeout=None):
1065
 
    """Return the result of poll as a lists of sockets ready for r/w.
1066
 
 
1067
 
    This has the same interface as Python's built-in :func:`select` function.
1068
 
 
1069
 
    Parameters
1070
 
    ----------
1071
 
    timeout : float, int
1072
 
        The timeout in seconds. This is in seconds to be compatible with
1073
 
        :func:`select.select`. The underlying zmq_poll uses microseconds and
1074
 
        we convert to that in this function.
1075
 
    """
1076
 
    if timeout is None:
1077
 
        timeout = -1
1078
 
    # Convert from sec -> us for zmq_poll.
1079
 
    timeout = int(timeout*1000000.0)
1080
 
    if timeout < 0:
1081
 
        timeout = -1
1082
 
    sockets = []
1083
 
    for s in set(rlist + wlist + xlist):
1084
 
        flags = 0
1085
 
        if s in rlist:
1086
 
            flags |= POLLIN
1087
 
        if s in wlist:
1088
 
            flags |= POLLOUT
1089
 
        if s in xlist:
1090
 
            flags |= POLLERR
1091
 
        sockets.append((s, flags))
1092
 
    return_sockets = _poll(sockets, timeout)
1093
 
    rlist, wlist, xlist = [], [], []
1094
 
    for s, flags in return_sockets:
1095
 
        if flags & POLLIN:
1096
 
            rlist.append(s)
1097
 
        if flags & POLLOUT:
1098
 
            wlist.append(s)
1099
 
        if flags & POLLERR:
1100
 
            xlist.append(s)
1101
 
    return rlist, wlist, xlist
1102
 
    
1103
 
 
1104
 
 
1105
 
__all__ = [
1106
 
    'Message',
1107
 
    'Context',
1108
 
    'Socket',
1109
 
    # 'Stopwatch',
1110
 
    'ZMQBaseError',
1111
 
    'ZMQError',
1112
 
    'ZMQBindError',
1113
 
    'NOBLOCK',
1114
 
    'PAIR',
1115
 
    'PUB',
1116
 
    'SUB',
1117
 
    'REQ',
1118
 
    'REP',
1119
 
    'XREQ',
1120
 
    'XREP',
1121
 
    'UPSTREAM',
1122
 
    'DOWNSTREAM',
1123
 
    'HWM',
1124
 
    'SWAP',
1125
 
    'AFFINITY',
1126
 
    'IDENTITY',
1127
 
    'SUBSCRIBE',
1128
 
    'UNSUBSCRIBE',
1129
 
    'RATE',
1130
 
    'RECOVERY_IVL',
1131
 
    'MCAST_LOOP',
1132
 
    'SNDBUF',
1133
 
    'RCVBUF',
1134
 
    'SNDMORE',
1135
 
    'RCVMORE',
1136
 
    'POLLIN',
1137
 
    'POLLOUT',
1138
 
    'POLLERR',
1139
 
    '_poll',
1140
 
    'select',
1141
 
    'Poller',
1142
 
    # ERRORNO codes
1143
 
    'EAGAIN',
1144
 
    'EINVAL',
1145
 
    'ENOTSUP',
1146
 
    'EPROTONOSUPPORT',
1147
 
    'ENOBUFS',
1148
 
    'ENETDOWN',
1149
 
    'EADDRINUSE',
1150
 
    'EADDRNOTAVAIL',
1151
 
    'ECONNREFUSED',
1152
 
    'EINPROGRESS',
1153
 
    'EMTHREAD',
1154
 
    'EFSM',
1155
 
    'ENOCOMPATPROTO',
1156
 
    'ETERM',
1157
 
]
1158