~ubuntu-branches/ubuntu/precise/pyzmq/precise

« back to all changes in this revision

Viewing changes to zmq/eventloop/zmqstream.py

  • Committer: Bazaar Package Importer
  • Author(s): Piotr Ożarowski
  • Date: 2011-02-15 09:08:36 UTC
  • mfrom: (2.1.2 experimental)
  • Revision ID: james.westby@ubuntu.com-20110215090836-phh4slym1g6muucn
Tags: 2.0.10.1-2
* Team upload.
* Upload to unstable
* Add Breaks: ${python:Breaks}

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/env python
2
1
#
3
2
# Copyright 2009 Facebook
4
3
#
16
15
 
17
16
"""A utility class to send to and recv from a non-blocking socket."""
18
17
 
19
 
# import errno
20
 
# import socket
 
18
from __future__ import with_statement
 
19
 
 
20
import sys
21
21
import logging
22
 
import time
 
22
 
23
23
import zmq
24
 
import ioloop
 
24
from zmq.core.socket import jsonapi, pickle
 
25
 
 
26
from zmq.eventloop import ioloop
 
27
from zmq.eventloop import stack_context
 
28
 
 
29
try:
 
30
    from queue import Queue
 
31
except ImportError:
 
32
    from Queue import Queue
 
33
 
 
34
from zmq.utils.strtypes import bytes, unicode, basestring
 
35
 
25
36
 
26
37
class ZMQStream(object):
27
38
    """A utility class to register callbacks when a zmq socket sends and receives
28
39
    
29
40
    For use with zmq.eventloop.ioloop
30
41
 
31
 
    There are 3 main methods:
32
 
    on_recv(callback):
 
42
    There are 4 main methods:
 
43
    on_recv(callback,copy=True):
33
44
        register a callback to be run every time the socket has something to receive
34
45
    on_send(callback):
35
46
        register a callback to be run every time you call send
36
47
    on_err(callback):
37
48
        register a callback to be run every time there is an error
38
 
    send(msg, callback=None)
 
49
    send(self, msg, flags=0, copy=False, callback=None):
39
50
        perform a send that will trigger the callback
40
51
        if callback is passed, on_send is also called
41
 
        There is also send_multipart()
 
52
        
 
53
        There are also send_multipart(), send_json, send_pyobj
42
54
    
43
55
    Two other methods for deactivating the callbacks:
44
56
    stop_on_recv():
47
59
        turn off the send callback
48
60
    stop_on_err():
49
61
        turn off the error callback
50
 
 
 
62
    
 
63
    All of which simply call on_<evt>(None).
 
64
    
 
65
    The entire socket interface, excluding direct recv methods, is also
 
66
    provided, primarily through direct-linking the methods.
 
67
    e.g.
 
68
    >>> stream.bind is stream.socket.bind
 
69
    True
 
70
    
51
71
    """
52
72
    
53
73
    socket = None
54
74
    io_loop = None
 
75
    poller = None
55
76
    
56
77
    def __init__(self, socket, io_loop=None):
57
78
        self.socket = socket
58
79
        self.io_loop = io_loop or ioloop.IOLoop.instance()
59
 
        self._tosend = None
60
 
        # self._recv_buffer = ""
61
 
        # self._send_buffer = ""
 
80
        self.poller = zmq.Poller()
 
81
        
 
82
        self._send_queue = Queue()
62
83
        self._recv_callback = None
63
84
        self._send_callback = None
64
85
        self._close_callback = None
65
86
        self._errback = None
 
87
        self._recv_copy = False
 
88
        self._flushed = False
 
89
        
66
90
        self._state = zmq.POLLERR
67
 
        self.io_loop.add_handler(self.socket, self._handle_events, self._state)
 
91
        with stack_context.NullContext():
 
92
            self.io_loop.add_handler(
 
93
                self.socket, self._handle_events, self._state)
68
94
        
69
95
        # shortcircuit some socket methods
70
96
        self.bind = self.socket.bind
 
97
        self.bind_to_random_port = self.socket.bind_to_random_port
71
98
        self.connect = self.socket.connect
 
