47
59
turn off the send callback
49
61
turn off the error callback
63
All of which simply call on_<evt>(None).
65
The entire socket interface, excluding direct recv methods, is also
66
provided, primarily through direct-linking the methods.
68
>>> stream.bind is stream.socket.bind
56
77
def __init__(self, socket, io_loop=None):
57
78
self.socket = socket
58
79
self.io_loop = io_loop or ioloop.IOLoop.instance()
60
# self._recv_buffer = ""
61
# self._send_buffer = ""
80
self.poller = zmq.Poller()
82
self._send_queue = Queue()
62
83
self._recv_callback = None
63
84
self._send_callback = None
64
85
self._close_callback = None
65
86
self._errback = None
87
self._recv_copy = False
66
90
self._state = zmq.POLLERR
67
self.io_loop.add_handler(self.socket, self._handle_events, self._state)
91
with stack_context.NullContext():
92
self.io_loop.add_handler(
93
self.socket, self._handle_events, self._state)
69
95
# shortcircuit some socket methods
70
96
self.bind = self.socket.bind
97
self.bind_to_random_port = self.socket.bind_to_random_port
71
98
self.connect = self.socket.connect
99
self.setsockopt = self.socket.setsockopt
100
self.getsockopt = self.socket.getsockopt
101
self.setsockopt_unicode = self.socket.setsockopt_unicode
102
self.getsockopt_unicode = self.socket.getsockopt_unicode
73
105
def stop_on_recv(self):
74
"""disable callback and automatic receiving"""
75
self._recv_callback = None
76
self._drop_io_state(zmq.POLLIN)
106
"""Disable callback and automatic receiving."""
107
return self.on_recv(None)
78
109
def stop_on_send(self):
79
"""disable callback on sending"""
80
self._send_callback = None
81
self._drop_io_state(zmq.POLLOUT)
110
"""Disable callback on sending."""
111
return self.on_send(None)
83
113
def stop_on_err(self):
85
# self._drop_io_state(zmq.POLLOUT)
114
"""Disable callback on errors."""
115
return self.on_err(None)
87
def on_recv(self, callback):
88
"""register a callback to be called on each recv.
89
callback must take exactly one argument, which will be a
90
list, returned by socket.recv_multipart()."""
91
# assert not self._recv_callback, "Already receiving"
92
self._recv_callback = callback
93
self._add_io_state(zmq.POLLIN)
117
def on_recv(self, callback, copy=True):
118
"""Register a callback to be called when a message is ready to recv.
119
There can be only one callback registered at a time, so each
120
call to on_recv replaces previously registered callbacks.
122
on_recv(None) disables recv event polling.
128
callback must take exactly one argument, which will be a
129
list, as returned by socket.recv_multipart()
130
if callback is None, recv callbacks are disabled.
132
copy is passed directly to recv, so if copy is False,
133
callback will receive Message objects. If copy is True,
134
then callback will receive bytes/str objects.
139
assert callback is None or callable(callback)
140
self._recv_callback = stack_context.wrap(callback)
141
self._recv_copy = copy
143
self._drop_io_state(zmq.POLLIN)
145
self._add_io_state(zmq.POLLIN)
95
147
def on_send(self, callback):
96
"""register a callback to be called on each send
148
"""Register a callback to be called on each send
149
There will be two arguments: the message being sent (always a list),
150
and the return result of socket.send_multipart(msg).
152
Non-copying sends return a MessageTracker object whose
153
`done` attribute will be True when the send is complete.
154
This allows users to track when an object is safe to write to
157
The second argument will always be None if copy=True
160
on_send(None) disables recv event polling.
166
callback must take exactly two arguments, which will be
167
There will be two arguments: the message being sent (always a list),
168
and the return result of socket.send_multipart(msg) -
169
MessageTracker or None.
171
if callback is None, send callbacks are disabled.
99
self._add_io_state(zmq.POLLOUT)
100
self._send_callback = callback
173
self._send_callback = stack_context.wrap(callback)
102
175
def on_err(self, callback):
103
"""register a callback to be called on each send
104
with no arguments (?)
176
"""register a callback to be called on POLLERR events
183
callback will be passed no arguments.
106
# self._add_io_state(zmq.POLLOUT)
107
self._errback = callback
185
self._errback = stack_context.wrap(callback)
110
def send(self, msg, callback=None):
111
"""send a message, optionally also register
188
def send(self, msg, flags=0, copy=False, callback=None):
189
"""Send a message, optionally also register a new callback for sends.
190
See zmq.socket.send for details.
113
return self.send_multipart([msg], callback=callback)
192
return self.send_multipart([msg], flags=flags, copy=copy, callback=callback)
115
def send_multipart(self, msg, callback=None):
116
"""send a multipart message
194
def send_multipart(self, msg, flags=0, copy=False, callback=None):
195
"""Send a multipart message, optionally also register a new callback for sends.
196
See zmq.socket.send_multipart for details.
118
198
# self._check_closed()
199
self._send_queue.put((msg, flags, copy))
120
200
callback = callback or self._send_callback
121
201
if callback is not None:
122
202
self.on_send(callback)
124
self.on_send(lambda : None)
205
self.on_send(lambda *args: None)
206
self._add_io_state(zmq.POLLOUT)
208
def send_unicode(self, u, flags=0, encoding='utf-8', callback=None):
209
"""Send a unicode message with an encoding.
210
See zmq.socket.send_unicode for details.
212
if not isinstance(u, basestring):
213
raise TypeError("unicode/str objects only")
214
return self.send(u.encode(encoding), flags=flags, callback=callback)
216
def send_json(self, obj, flags=0, callback=None):
217
"""Send json-serialized version of an object.
218
See zmq.socket.send_json for details.
221
raise ImportError('jsonlib{1,2}, json or simplejson library is required.')
223
msg = jsonapi.dumps(obj)
224
return self.send(msg, flags=flags, callback=callback)
226
def send_pyobj(self, obj, flags=0, protocol=-1, callback=None):
227
"""Send a Python object as a message using pickle to serialize.
229
See zmq.socket.send_json for details.
231
msg = pickle.dumps(obj, protocol)
232
return self.send(msg, flags, callback=callback)
234
def _finish_flush(self):
235
"""callback for unsetting _flushed flag."""
236
self._flushed = False
238
def flush(self, flag=zmq.POLLIN|zmq.POLLOUT, limit=None):
239
"""Flush pending messages.
241
This method safely handles all pending incoming and/or outgoing messages,
242
bypassing the inner loop, passing them to the registered callbacks.
244
A limit can be specified, to prevent blocking under high load.
246
:func:`flush` will return the first time ANY of these conditions are met:
247
* No more events matching the flag are pending.
248
* the total number of events handled reaches the limit.
250
Note that if flag|POLLIN, recv events will be flushed even if no callback
251
is registered, unlike normal IOLoop operation. This allows flush to be
252
used to remove *and ignore* incoming messages.
256
flag : int, default=POLLIN|POLLOUT
258
If flag|POLLIN, recv events will be flushed.
259
If flag|POLLOUT, send events will be flushed.
260
Both flags can be set at once, which is the default.
261
limit : None or int, optional
262
The maximum number of messages to send or receive.
263
Both send and recv count against this limit.
267
int : count of events handled (both send and recv)
269
# unset self._flushed, so callbacks will execute, in case flush has
270
# already been called this iteration
271
already_flushed = self._flushed
272
self._flushed = False
273
# initialize counters
276
return flag & zmq.POLLIN | (self.sending() and flag & zmq.POLLOUT)
278
self.poller.register(self.socket, update_flag())
279
events = self.poller.poll(0)
280
while events and (not limit or count < limit):
282
if event & zmq.POLLIN: # receiving
285
if event & zmq.POLLOUT and self.sending():
288
self.poller.register(self.socket, update_flag())
290
events = self.poller.poll(0)
291
if count: # only bypass loop if we actually flushed something
292
# skip send/recv callbacks this iteration
294
# reregister them at the end of the loop
295
if not already_flushed: # don't need to do it again
296
dc = ioloop.DelayedCallback(self._finish_flush, 0, self.io_loop)
298
elif already_flushed:
126
302
def set_close_callback(self, callback):
127
303
"""Call the given callback when the stream is closed."""
128
self._close_callback = callback
304
self._close_callback = stack_context.wrap(callback)
131
307
"""Close this stream."""
132
308
if self.socket is not None:
133
309
self.io_loop.remove_handler(self.socket)
310
dc = ioloop.DelayedCallback(self.socket.close, 100, self.io_loop)
135
312
self.socket = None
136
313
if self._close_callback:
137
314
self._run_callback(self._close_callback)
139
316
def receiving(self):
140
"""Returns true if we are currently receiving from the stream."""
317
"""Returns True if we are currently receiving from the stream."""
141
318
return self._recv_callback is not None
143
320
def sending(self):
144
"""Returns true if we are currently sending to the stream."""
145
return self._tosend is not None
321
"""Returns True if we are currently sending to the stream."""
322
return not self._send_queue.empty()
147
324
def closed(self):
148
325
return self.socket is None
150
327
def _run_callback(self, callback, *args, **kwargs):
328
"""Wrap running callbacks in try/except to allow us to
152
callback(*args, **kwargs)
331
# Use a NullContext to ensure that all StackContexts are run
332
# inside our blanket exception handler rather than outside.
333
with stack_context.NullContext():
334
callback(*args, **kwargs)
336
logging.error("Uncaught exception, closing connection.",
154
338
# Close the socket on an uncaught exception from a user callback
155
339
# (It would eventually get closed when the socket object is
156
340
# gc'd, but we don't want to rely on gc happening before we