~ubuntu-branches/ubuntu/raring/pyzmq/raring-proposed

« back to all changes in this revision

Viewing changes to zmq/eventloop/zmqstream.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor, Julian Taylor, Bernd Zeimetz
  • Date: 2012-05-20 17:41:34 UTC
  • mfrom: (7.1.2 sid)
  • Revision ID: package-import@ubuntu.com-20120520174134-8e5i7svrebr0iaia
Tags: 2.2.0-1
[ Julian Taylor ]
* New upstream release
* update debian/copyright for relicensing of non core files to BSD-3-clause
* bump standard to 3.9.3, no changes required
* build with hardening flags

[ Bernd Zeimetz ]
* Set DM-Upload-Allowed: yes

Show diffs side-by-side

added added

removed removed

Lines of Context:
48
48
    
49
49
    Methods:
50
50
    
51
 
    * **on_recv(callback,copy=True):**
 
51
    * **on_recv(callback, copy=True):**
52
52
        register a callback to be run every time the socket has something to receive
53
53
    * **on_send(callback):**
54
54
        register a callback to be run every time you call send
55
 
    * **on_err(callback):**
56
 
        register a callback to be run every time there is an error
57
55
    * **send(self, msg, flags=0, copy=False, callback=None):**
58
56
        perform a send that will trigger the callback
59
 
        if callback is passed, on_send is also called
 
57
        if callback is passed, on_send is also called.
60
58
        
61
59
        There are also send_multipart(), send_json(), send_pyobj()
62
60
    
66
64
        turn off the recv callback
67
65
    * **stop_on_send():**
68
66
        turn off the send callback
69
 
    * **stop_on_err():**
70
 
        turn off the error callback
71
67
    
72
 
    All of which simply call ``on_<evt>(None)``.
 
68
    which simply call ``on_<evt>(None)``.
73
69
    
74
70
    The entire socket interface, excluding direct recv methods, is also
75
71
    provided, primarily through direct-linking the methods.
93
89
        self._recv_callback = None
94
90
        self._send_callback = None
95
91
        self._close_callback = None
96
 
        self._errback = None
97
92
        self._recv_copy = False
98
93
        self._flushed = False
99
94
        
121
116
        return self.on_send(None)
122
117
    
123
118
    def stop_on_err(self):
124
 
        """Disable callback on errors."""
125
 
        return self.on_err(None)
 
119
        """DEPRECATED, does nothing"""
 
120
        logging.warn("on_err does nothing, and will be removed")
 
121
    
 
122
    def on_err(self, callback):
 
123
        """DEPRECATED, does nothing"""
 
124
        logging.warn("on_err does nothing, and will be removed")
126
125
    
127
126
    def on_recv(self, callback, copy=True):
128
 
        """Register a callback to be called when a message is ready to recv.
 
127
        """Register a callback for when a message is ready to recv.
 
128
        
129
129
        There can be only one callback registered at a time, so each
130
 
        call to on_recv replaces previously registered callbacks.
 
130
        call to `on_recv` replaces previously registered callbacks.
131
131
        
132
132
        on_recv(None) disables recv event polling.
133
133
        
 
134
        Use on_recv_stream(callback) instead, to register a callback that will receive
 
135
        both this ZMQStream and the message, instead of just the message.
 
136
        
134
137
        Parameters
135
138
        ----------
136
139
        
145
148
        
146
149
        Returns : None
147
150
        """
 
151
        
148
152
        self._check_closed()
149
153
        assert callback is None or callable(callback)
150
154
        self._recv_callback = stack_context.wrap(callback)
154
158
        else:
155
159
            self._add_io_state(self.io_loop.READ)
156
160
    
 
161
    def on_recv_stream(self, callback, copy=True):
 
162
        """Same as on_recv, but callback will get this stream as first argument
 
163
        
 
164
        callback must take exactly two arguments, as it will be called as::
 
165
        
 
166
            callback(stream, msg)
 
167
        
 
168
        Useful when a single callback should be used with multiple streams.
 
169
        """
 
170
        if callback is None:
 
171
            self.stop_on_recv()
 
172
        else:
 
173
            self.on_recv(lambda msg: callback(self, msg), copy=copy)
 
174
    
157
175
    def on_send(self, callback):
158
176
        """Register a callback to be called on each send
159
 
        There will be two arguments: the message being sent (always a list), 
160
 
        and the return result of socket.send_multipart(msg).
 
177
        
 
178
        There will be two arguments::
 
179
        
 
180
            callback(msg, status)
 
181
        
 
182
        * `msg` will be the list of sendable objects that was just sent
 
183
        * `status` will be the return result of socket.send_multipart(msg) -
 
184
          MessageTracker or None.
161
185
        