99
        self.setsockopt = self.socket.setsockopt
 
100
        self.getsockopt = self.socket.getsockopt
 
101
        self.setsockopt_unicode = self.socket.setsockopt_unicode
 
102
        self.getsockopt_unicode = self.socket.getsockopt_unicode
 
103
    
72
104
    
73
105
    def stop_on_recv(self):
74
 
        """disable callback and automatic receiving"""
75
 
        self._recv_callback = None
76
 
        self._drop_io_state(zmq.POLLIN)
 
106
        """Disable callback and automatic receiving."""
 
107
        return self.on_recv(None)
77
108
    
78
109
    def stop_on_send(self):
79
 
        """disable callback on sending"""
80
 
        self._send_callback = None
81
 
        self._drop_io_state(zmq.POLLOUT)
 
110
        """Disable callback on sending."""
 
111
        return self.on_send(None)
82
112
    
83
113
    def stop_on_err(self):
84
 
        self._errback = None
85
 
        # self._drop_io_state(zmq.POLLOUT)
 
114
        """Disable callback on errors."""
 
115
        return self.on_err(None)
86
116
    
87
 
    def on_recv(self, callback):
88
 
        """register a callback to be called on each recv.
89
 
        callback must take exactly one argument, which will be a
90
 
        list, returned by socket.recv_multipart()."""
91
 
        # assert not self._recv_callback, "Already receiving"
92
 
        self._recv_callback = callback
93
 
        self._add_io_state(zmq.POLLIN)
 
117
    def on_recv(self, callback, copy=True):
 
118
        """Register a callback to be called when a message is ready to recv.
 
119
        There can be only one callback registered at a time, so each
 
120
        call to on_recv replaces previously registered callbacks.
 
121
        
 
122
        on_recv(None) disables recv event polling.
 
123
        
 
124
        Parameters
 
125
        ----------
 
126
        
 
127
        callback : callable
 
128
            callback must take exactly one argument, which will be a
 
129
            list, as returned by socket.recv_multipart()
 
130
            if callback is None, recv callbacks are disabled.
 
131
        copy : bool
 
132
            copy is passed directly to recv, so if copy is False,
 
133
            callback will receive Message objects. If copy is True,
 
134
            then callback will receive bytes/str objects.
 
135
        
 
136
        Returns : None
 
137
        """
 
138
        
 
139
        assert callback is None or callable(callback)
 
140
        self._recv_callback = stack_context.wrap(callback)
 
141
        self._recv_copy = copy
 
142
        if callback is None:
 
143
            self._drop_io_state(zmq.POLLIN)
 
144
        else:
 
145
            self._add_io_state(zmq.POLLIN)
94
146
    
95
147
    def on_send(self, callback):
96
 
        """register a callback to be called on each send
97
 
        with no arguments (?)
 
148
        """Register a callback to be called on each send
 
149
        There will be two arguments: the message being sent (always a list), 
 
150
        and the return result of socket.send_multipart(msg).
 
151
        
 
152
        Non-copying sends return a MessageTracker object whose
 
153
        `done` attribute will be True when the send is complete. 
 
154
        This allows users to track when an object is safe to write to
 
155
        again.
 
156
        
 
157
        The second argument will always be None if copy=True
 
158
        on the send.
 
159
        
 
160
        on_send(None) disables recv event polling.
 
161
        
 
162
        Parameters
 
163
        ----------
 
164
        
 
165
        callback : callable
 
166
            callback must take exactly two arguments, which will be
 
167
            There will be two arguments: the message being sent (always a list), 
 
168
            and the return result of socket.send_multipart(msg) - 
 
169
            MessageTracker or None.
 
170
            
 
171
            if callback is None, send callbacks are disabled.
98
172
        """
99
 
        self._add_io_state(zmq.POLLOUT)
100
 
        self._send_callback = callback
 
173
        self._send_callback = stack_context.wrap(callback)
101
174
        
102
175
    def on_err(self, callback):
103
 
        """register a callback to be called on each send
104
 
        with no arguments (?)
 
176
        """register a callback to be called on POLLERR events
 
177
        with no arguments.
 
178
        
 
179
        Parameters
 
180
        ----------
 
181
        
 
182
        callback : callable
 
183
            callback will be passed no arguments.
105
184
        """
106
 
        # self._add_io_state(zmq.POLLOUT)
107
 
        self._errback = callback
 
185
        self._errback = stack_context.wrap(callback)
108
186
        
109
187
                
110
 
    def send(self, msg, callback=None):
111
 
        """send a message, optionally also register
 
188
    def send(self, msg, flags=0, copy=False, callback=None):
 
189
        """Send a message, optionally also register a new callback for sends.
 
190
        See zmq.socket.send for details.
112
191
        """
113
 
        return self.send_multipart([msg], callback=callback)
 
192
        return self.send_multipart([msg], flags=flags, copy=copy, callback=callback)
114
193
 
115
 
    def send_multipart(self, msg, callback=None):
116
 
        """send a multipart message
 
194
    def send_multipart(self, msg, flags=0, copy=False, callback=None):
 
195
        """Send a multipart message, optionally also register a new callback for sends.
 
196
        See zmq.socket.send_multipart for details.
117
197
        """
118
198
        # self._check_closed()
119
 
        self._tosend = msg
 
199
        self._send_queue.put((msg, flags, copy))
120
200
        callback = callback or self._send_callback
121
201
        if callback is not None:
122
202
            self.on_send(callback)
123
203
        else:
124
 
            self.on_send(lambda : None)
 
204
            # noop callback
 
205
            self.on_send(lambda *args: None)
 
206
        self._add_io_state(zmq.POLLOUT)
 
207
    
 
208
    def send_unicode(self, u, flags=0, encoding='utf-8', callback=None):
 
209
        """Send a unicode message with an encoding.
 
210
        See zmq.socket.send_unicode for details.
 
211
        """
 
212
        if not isinstance(u, basestring):
 
213
            raise TypeError("unicode/str objects only")
 
214
        return self.send(u.encode(encoding), flags=flags, callback=callback)
 
215
 
 
216
    def send_json(self, obj, flags=0, callback=None):
 
217
        """Send json-serialized version of an object.
 
218
        See zmq.socket.send_json for details.
 
219
        """
 
220
        if jsonapi is None:
 
221
            raise ImportError('jsonlib{1,2}, json or simplejson library is required.')
 
222
        else:
 
223
            msg = jsonapi.dumps(obj)
 
224
            return self.send(msg, flags=flags, callback=callback)
 
225
 
 
226
    def send_pyobj(self, obj, flags=0, protocol=-1, callback=None):
 
227
        """Send a Python object as a message using pickle to serialize.
 
228
 
 
229
        See zmq.socket.send_json for details.
 
230
        """
 
231
        msg = pickle.dumps(obj, protocol)
 
232
        return self.send(msg, flags, callback=callback)
 
233
    
 
234
    def _finish_flush(self):
 
235
        """callback for unsetting _flushed flag."""
 
236
        self._flushed = False
 
237
    
 
238
    def flush(self, flag=zmq.POLLIN|zmq.POLLOUT, limit=None):
 
239
        """Flush pending messages.
 
240
 
 
241
        This method safely handles all pending incoming and/or outgoing messages,
 
242
        bypassing the inner loop, passing them to the registered callbacks.
 
243
 
 
244
        A limit can be specified, to prevent blocking under high load.
 
245
 
 
246
        :func:`flush` will return the first time ANY of these conditions are met:
 
247
            * No more events matching the flag are pending.
 
248
            * the total number of events handled reaches the limit.
 
249
 
 
250
        Note that if flag|POLLIN, recv events will be flushed even if no callback
 
251
        is registered, unlike normal IOLoop operation. This allows flush to be
 
252
        used to remove *and ignore* incoming messages.
 
253
 
 
254
        Parameters
 
255
        ----------
 
256
        flag : int, default=POLLIN|POLLOUT
 
257
                0MQ poll flags.
 
258
                If flag|POLLIN,  recv events will be flushed.
 
259
                If flag|POLLOUT, send events will be flushed.
 
260
                Both flags can be set at once, which is the default.
 
261
        limit : None or int, optional
 
262
                The maximum number of messages to send or receive.
 
263
                Both send and recv count against this limit.
 
264
 
 
265
        Returns
 
266
        -------
 
267
        int : count of events handled (both send and recv)
 
268
        """
 
