~ubuntu-branches/ubuntu/wily/pyzmq/wily

« back to all changes in this revision

Viewing changes to zmq/devices/monitoredqueue.pyx

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-24 19:23:15 UTC
  • mfrom: (1.2.1) (9 sid)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20130224192315-qhmwp3m3ymk8r60d
Tags: 2.2.0.1-1
* New upstream release
* relicense debian packaging to LGPL-3
* update watch file to use github directly
  thanks to Bart Martens for the file
* add autopkgtests
* drop obsolete DM-Upload-Allowed
* bump standard to 3.9.4, no changes required

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
* Brian Granger
7
7
"""
8
8
 
9
 
#
10
 
#    Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
11
 
#
12
 
#    This file is part of pyzmq.
13
 
#
14
 
#    pyzmq is free software; you can redistribute it and/or modify it under
15
 
#    the terms of the Lesser GNU General Public License as published by
16
 
#    the Free Software Foundation; either version 3 of the License, or
17
 
#    (at your option) any later version.
18
 
#
19
 
#    pyzmq is distributed in the hope that it will be useful,
20
 
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
21
 
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22
 
#    Lesser GNU General Public License for more details.
23
 
#
24
 
#    You should have received a copy of the Lesser GNU General Public License
25
 
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
26
 
#
 
9
#-----------------------------------------------------------------------------
 
10
#  Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
 
11
#
 
12
#  This file is part of pyzmq
 
13
#
 
14
#  Distributed under the terms of the New BSD License.  The full license is in
 
15
#  the file COPYING.BSD, distributed as part of this software.
 
16
#-----------------------------------------------------------------------------
27
17
 
28
18
#-----------------------------------------------------------------------------
29
19
# Imports
33
23
    ctypedef int Py_ssize_t
34
24
 
35
25
from buffers cimport asbuffer_r
36
 
from czmq cimport *
 
26
from libzmq cimport *
37
27
 
38
28
from zmq.core.socket cimport Socket
39
29
 
40
 
from zmq.core import XREP, ZMQError
 
30
from zmq.core import ROUTER, ZMQError
41
31
 
42
32
#-----------------------------------------------------------------------------
43
33
# MonitoredQueue functions
54
44
    A monitored queue behaves just like a zmq QUEUE device as far as in_socket
55
45
    and out_socket are concerned, except that all messages *also* go out on
56
46
    mon_socket. mon_socket also prefixes the messages coming from each with a
57
 
    prefix, by defaout 'in' and 'out', so all messages sent by mon_socket are
 
47
    prefix, by default 'in' and 'out', so all messages sent by mon_socket are
58
48
    multipart.
59
49
    
60
50
    The only difference between this and a QUEUE as far as in/out are
61
 
    concerned is that it works with two XREP sockets by swapping the IDENT
 
51
    concerned is that it works with two ROUTER sockets by swapping the IDENT
62
52
    prefixes.
63
53
    
64
54
    Parameters
92
82
        if not isinstance(prefix, bytes):
93
83
            raise TypeError("prefix must be bytes, not %s"%type(prefix))
94
84
 
95
 
    # force swap_ids if both XREP
96
 
    swap_ids = (in_socket.socket_type == XREP and 
97
 
                out_socket.socket_type == XREP)
 
85
    # force swap_ids if both ROUTERs
 
86
    swap_ids = (in_socket.socket_type == ROUTER and 
 
87
                out_socket.socket_type == ROUTER)
98
88
    
99
89
    # build zmq_msg objects from str prefixes
100
90
    asbuffer_r(in_prefix, <void **>&msg_c, &msg_c_len)
101
 
    rc = zmq_msg_init_size(&in_msg, msg_c_len)
 
91
    with nogil:
 
92
        rc = zmq_msg_init_size(&in_msg, msg_c_len)
102
93
    if rc != 0:
103
94
        raise ZMQError()
104
 
    memcpy(zmq_msg_data(&in_msg), msg_c, zmq_msg_size(&in_msg))
 
95
    with nogil:
 
96
        memcpy(zmq_msg_data(&in_msg), msg_c, zmq_msg_size(&in_msg))
105
97
    
106
98
    asbuffer_r(out_prefix, <void **>&msg_c, &msg_c_len)
107
 
    rc = zmq_msg_init_size(&out_msg, msg_c_len)
 
99
    
 
100
    with nogil:
 
101
        rc = zmq_msg_init_size(&out_msg, msg_c_len)
108
102
    if rc != 0:
109
103
        raise ZMQError()
110
 
    memcpy(zmq_msg_data(&out_msg), msg_c, zmq_msg_size(&out_msg))
111
104
    
112
105
    with nogil:
113
 
        rc = c_monitored_queue(ins, outs, mons, in_msg, out_msg, swap_ids)
 
106
        memcpy(zmq_msg_data(&out_msg), msg_c, zmq_msg_size(&out_msg))
 
107
        rc = c_monitored_queue(ins, outs, mons, &in_msg, &out_msg, swap_ids)
114
108
    return rc
115
109
 
116
 
__all__ = ['monitored_queue']
 
 
b'\\ No newline at end of file'
 
110
__all__ = ['monitored_queue']