-* **on_recv(callback,copy=True):**
+* **on_recv(callback, copy=True):**
     register a callback to be run every time the socket has something to receive
 * **on_send(callback):**
     register a callback to be run every time you call send
-* **on_err(callback):**
-    register a callback to be run every time there is an error
 * **send(self, msg, flags=0, copy=False, callback=None):**
     perform a send that will trigger the callback
-    if callback is passed, on_send is also called
+    if callback is passed, on_send is also called.

 There are also send_multipart(), send_json(), send_pyobj()

     turn off the recv callback
 * **stop_on_send():**
     turn off the send callback
-    turn off the error callback

-All of which simply call ``on_<evt>(None)``.
+which simply call ``on_<evt>(None)``.

 The entire socket interface, excluding direct recv methods, is also
 provided, primarily through direct-linking the methods.
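
The registration pattern described in this docstring can be sketched without ZeroMQ or an event loop. The `TinyStream` class and its `deliver` helper below are hypothetical stand-ins written for illustration; they are not the real ZMQStream and only mimic the "one callback at a time, `on_<evt>(None)` to stop" behaviour.

```python
class TinyStream:
    """Hypothetical stand-in for illustration; NOT the real ZMQStream."""

    def __init__(self):
        self._recv_callback = None
        self._send_callback = None

    def on_recv(self, callback):
        # Only one callback at a time: each call replaces the previous one.
        assert callback is None or callable(callback)
        self._recv_callback = callback

    def on_send(self, callback):
        assert callback is None or callable(callback)
        self._send_callback = callback

    def stop_on_recv(self):
        # Same shape as the documented helpers: on_<evt>(None).
        return self.on_recv(None)

    def stop_on_send(self):
        return self.on_send(None)

    def deliver(self, msg):
        # Hypothetical helper that simulates an incoming multipart message.
        if self._recv_callback is not None:
            self._recv_callback(msg)


received = []
stream = TinyStream()
stream.on_recv(received.append)
stream.deliver([b"first"])
stream.stop_on_recv()
stream.deliver([b"dropped"])  # no callback registered, so nothing is recorded
```

In the real class the callbacks are fired by the IOLoop rather than by a direct call, so this sketch shows only the registration semantics.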
         return self.on_send(None)

     def stop_on_err(self):
-        """Disable callback on errors."""
-        return self.on_err(None)
+        """DEPRECATED, does nothing"""
+        logging.warning("on_err does nothing, and will be removed")

+    def on_err(self, callback):
+        """DEPRECATED, does nothing"""
+        logging.warning("on_err does nothing, and will be removed")

     def on_recv(self, callback, copy=True):
-        """Register a callback to be called when a message is ready to recv.
+        """Register a callback for when a message is ready to recv.

         There can be only one callback registered at a time, so each
-        call to on_recv replaces previously registered callbacks.
+        call to `on_recv` replaces previously registered callbacks.

         on_recv(None) disables recv event polling.

+        Use on_recv_stream(callback) instead, to register a callback that will receive
+        both this ZMQStream and the message, instead of just the message.

             self._add_io_state(self.io_loop.READ)

+    def on_recv_stream(self, callback, copy=True):
+        """Same as on_recv, but callback will get this stream as first argument
+
+        callback must take exactly two arguments, as it will be called as::
+
+            callback(stream, msg)
+
+        Useful when a single callback should be used with multiple streams.

+        self.on_recv(lambda msg: callback(self, msg), copy=copy)
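
The point of `on_recv_stream` is that one handler can serve several streams and still tell them apart. A minimal sketch of that wrapping, assuming a hypothetical `NamedStream` stand-in (the `name` attribute and `deliver` helper are invented for the example and do not exist on ZMQStream):

```python
class NamedStream:
    """Hypothetical stand-in for illustration; NOT the real ZMQStream."""

    def __init__(self, name):
        self.name = name
        self._recv_callback = None

    def on_recv(self, callback):
        self._recv_callback = callback

    def on_recv_stream(self, callback):
        # Wrap the callback so that it also receives the stream itself.
        self.on_recv(lambda msg: callback(self, msg))

    def deliver(self, msg):
        # Hypothetical helper that simulates an incoming multipart message.
        if self._recv_callback is not None:
            self._recv_callback(msg)


log = []


def shared_handler(stream, msg):
    # One handler for every stream; the first argument says which one fired.
    log.append((stream.name, msg))


frontend = NamedStream("frontend")
backend = NamedStream("backend")
for s in (frontend, backend):
    s.on_recv_stream(shared_handler)

frontend.deliver([b"request"])
backend.deliver([b"reply"])
```

Because the lambda closes over `self`, each stream gets its own wrapper even though `shared_handler` is registered only once per stream.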

     def on_send(self, callback):
         """Register a callback to be called on each send
-        There will be two arguments: the message being sent (always a list),
-        and the return result of socket.send_multipart(msg).
+
+        There will be two arguments::
+
+            callback(msg, status)
+
+        * `msg` will be the list of sendable objects that was just sent
+        * `status` will be the return result of socket.send_multipart(msg) -
+          MessageTracker or None.

         Non-copying sends return a MessageTracker object whose
         `done` attribute will be True when the send is complete.
         This allows users to track when an object is safe to write to

         The second argument will always be None if copy=True

+        Use on_send_stream(callback) to register a callback that will be passed
+        this ZMQStream as the first argument, in addition to the other two.

         on_send(None) disables the send callback.

         callback : callable
             callback must take exactly two arguments, which will be
-            There will be two arguments: the message being sent (always a list),
-            and the return result of socket.send_multipart(msg) -
+            the message being sent (always a list),
+            and the return result of socket.send_multipart(msg) -
             MessageTracker or None.

             if callback is None, send callbacks are disabled.

         self._check_closed()
         assert callback is None or callable(callback)
         self._send_callback = stack_context.wrap(callback)
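
The `callback(msg, status)` contract, and the way a `callback=` argument to a send replaces the registered callback, can be sketched as follows. `SendStream` is a hypothetical stand-in, not the real ZMQStream: it fires the callback immediately instead of queueing the message for the event loop, and it always reports `status=None`, as a copying send would.

```python
class SendStream:
    """Hypothetical stand-in for illustration; NOT the real ZMQStream."""

    def __init__(self):
        self._send_callback = None

    def on_send(self, callback):
        assert callback is None or callable(callback)
        self._send_callback = callback

    def send_multipart(self, msg, callback=None):
        # A callback passed here is registered via on_send, replacing the
        # previous one for this and all later sends.
        callback = callback or self._send_callback
        if callback is not None:
            self.on_send(callback)
        status = None  # stand-in: a copying send has no MessageTracker
        if self._send_callback is not None:
            self._send_callback(msg, status)

    def send(self, msg, callback=None):
        # A single frame is sent as a one-element multipart message.
        return self.send_multipart([msg], callback=callback)


calls = []
stream = SendStream()
stream.on_send(lambda msg, status: calls.append(("default", msg, status)))
stream.send(b"a")
stream.send(b"b", callback=lambda msg, status: calls.append(("override", msg, status)))
stream.send(b"c")  # the override stays registered
stream.on_send(None)
stream.send(b"d")  # callbacks disabled, nothing is recorded
```

Note that the callback always sees a list, even for a plain `send`, which matches the "always a list" wording in the docstring.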

-    def on_err(self, callback):
-        """register a callback to be called on POLLERR events

-        callback will be passed no arguments.

-        assert callback is None or callable(callback)
-        self._errback = stack_context.wrap(callback)
+    def on_send_stream(self, callback):
+        """Same as on_send, but callback will get this stream as first argument
+
+        Callback will be passed three arguments::
+
+            callback(stream, msg, status)
+
+        Useful when a single callback should be used with multiple streams.

+        self.on_send(lambda msg, status: callback(self, msg, status))

-    def send(self, msg, flags=0, copy=False, track=False, callback=None):
+    def send(self, msg, flags=0, copy=True, track=False, callback=None):
         """Send a message, optionally also register a new callback for sends.
         See zmq.socket.send for details.

         return self.send_multipart([msg], flags=flags, copy=copy, track=track, callback=callback)

-    def send_multipart(self, msg, flags=0, copy=False, track=False, prefix=None, callback=None):
+    def send_multipart(self, msg, flags=0, copy=True, track=False, callback=None):
         """Send a multipart message, optionally also register a new callback for sends.
         See zmq.socket.send_multipart for details.

-        kwargs = dict(flags=flags, copy=copy, track=track, prefix=prefix)
+        kwargs = dict(flags=flags, copy=copy, track=track)
         self._send_queue.put((msg, kwargs))
         callback = callback or self._send_callback
         if callback is not None: