30
31
#-----------------------------------------------------------------------------
34
35
#-----------------------------------------------------------------------------
35
36
# MonitoredQueue C functions
36
37
#-----------------------------------------------------------------------------
39
cdef inline int _relay(void *insocket_, void *outsocket_, void *sidesocket_,
40
zmq_msg_t msg, zmq_msg_t side_msg, zmq_msg_t id_msg,
50
if ZMQ_VERSION_MAJOR < 3:
51
flagsz = sizeof (int64_t)
57
if swap_ids:# both router, must send second identity first
58
# recv two ids into msg, id_msg
59
rc = zmq_recvmsg (insocket_, &msg, 0)
60
rc = zmq_recvmsg (insocket_, &id_msg, 0)
62
# send second id (id_msg) first
63
#!!!! always send a copy before the original !!!!
64
rc = zmq_msg_copy(&side_msg, &id_msg)
65
rc = zmq_sendmsg (outsocket_, &side_msg, ZMQ_SNDMORE)
66
rc = zmq_sendmsg (sidesocket_, &id_msg, ZMQ_SNDMORE)
67
# send first id (msg) second
68
rc = zmq_msg_copy(&side_msg, &msg)
69
rc = zmq_sendmsg (outsocket_, &side_msg, ZMQ_SNDMORE)
70
rc = zmq_sendmsg (sidesocket_, &msg, ZMQ_SNDMORE)
74
rc = zmq_recvmsg (insocket_, &msg, 0)
76
rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, flag_ptr, &flagsz)
78
if ZMQ_VERSION_MAJOR < 3:
84
# LABEL has been removed:
85
# rc = zmq_getsockopt (insocket_, ZMQ_RCVLABEL, flag_ptr, &flagsz)
87
# flags |= ZMQ_SNDLABEL
90
rc = zmq_msg_copy(&side_msg, &msg)
92
rc = zmq_sendmsg (outsocket_, &side_msg, flags)
93
# only SNDMORE for side-socket
94
rc = zmq_sendmsg (sidesocket_, &msg, ZMQ_SNDMORE)
96
rc = zmq_sendmsg (outsocket_, &side_msg, 0)
97
rc = zmq_sendmsg (sidesocket_, &msg, 0)
39
101
# the MonitoredQueue C function, adapted from zmq::queue.cpp :
40
102
cdef inline int c_monitored_queue (void *insocket_, void *outsocket_,
41
void *sidesocket_, zmq_msg_t in_msg,
42
zmq_msg_t out_msg, int swap_ids) nogil:
103
void *sidesocket_, zmq_msg_t *in_msg_ptr,
104
zmq_msg_t *out_msg_ptr, int swap_ids) nogil:
43
105
"""The actual C function for a monitored queue device.
45
107
See ``monitored_queue()`` for details.
49
110
cdef zmq_msg_t msg
50
111
cdef int rc = zmq_msg_init (&msg)
51
112
cdef zmq_msg_t id_msg
85
145
# // Process a request.
86
146
if (items [0].revents & ZMQ_POLLIN):
87
147
# send in_prefix to side socket
88
rc = zmq_msg_copy(&side_msg, &in_msg)
89
rc = zmq_send (sidesocket_, &side_msg, ZMQ_SNDMORE)
90
if swap_ids:# both xrep, must send second identity first
91
# recv two ids into msg, id_msg
92
rc = zmq_recv (insocket_, &msg, 0)
93
rc = zmq_recv (insocket_, &id_msg, 0)
95
# send second id (id_msg) first
96
#!!!! always send a copy before the original !!!!
97
rc = zmq_msg_copy(&side_msg, &id_msg)
98
rc = zmq_send (outsocket_, &side_msg, ZMQ_SNDMORE)
99
rc = zmq_send (sidesocket_, &id_msg, ZMQ_SNDMORE)
100
# send first id (msg) second
101
rc = zmq_msg_copy(&side_msg, &msg)
102
rc = zmq_send (outsocket_, &side_msg, ZMQ_SNDMORE)
103
rc = zmq_send (sidesocket_, &msg, ZMQ_SNDMORE)
105
rc = zmq_recv (insocket_, &msg, 0)
108
moresz = sizeof (more)
109
rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, &more, &moresz)
112
rc = zmq_msg_copy(&side_msg, &msg)
114
rc = zmq_send (outsocket_, &side_msg, ZMQ_SNDMORE)
115
rc = zmq_send (sidesocket_, &msg,ZMQ_SNDMORE)
117
rc = zmq_send (outsocket_, &side_msg, 0)
118
rc = zmq_send (sidesocket_, &msg,0)
148
rc = zmq_msg_copy(&side_msg, in_msg_ptr)
149
rc = zmq_sendmsg (sidesocket_, &side_msg, ZMQ_SNDMORE)
152
# relay the rest of the message
153
rc = _relay(insocket_, outsocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
123
156
if (items [1].revents & ZMQ_POLLIN):
124
rc = zmq_msg_copy(&side_msg, &out_msg)
125
rc = zmq_send (sidesocket_, &side_msg, ZMQ_SNDMORE)
127
# recv two ids into msg, id_msg
128
rc = zmq_recv (outsocket_, &msg, 0)
129
rc = zmq_recv (outsocket_, &id_msg, 0)
131
# send second id (id_msg) first
132
rc = zmq_msg_copy(&side_msg, &id_msg)
133
rc = zmq_send (insocket_, &side_msg, ZMQ_SNDMORE)
134
rc = zmq_send (sidesocket_, &id_msg,ZMQ_SNDMORE)
136
# send first id (msg) second
137
rc = zmq_msg_copy(&side_msg, &msg)
138
rc = zmq_send (insocket_, &side_msg, ZMQ_SNDMORE)
139
rc = zmq_send (sidesocket_, &msg,ZMQ_SNDMORE)
141
rc = zmq_recv (outsocket_, &msg, 0)
144
moresz = sizeof (more)
145
rc = zmq_getsockopt (outsocket_, ZMQ_RCVMORE, &more, &moresz)
147
rc = zmq_msg_copy(&side_msg, &msg)
149
rc = zmq_send (insocket_, &side_msg,ZMQ_SNDMORE)
150
rc = zmq_send (sidesocket_, &msg,ZMQ_SNDMORE)
152
rc = zmq_send (insocket_, &side_msg,0)
153
rc = zmq_send (sidesocket_, &msg,0)
154
# errno_assert (rc == 0)
157
# send out_prefix to side socket
158
rc = zmq_msg_copy(&side_msg, out_msg_ptr)
159
rc = zmq_sendmsg (sidesocket_, &side_msg, ZMQ_SNDMORE)
162
# relay the rest of the message
163
rc = _relay(outsocket_, insocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)