1
"""MonitoredQueue classes and functions.
10
# Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
12
# This file is part of pyzmq.
14
# pyzmq is free software; you can redistribute it and/or modify it under
15
# the terms of the Lesser GNU General Public License as published by
16
# the Free Software Foundation; either version 3 of the License, or
17
# (at your option) any later version.
19
# pyzmq is distributed in the hope that it will be useful,
20
# but WITHOUT ANY WARRANTY; without even the implied warranty of
21
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22
# Lesser GNU General Public License for more details.
24
# You should have received a copy of the Lesser GNU General Public License
25
# along with this program. If not, see <http://www.gnu.org/licenses/>.
28
#-----------------------------------------------------------------------------
30
#-----------------------------------------------------------------------------
32
cdef extern from "Python.h":
33
ctypedef int Py_ssize_t
35
from buffers cimport asbuffer_r
38
from zmq.core.socket cimport Socket
40
from zmq.core import XREP, ZMQError
42
#-----------------------------------------------------------------------------
43
# MonitoredQueue functions
44
#-----------------------------------------------------------------------------
47
def monitored_queue(Socket in_socket, Socket out_socket, Socket mon_socket,
48
object in_prefix='in', object out_prefix='out'):
49
"""monitored_queue(in_socket, out_socket, mon_socket,
50
in_prefix='in', out_prefix='out')
52
Start a monitored queue device.
54
A monitored queue behaves just like a zmq QUEUE device as far as in_socket
55
and out_socket are concerned, except that all messages *also* go out on
56
mon_socket. mon_socket also prefixes the messages coming from each with a
57
prefix, by default 'in' and 'out', so all messages sent by mon_socket are
60
The only difference between this and a QUEUE as far as in/out are
61
concerned is that it works with two XREP sockets by swapping the IDENT
67
One of the sockets to the Queue. Its messages will be prefixed with
70
One of the sockets to the Queue. Its messages will be prefixed with
71
'out'. The only difference between in/out socket is this prefix.
73
This socket sends out every message received by each of the others
74
with an in/out prefix specifying which one it was.
76
Prefix added to broadcast messages from in_socket.
78
Prefix added to broadcast messages from out_socket.
81
# Pull the `handle` attribute of each Socket into an untyped C pointer; these
# three pointers are what is handed to c_monitored_queue() below.
cdef void *ins=in_socket.handle
82
cdef void *outs=out_socket.handle
83
cdef void *mons=mon_socket.handle
85
# zmq message that will carry out_prefix.
# NOTE(review): the matching declaration of `in_msg` (used below) is not
# visible in this excerpt -- confirm it is declared alongside this one.
cdef zmq_msg_t out_msg
87
# Scratch pointer/length pair, reused for both prefixes when reading their
# underlying byte buffers.
cdef char *msg_c = NULL
88
cdef Py_ssize_t msg_c_len
91
# Reject non-bytes prefixes up front, before any zmq message is initialised.
# NOTE(review): the defaults are the unprefixed literals 'in'/'out'; if those
# compile to `str` on Python 3 they would fail this check -- confirm.
for prefix in (in_prefix, out_prefix):
92
if not isinstance(prefix, bytes):
93
raise TypeError("prefix must be bytes, not %s"%type(prefix))
95
# force swap_ids if both XREP
96
swap_ids = (in_socket.socket_type == XREP and
97
out_socket.socket_type == XREP)
99
# build zmq_msg objects from the (bytes) prefixes
100
# presumably asbuffer_r points msg_c at in_prefix's buffer and stores its
# length in msg_c_len -- verify against the `buffers` module.
asbuffer_r(in_prefix, <void **>&msg_c, &msg_c_len)
101
# Initialise in_msg with room for exactly msg_c_len bytes.
# NOTE(review): no check of `rc` is visible in this excerpt -- confirm that
# a failure here is turned into an error before the memcpy.
rc = zmq_msg_init_size(&in_msg, msg_c_len)
104
# Copy the prefix bytes into the message's own data area.
memcpy(zmq_msg_data(&in_msg), msg_c, zmq_msg_size(&in_msg))
106
# Same three steps for out_prefix, reusing msg_c / msg_c_len.
asbuffer_r(out_prefix, <void **>&msg_c, &msg_c_len)
107
rc = zmq_msg_init_size(&out_msg, msg_c_len)
110
memcpy(zmq_msg_data(&out_msg), msg_c, zmq_msg_size(&out_msg))
113
# Hand the socket handles, both prefix messages and the swap flag to the C
# implementation; its return code is stored in rc.
rc = c_monitored_queue(ins, outs, mons, in_msg, out_msg, swap_ids)
116
# Names exported by `from <this module> import *`.
__all__ = ['monitored_queue']
b'\\ No newline at end of file'