45
46
The Socket instance for the outbound traffic.
49
result = zmq_device(device_type, isocket.handle, osocket.handle)
50
if ZMQ_VERSION_MAJOR >= 3:
51
rc = c_device(isocket.handle, osocket.handle)
53
rc = zmq_device(device_type, isocket.handle, osocket.handle)
58
# inner loop inlined, to prevent code duplication for up/downstream
59
cdef inline int _relay(void * insocket, void *outsocket, zmq_msg_t msg) nogil:
64
flagsz = sizeof (more)
68
rc = zmq_recvmsg(insocket, &msg, 0)
73
rc = zmq_getsockopt(insocket, ZMQ_RCVMORE, &more, &flagsz)
77
flags = flags | ZMQ_SNDMORE
79
# LABELs have been removed:
80
# rc = zmq_getsockopt(insocket, ZMQ_RCVLABEL, &label, &flagsz)
84
# flags = flags | ZMQ_SNDLABEL
86
rc = zmq_sendmsg(outsocket, &msg, flags)
95
# c_device copied (and cythonized) from zmq_device in zeromq release-2.1.6
97
cdef inline int c_device (void * insocket, void *outsocket) nogil:
98
if ZMQ_VERSION_MAJOR < 3:
102
cdef int rc = zmq_msg_init (&msg)
107
cdef zmq_pollitem_t items [2]
108
items [0].socket = insocket
110
items [0].events = ZMQ_POLLIN
111
items [0].revents = 0
112
items [1].socket = outsocket
114
items [1].events = ZMQ_POLLIN
115
items [1].revents = 0
119
# Wait while there are either requests or replies to process.
120
rc = zmq_poll (&items [0], 2, -1)
125
# The algorithm below assumes the ratio of requests and replies processed
126
# under full load to be 1:1. Although processing replies
127
# first is tempting, it is susceptible to DoS attacks (overloading
128
# the system with unsolicited replies).
131
if (items [0].revents & ZMQ_POLLIN):
132
rc = _relay(insocket, outsocket, msg)
135
if (items [1].revents & ZMQ_POLLIN):
136
rc = _relay(outsocket, insocket, msg)
53
140
# Names exported by `from <module> import *`: only `device` is listed.
__all__ = ['device']