162
186
        Non-copying sends return a MessageTracker object whose
163
 
        `done` attribute will be True when the send is complete. 
 
187
        `done` attribute will be True when the send is complete.
164
188
        This allows users to track when an object is safe to write to
165
189
        again.
166
190
        
167
191
        The second argument will always be None if copy=True
168
192
        on the send.
169
193
        
 
194
        Use on_send_stream(callback) to register a callback that will be passed
 
195
        this ZMQStream as the first argument, in addition to the other two.
 
196
        
170
197
        on_send(None) disables recv event polling.
171
198
        
172
199
        Parameters
174
201
        
175
202
        callback : callable
176
203
            callback must take exactly two arguments, which will be
177
 
            There will be two arguments: the message being sent (always a list), 
178
 
            and the return result of socket.send_multipart(msg) - 
 
204
            the message being sent (always a list),
 
205
            and the return result of socket.send_multipart(msg) -
179
206
            MessageTracker or None.
180
207
            
181
208
            if callback is None, send callbacks are disabled.
182
209
        """
 
210
        
183
211
        self._check_closed()
184
212
        assert callback is None or callable(callback)
185
213
        self._send_callback = stack_context.wrap(callback)
186
214
        
187
 
    def on_err(self, callback):
188
 
        """register a callback to be called on POLLERR events
189
 
        with no arguments.
190
 
        
191
 
        Parameters
192
 
        ----------
193
 
        
194
 
        callback : callable
195
 
            callback will be passed no arguments.
 
215
    
 
216
    def on_send_stream(self, callback):
 
217
        """Same as on_send, but callback will get this stream as first argument
 
218
        
 
219
        Callback will be passed three arguments::
 
220
        
 
221
            callback(stream, msg, status)
 
222
        
 
223
        Useful when a single callback should be used with multiple streams.
196
224
        """
197
 
        self._check_closed()
198
 
        assert callback is None or callable(callback)
199
 
        self._errback = stack_context.wrap(callback)
200
 
        
201
 
                
202
 
    def send(self, msg, flags=0, copy=False, track=False, callback=None):
 
225
        if callback is None:
 
226
            self.stop_on_send()
 
227
        else:
 
228
            self.on_send(lambda msg, status: callback(stream, msg, status))
 
229
        
 
230
        
 
231
    def send(self, msg, flags=0, copy=True, track=False, callback=None):
203
232
        """Send a message, optionally also register a new callback for sends.
204
233
        See zmq.socket.send for details.
205
234
        """
206
235
        return self.send_multipart([msg], flags=flags, copy=copy, track=track, callback=callback)
207
236
 
208
 
    def send_multipart(self, msg, flags=0, copy=False, track=False, prefix=None, callback=None):
 
237
    def send_multipart(self, msg, flags=0, copy=True, track=False, callback=None):
209
238
        """Send a multipart message, optionally also register a new callback for sends.
210
239
        See zmq.socket.send_multipart for details.
211
240
        """
212
 
        kwargs = dict(flags=flags, copy=copy, track=track, prefix=prefix)
 
241
        kwargs = dict(flags=flags, copy=copy, track=track)
213
242
        self._send_queue.put((msg, kwargs))
214
243
        callback = callback or self._send_callback
215
244
        if callback is not None:
385
414
        try:
386
415
            # dispatch events:
387
416
            if events & IOLoop.ERROR:
388
 
                self._handle_error()
 
417
                logging.error("got POLLERR event on ZMQStream, which doesn't make sense")
389
418
                return
390
419
            if events & IOLoop.READ:
391
420
                self._handle_recv()
410
439
            return
411
440
        try:
412
441
            msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
413
 
        except zmq.ZMQError:
414
 
            e = sys.exc_info()[1]
 
442
        except zmq.ZMQError as e:
415
443
            if e.errno == zmq.EAGAIN:
416
444
                # state changed since poll event
417
445
                pass
437
465
        msg, kwargs = self._send_queue.get()
438
466
        try:
439
467
            status = self.socket.send_multipart(msg, **kwargs)
440
 
        except zmq.ZMQError:
441
 
            e = sys.exc_info()[1]
 
468
        except zmq.ZMQError as e:
442
469
            status = e
443
470
        if self._send_callback:
444
471
            callback = self._send_callback
446
473
        
447
474
        # self.update_state()
448
475
    
449
 
    def _handle_error(self):
450
 
        """Handle a POLLERR event."""
451
 
        logging.error("handling error..")
452
 
        if self._errback is not None:
453
 
            self._errback()
454
 
        else:
455
 
            raise zmq.ZMQError()
456
 
 
457
476
    def _check_closed(self):
458
477
        if not self.socket:
459
478
            raise IOError("Stream is closed")