10
# Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
12
# This file is part of pyzmq.
14
# pyzmq is free software; you can redistribute it and/or modify it under
15
# the terms of the Lesser GNU General Public License as published by
16
# the Free Software Foundation; either version 3 of the License, or
17
# (at your option) any later version.
19
# pyzmq is distributed in the hope that it will be useful,
20
# but WITHOUT ANY WARRANTY; without even the implied warranty of
21
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22
# Lesser GNU General Public License for more details.
24
# You should have received a copy of the Lesser GNU General Public License
25
# along with this program. If not, see <http://www.gnu.org/licenses/>.
9
#-----------------------------------------------------------------------------
10
# Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
12
# This file is part of pyzmq
14
# Distributed under the terms of the New BSD License. The full license is in
15
# the file COPYING.BSD, distributed as part of this software.
16
#-----------------------------------------------------------------------------
28
18
#-----------------------------------------------------------------------------
33
23
ctypedef int Py_ssize_t
35
25
from buffers cimport asbuffer_r
38
28
from zmq.core.socket cimport Socket
40
from zmq.core import XREP, ZMQError
30
from zmq.core import ROUTER, ZMQError
42
32
#-----------------------------------------------------------------------------
43
33
# MonitoredQueue functions
54
44
A monitored queue behaves just like a zmq QUEUE device as far as in_socket
55
45
and out_socket are concerned, except that all messages *also* go out on
56
46
mon_socket. mon_socket also prefixes the messages coming from each with a
57
prefix, by default 'in' and 'out', so all messages sent by mon_socket are
47
prefix, by default 'in' and 'out', so all messages sent by mon_socket are
60
50
The only difference between this and a QUEUE as far as in/out are
61
concerned is that it works with two XREP sockets by swapping the IDENT
51
concerned is that it works with two ROUTER sockets by swapping the IDENT
92
82
if not isinstance(prefix, bytes):
93
83
raise TypeError("prefix must be bytes, not %s"%type(prefix))
95
# force swap_ids if both XREP
96
swap_ids = (in_socket.socket_type == XREP and
97
out_socket.socket_type == XREP)
85
# force swap_ids if both ROUTERs
86
swap_ids = (in_socket.socket_type == ROUTER and
87
out_socket.socket_type == ROUTER)
99
89
# build zmq_msg objects from the bytes prefixes
100
90
asbuffer_r(in_prefix, <void **>&msg_c, &msg_c_len)
101
rc = zmq_msg_init_size(&in_msg, msg_c_len)
92
rc = zmq_msg_init_size(&in_msg, msg_c_len)
104
memcpy(zmq_msg_data(&in_msg), msg_c, zmq_msg_size(&in_msg))
96
memcpy(zmq_msg_data(&in_msg), msg_c, zmq_msg_size(&in_msg))
106
98
asbuffer_r(out_prefix, <void **>&msg_c, &msg_c_len)
107
rc = zmq_msg_init_size(&out_msg, msg_c_len)
101
rc = zmq_msg_init_size(&out_msg, msg_c_len)
110
memcpy(zmq_msg_data(&out_msg), msg_c, zmq_msg_size(&out_msg))
113
rc = c_monitored_queue(ins, outs, mons, in_msg, out_msg, swap_ids)
106
memcpy(zmq_msg_data(&out_msg), msg_c, zmq_msg_size(&out_msg))
107
rc = c_monitored_queue(ins, outs, mons, &in_msg, &out_msg, swap_ids)
116
__all__ = ['monitored_queue']
b'\\ No newline at end of file'
110
__all__ = ['monitored_queue']