269
        # unset self._flushed, so callbacks will execute, in case flush has
 
270
        # already been called this iteration
 
271
        already_flushed = self._flushed
 
272
        self._flushed = False
 
273
        # initialize counters
 
274
        count = 0
 
275
        def update_flag():
 
276
            return flag & zmq.POLLIN | (self.sending() and flag & zmq.POLLOUT)
 
277
        
 
278
        self.poller.register(self.socket, update_flag())
 
279
        events = self.poller.poll(0)
 
280
        while events and (not limit or count < limit):
 
281
            s,event = events[0]
 
282
            if event & zmq.POLLIN: # receiving
 
283
                self._handle_recv()
 
284
                count += 1
 
285
            if event & zmq.POLLOUT and self.sending():
 
286
                self._handle_send()
 
287
                count += 1
 
288
            self.poller.register(self.socket, update_flag())
 
289
            
 
290
            events = self.poller.poll(0)
 
291
        if count: # only bypass loop if we actually flushed something
 
292
            # skip send/recv callbacks this iteration
 
293
            self._flushed = True
 
294
            # reregister them at the end of the loop
 
295
            if not already_flushed: # don't need to do it again
 
296
                dc = ioloop.DelayedCallback(self._finish_flush, 0, self.io_loop)
 
297
                dc.start()
 
298
        elif already_flushed:
 
299
            self._flushed = True
 
300
        return count
125
301
    
126
302
    def set_close_callback(self, callback):
127
303
        """Call the given callback when the stream is closed."""
128
 
        self._close_callback = callback
129
 
 
 
304
        self._close_callback = stack_context.wrap(callback)
 
305
    
130
306
    def close(self):
131
307
        """Close this stream."""
132
308
        if self.socket is not None:
133
309
            self.io_loop.remove_handler(self.socket)
134
 
            self.socket.close()
 
310
            dc = ioloop.DelayedCallback(self.socket.close, 100, self.io_loop)
 
311
            dc.start()
135
312
            self.socket = None
136
313
            if self._close_callback:
137
314
                self._run_callback(self._close_callback)
138
315
 
139
316
    def receiving(self):
140
 
        """Returns true if we are currently receiving from the stream."""
 
317
        """Returns True if we are currently receiving from the stream."""
141
318
        return self._recv_callback is not None
142
319
 
143
320
    def sending(self):
144
 
        """Returns true if we are currently sending to the stream."""
145
 
        return self._tosend is not None
 
321
        """Returns True if we are currently sending to the stream."""
 
322
        return not self._send_queue.empty()
146
323
 
147
324
    def closed(self):
148
325
        return self.socket is None
149
326
 
150
327
    def _run_callback(self, callback, *args, **kwargs):
 
328
        """Wrap running callbacks in try/except to allow us to
 
329
        close our socket."""
151
330
        try:
152
 
            callback(*args, **kwargs)
 
331
            # Use a NullContext to ensure that all StackContexts are run
 
332
            # inside our blanket exception handler rather than outside.
 
333
            with stack_context.NullContext():
 
334
                callback(*args, **kwargs)
153
335
        except:
 
336
            logging.error("Uncaught exception, closing connection.",
 
337
                          exc_info=True)
154
338
            # Close the socket on an uncaught exception from a user callback
155
339
            # (It would eventually get closed when the socket object is
156
340
            # gc'd, but we don't want to rely on gc happening before we
167
351
        if not self.socket:
168
352
            logging.warning("Got events for closed stream %s", fd)
169
353
            return
170
 
        if events & zmq.POLLIN:
171
 
            self._handle_recv()
172
 
        if not self.socket:
173
 
            return
174
 
        if events & zmq.POLLOUT:
175
 
            self._handle_send()
176
 
        if not self.socket:
177
 
            return
178
 
        if events & zmq.POLLERR:
179
 
            self._handle_error()
180
 
            return
181
 
        state = zmq.POLLERR
