~ubuntu-branches/ubuntu/saucy/pyzmq/saucy-proposed

« back to all changes in this revision

Viewing changes to zmq/eventloop/zmqstream.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-05-12 13:59:20 UTC
  • mfrom: (1.2.2)
  • Revision ID: package-import@ubuntu.com-20130512135920-1d2md9w425iq3sb6
Tags: 13.1.0-1
* New upstream release built with zeromq3 (Closes: #698830)
  - drop all patches
* workaround-gevent.patch: workaround issue with gevent < 1.0
* noncopysend-test.patch: avoid uninitialized values in tests
* update copyright

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import logging
22
22
 
23
23
import zmq
24
 
from zmq.core.socket import jsonapi, pickle
 
24
from zmq.utils import jsonapi
 
25
 
 
26
try:
 
27
    import cPickle as pickle
 
28
except ImportError:
 
29
    import pickle
25
30
 
26
31
from zmq.eventloop.ioloop import IOLoop
27
 
from zmq.eventloop import stack_context
 
32
from zmq.eventloop.minitornado import stack_context
28
33
 
29
34
try:
30
35
    from queue import Queue
38
43
except NameError:
39
44
    callable = lambda obj: hasattr(obj, '__call__')
40
45
 
41
 
def maybe_threadsafe(method):
42
 
    """decorator for wrapping a method in IOLoop.add_callback for threadsafety
43
 
    
44
 
    use ZMQStream(..., threadsafe=True) to enable.
45
 
    """
46
 
    def ts_method(self, *args, **kwargs):
47
 
        if self.threadsafe:
48
 
            return self.io_loop.add_callback(lambda : method(self, *args, **kwargs))
49
 
        else:
50
 
            return method(self, *args, **kwargs)
51
 
    
52
 
    ts_method.__doc__ = method.__doc__
53
 
    
54
 
    return ts_method
55
46
 
56
47
class ZMQStream(object):
57
48
    """A utility class to register callbacks when a zmq socket sends and receives
58
49
    
59
50
    For use with zmq.eventloop.ioloop
60
51
 
61
 
    There are 4 main methods
 
52
    There are three main methods
62
53
    
63
54
    Methods:
64
55
    
93
84
    socket = None
94
85
    io_loop = None
95
86
    poller = None
96
 
    threadsafe = False
97
87
    
98
 
    def __init__(self, socket, io_loop=None, threadsafe=False):
 
88
    def __init__(self, socket, io_loop=None):
99
89
        self.socket = socket
100
90
        self.io_loop = io_loop or IOLoop.instance()
101
91
        self.poller = zmq.Poller()
102
 
        self.threadsafe = threadsafe
103
92
        
104
93
        self._send_queue = Queue()
105
94
        self._recv_callback = None
273
262
        return self.send(u.encode(encoding), flags=flags, callback=callback)
274
263
    
275
264
    send_unicode = send_string
276
 
 
277
 
    send_unicode = send_string
278
 
 
 
265
    
279
266
    def send_json(self, obj, flags=0, callback=None):
280
267
        """Send json-serialized version of an object.
281
268
        See zmq.socket.send_json for details.
486
473
        try:
487
474
            status = self.socket.send_multipart(msg, **kwargs)
488
475
        except zmq.ZMQError as e:
 
476
            logging.error("SEND Error: %s", e)
489
477
            status = e
490
478
        if self._send_callback:
491
479
            callback = self._send_callback
522
510
            self._state = self._state & (~state)
523
511
            self._update_handler(self._state)
524
512
    
525
 
    @maybe_threadsafe
526
513
    def _update_handler(self, state):
527
 
        """update IOLoop handler with state
528
 
        
529
 
        This is the only method
530
 
        threadsafe when self.threadsafe is True
531
 
        """
 
514
        """Update IOLoop handler with state."""
532
515
        if self.socket is None:
533
516
            return
534
517
        self.io_loop.update_handler(self.socket, state)
535
518
    
536
 
    @maybe_threadsafe
537
519
    def _init_io_state(self):
538
520
        """initialize the ioloop event handler"""
539
521
        with stack_context.NullContext():