1
"""MonitoredQueue class declarations.
10
# Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
12
# This file is part of pyzmq.
14
# pyzmq is free software; you can redistribute it and/or modify it under
15
# the terms of the Lesser GNU General Public License as published by
16
# the Free Software Foundation; either version 3 of the License, or
17
# (at your option) any later version.
19
# pyzmq is distributed in the hope that it will be useful,
20
# but WITHOUT ANY WARRANTY; without even the implied warranty of
21
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22
# Lesser GNU General Public License for more details.
24
# You should have received a copy of the Lesser GNU General Public License
25
# along with this program. If not, see <http://www.gnu.org/licenses/>.
28
#-----------------------------------------------------------------------------
30
#-----------------------------------------------------------------------------
34
#-----------------------------------------------------------------------------
35
# MonitoredQueue C functions
36
#-----------------------------------------------------------------------------
39
# the MonitoredQueue C function, adapted from zmq::queue.cpp :
40
cdef inline int c_monitored_queue (void *insocket_, void *outsocket_,
41
void *sidesocket_, zmq_msg_t in_msg,
42
zmq_msg_t out_msg, int swap_ids) nogil:
43
"""The actual C function for a monitored queue device.
45
See ``monitored_queue()`` for details.
50
cdef int rc = zmq_msg_init (&msg)
52
rc = zmq_msg_init (&id_msg)
53
cdef zmq_msg_t side_msg
54
rc = zmq_msg_init (&side_msg)
60
cdef zmq_pollitem_t items [2]
61
items [0].socket = insocket_
63
items [0].events = ZMQ_POLLIN
65
items [1].socket = outsocket_
67
items [1].events = ZMQ_POLLIN
69
# I don't think sidesocket should be polled?
70
# items [2].socket = sidesocket_
72
# items [2].events = ZMQ_POLLIN
73
# items [2].revents = 0
77
# // Wait while there are either requests or replies to process.
78
rc = zmq_poll (&items [0], 2, -1)
80
# // The algorithm below assumes ratio of request and replies processed
81
# // under full load to be 1:1. Although processing replies
82
# // first is tempting it is susceptible to DoS attacks (overloading
83
# // the system with unsolicited replies).
85
# // Process a request.
86
if (items [0].revents & ZMQ_POLLIN):
87
# send in_prefix to side socket
88
rc = zmq_msg_copy(&side_msg, &in_msg)
89
rc = zmq_send (sidesocket_, &side_msg, ZMQ_SNDMORE)
90
if swap_ids:# both xrep, must send second identity first
91
# recv two ids into msg, id_msg
92
rc = zmq_recv (insocket_, &msg, 0)
93
rc = zmq_recv (insocket_, &id_msg, 0)
95
# send second id (id_msg) first
96
#!!!! always send a copy before the original !!!!
97
rc = zmq_msg_copy(&side_msg, &id_msg)
98
rc = zmq_send (outsocket_, &side_msg, ZMQ_SNDMORE)
99
rc = zmq_send (sidesocket_, &id_msg, ZMQ_SNDMORE)
100
# send first id (msg) second
101
rc = zmq_msg_copy(&side_msg, &msg)
102
rc = zmq_send (outsocket_, &side_msg, ZMQ_SNDMORE)
103
rc = zmq_send (sidesocket_, &msg, ZMQ_SNDMORE)
105
rc = zmq_recv (insocket_, &msg, 0)
108
moresz = sizeof (more)
109
rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, &more, &moresz)
112
rc = zmq_msg_copy(&side_msg, &msg)
114
rc = zmq_send (outsocket_, &side_msg, ZMQ_SNDMORE)
115
rc = zmq_send (sidesocket_, &msg,ZMQ_SNDMORE)
117
rc = zmq_send (outsocket_, &side_msg, 0)
118
rc = zmq_send (sidesocket_, &msg,0)
123
if (items [1].revents & ZMQ_POLLIN):
124
rc = zmq_msg_copy(&side_msg, &out_msg)
125
rc = zmq_send (sidesocket_, &side_msg, ZMQ_SNDMORE)
127
# recv two ids into msg, id_msg
128
rc = zmq_recv (outsocket_, &msg, 0)
129
rc = zmq_recv (outsocket_, &id_msg, 0)
131
# send second id (id_msg) first
132
rc = zmq_msg_copy(&side_msg, &id_msg)
133
rc = zmq_send (insocket_, &side_msg, ZMQ_SNDMORE)
134
rc = zmq_send (sidesocket_, &id_msg,ZMQ_SNDMORE)
136
# send first id (msg) second
137
rc = zmq_msg_copy(&side_msg, &msg)
138
rc = zmq_send (insocket_, &side_msg, ZMQ_SNDMORE)
139
rc = zmq_send (sidesocket_, &msg,ZMQ_SNDMORE)
141
rc = zmq_recv (outsocket_, &msg, 0)
144
moresz = sizeof (more)
145
rc = zmq_getsockopt (outsocket_, ZMQ_RCVMORE, &more, &moresz)
147
rc = zmq_msg_copy(&side_msg, &msg)
149
rc = zmq_send (insocket_, &side_msg,ZMQ_SNDMORE)
150
rc = zmq_send (sidesocket_, &msg,ZMQ_SNDMORE)
152
rc = zmq_send (insocket_, &side_msg,0)
153
rc = zmq_send (sidesocket_, &msg,0)
154
# errno_assert (rc == 0)