182
 
        if self.receiving():
183
 
            state |= zmq.POLLIN
184
 
        if self._tosend is not None:
185
 
            state |= zmq.POLLOUT
186
 
        if state != self._state:
187
 
            self._state = state
188
 
            self.io_loop.update_handler(self.socket, self._state)
 
354
        try:
 
355
            # dispatch events:
 
356
            if events & zmq.POLLERR:
 
357
                self._handle_error()
 
358
                return
 
359
            if events & zmq.POLLIN:
 
360
                self._handle_recv()
 
361
                if not self.socket:
 
362
                    return
 
363
            if events & zmq.POLLOUT:
 
364
                self._handle_send()
 
365
                if not self.socket:
 
366
                    return
 
367
 
 
368
            # rebuild the poll state
 
369
            state = zmq.POLLERR
 
370
            if self.receiving():
 
371
                state |= zmq.POLLIN
 
372
            if self.sending():
 
373
                state |= zmq.POLLOUT
 
374
            if state != self._state:
 
375
                self._state = state
 
376
                self.io_loop.update_handler(self.socket, self._state)
 
377
        except:
 
378
            logging.error("Uncaught exception, closing connection.",
 
379
                          exc_info=True)
 
380
            self.close()
 
381
            raise
189
382
            
190
383
    def _handle_recv(self):
191
 
        # print "handling recv"
 
384
        """Handle a recv event."""
 
385
        if self._flushed:
 
386
            return
192
387
        try:
193
 
            msg = self.socket.recv_multipart()
 
388
            msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
194
389
        except zmq.ZMQError:
195
 
            logging.warning("RECV Error")
 
390
            e = sys.exc_info()[1]
 
391
            if e.errno == zmq.EAGAIN:
 
392
                # state changed since poll event
 
393
                pass
 
394
            else:
 
395
                logging.error("RECV Error: %s"%zmq.strerror(e.errno))
196
396
        else:
197
397
            if self._recv_callback:
198
398
                callback = self._recv_callback
203
403
        
204
404
 
205
405
    def _handle_send(self):
206
 
        # print "handling send"
207
 
        if not self._tosend:
208
 
            return
209
 
        self.socket.send_multipart(self._tosend)
210
 
        self._tosend = None
 
406
        """Handle a send event."""
 
407
        if self._flushed:
 
408
            return
 
409
        if not self.sending():
 
410
            logging.error("Shouldn't have handled a send event")
 
411
            return
 
412
        
 
413
        msg = self._send_queue.get()
 
414
        try:
 
415
            status = self.socket.send_multipart(*msg)
 
416
        except zmq.ZMQError:
 
417
            e = sys.exc_info()[1]
 
418
            status = e
211
419
        if self._send_callback:
212
420
            callback = self._send_callback
213
 
            self._run_callback(callback)
214
 
        
215
 
        # unregister from event loop:
216
 
        self._drop_io_state(zmq.POLLOUT)
 
421
            self._run_callback(callback, msg, status)
217
422
        
218
423
        # self.update_state()
219
424
    
220
425
    def _handle_error(self):
221
 
        # if evt & zmq.POLLERR:
222
 
        logging.warning("handling error..")
 
426
        """Handle a POLLERR event."""
 
427
        logging.error("handling error..")
223
428
        if self._errback is not None:
224
429
            self._errback()
225
430
        else:
226
431
            raise zmq.ZMQError()
227
432
 
228
 
 
229
 
        # raise zmq.ZMQError()
230
 
 
231
433
    def _check_closed(self):
232
434
        if not self.socket:
233
435
            raise IOError("Stream is closed")
234
436
 
235
437
    def _add_io_state(self, state):
 
438
        """Add io_state to poller."""
236
439
        if not self._state & state:
237
440
            self._state = self._state | state
238
441
            self.io_loop.update_handler(self.socket, self._state)
239
442
    
240
443
    def _drop_io_state(self, state):
 
444
        """Stop poller from watching an io_state."""
241
445
        if self._state & state:
242
446
            self._state = self._state & (~state)
243
447
            self.io_loop.update_handler(self.socket, self._state)