~ubuntu-branches/ubuntu/wily/pyzmq/wily

« back to all changes in this revision

Viewing changes to zmq/core/message.pyx

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-24 19:23:15 UTC
  • mfrom: (1.2.1) (9 sid)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20130224192315-qhmwp3m3ymk8r60d
Tags: 2.2.0.1-1
* New upstream release
* relicense debian packaging to LGPL-3
* update watch file to use github directly
  thanks to Bart Martens for the file
* add autopkgtests
* drop obsolete DM-Upload-Allowed
* bump standard to 3.9.4, no changes required

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
"""0MQ Message related classes."""
2
2
 
3
3
#
4
 
#    Copyright (c) 2010 Brian E. Granger
 
4
#    Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley
5
5
#
6
6
#    This file is part of pyzmq.
7
7
#
27
27
cdef extern from "pyversion_compat.h":
28
28
    pass
29
29
 
30
 
from cpython cimport PyBytes_FromStringAndSize
31
30
from cpython cimport Py_DECREF, Py_INCREF
32
31
 
33
 
from buffers cimport asbuffer_r, frombuffer_r, viewfromobject_r
 
32
from buffers cimport asbuffer_r, viewfromobject_r
34
33
 
35
34
cdef extern from "Python.h":
36
35
    ctypedef int Py_ssize_t
37
36
 
38
 
from czmq cimport *
 
37
from libzmq cimport *
39
38
 
40
39
import time
41
40
 
42
 
try:    # 3.x
43
 
    from queue import Queue, Empty
44
 
except: # 2.x
45
 
    from Queue import Queue, Empty
 
41
try:
 
42
    # below 3.3
 
43
    from threading import _Event as Event
 
44
except (ImportError, AttributeError):
 
45
    # python throws ImportError, cython throws AttributeError
 
46
    from threading import Event
46
47
 
47
48
from zmq.core.error import ZMQError, NotDone
48
49
from zmq.utils.strtypes import bytes,unicode,basestring
55
56
cdef void free_python_msg(void *data, void *hint) with gil:
56
57
    """A function for DECREF'ing Python based messages."""
57
58
    if hint != NULL:
58
 
        tracker_queue = (<tuple>hint)[1]
 
59
        tracker_event = (<tuple>hint)[1]
59
60
        Py_DECREF(<object>hint)
60
 
        if isinstance(tracker_queue, Queue):
 
61
        if isinstance(tracker_event, Event):
61
62
            # don't assert before DECREF:
62
63
            # assert tracker_queue.empty(), "somebody else wrote to my Queue!"
63
 
            tracker_queue.put(0)
64
 
        tracker_queue = None
65
 
 
66
 
cdef inline object copy_zmq_msg_bytes(zmq_msg_t *zmq_msg):
67
 
    """ Copy the data from a zmq_msg_t """
68
 
    cdef char *data_c = NULL
69
 
    cdef Py_ssize_t data_len_c
70
 
    with nogil:
71
 
        data_c = <char *>zmq_msg_data(zmq_msg)
72
 
        data_len_c = zmq_msg_size(zmq_msg)
73
 
    return PyBytes_FromStringAndSize(data_c, data_len_c)
74
 
 
 
64
            tracker_event.set()
 
65
        tracker_event = None
75
66
 
76
67
cdef class MessageTracker(object):
77
68
    """MessageTracker(*towatch)
78
69
 
79
70
    A class for tracking if 0MQ is done using one or more messages.
80
71
 
81
 
    When you send a 0MQ mesage, it is not sent immeidately. The 0MQ IO thread
82
 
    send the message at some later time. Often you want to know when 0MQ has
 
72
    When you send a 0MQ mesage, it is not sent immediately. The 0MQ IO thread
 
73
    sends the message at some later time. Often you want to know when 0MQ has
83
74
    actually sent the message though. This is complicated by the fact that
84
 
    a single 0MQ message can be sent multiple times using differen sockets.
 
75
    a single 0MQ message can be sent multiple times using different sockets.
85
76
    This class allows you to track all of the 0MQ usages of a message.
86
77
 
87
78
    Parameters
88
79
    ----------
89
 
    *towatch : tuple of Queue, MessageTracker, Message instances.
 
80
    *towatch : tuple of Event, MessageTracker, Message instances.
90
81
        This list of objects to track. This class can track the low-level
91
 
        Queues used by the Message class, other MessageTrackers or
92
 
        actual Messsages.
 
82
        Events used by the Message class, other MessageTrackers or
 
83
        actual Messages.
93
84
    """
94
85
 
95
86
    def __init__(self, *towatch):
99
90
 
100
91
        Parameters
101
92
        ----------
102
 
        *towatch : tuple of Queue, MessageTracker, Message instances.
 
93
        *towatch : tuple of Event, MessageTracker, Message instances.
103
94
            This list of objects to track. This class can track the low-level
104
 
            Queues used by the Message class, other MessageTrackers or 
105
 
            actual Messsages.
 
95
            Events used by the Message class, other MessageTrackers or 
 
96
            actual Messages.
106
97
        """
107
 
        self.queues = set()
 
98
        self.events = set()
108
99
        self.peers = set()
109
100
        for obj in towatch:
110
 
            if isinstance(obj, Queue):
111
 
                self.queues.add(obj)
 
101
            if isinstance(obj, Event):
 
102
                self.events.add(obj)
112
103
            elif isinstance(obj, MessageTracker):
113
104
                self.peers.add(obj)
114
 
            elif isinstance(obj, Message):
 
105
            elif isinstance(obj, Frame):
115
106
                if not obj.tracker:
116
107
                    raise ValueError("Not a tracked message")
117
108
                self.peers.add(obj.tracker)
118
109
            else:
119
 
                raise TypeError("Require Queues or Messages, not %s"%type(obj))
 
110
                raise TypeError("Require Events or Message Frames, not %s"%type(obj))
120
111
    
121
112
    @property
122
113
    def done(self):
123
 
        """Is 0MQ completely done with the messages being tracked."""
124
 
        for queue in self.queues:
125
 
            if queue.empty():
 
114
        """Is 0MQ completely done with the message(s) being tracked?"""
 
115
        for evt in self.events:
 
116
            if not evt.is_set():
126
117
                return False
127
118
        for pm in self.peers:
128
119
            if not pm.done:
132
123
    def wait(self, timeout=-1):
133
124
        """mt.wait(timeout=-1)
134
125
 
135
 
        Wait until 0MQ is completely done with the messages, then return.
 
126
        Wait for 0MQ to be done with the message or until `timeout`.
136
127
 
137
128
        Parameters
138
129
        ----------
139
 
        timeout : int
 
130
        timeout : float [default: -1, wait forever]
140
131
            Maximum time in (s) to wait before raising NotDone.
141
132
 
142
133
        Returns
143
134
        -------
144
 
        Raises NotDone if `timeout` reached before I am done.
 
135
        None
 
136
            if done before `timeout`
 
137
        
 
138
        Raises
 
139
        ------
 
140
        NotDone
 
141
            if `timeout` reached before I am done.
145
142
        """
146
143
        tic = time.time()
147
144
        if timeout is False or timeout < 0:
149
146
        else:
150
147
            remaining = timeout
151
148
        done = False
152
 
        try:
153
 
            for queue in self.queues:
154
 
                if remaining < 0:
155
 
                    raise NotDone
156
 
                queue.get(timeout=remaining)
157
 
                queue.put(0)
158
 
                toc = time.time()
159
 
                remaining -= (toc-tic)
160
 
                tic = toc
161
 
        except Empty:
162
 
            raise NotDone
 
149
        for evt in self.events:
 
150
            if remaining < 0:
 
151
                raise NotDone
 
152
            evt.wait(timeout=remaining)
 
153
            if not evt.is_set():
 
154
                raise NotDone
 
155
            toc = time.time()
 
156
            remaining -= (toc-tic)
 
157
            tic = toc
163
158
        
164
159
        for peer in self.peers:
165
160
            if remaining < 0:
175
170
            time.sleep(.001)
176
171
 
177
172
 
178
 
cdef class Message:
179
 
    """Message(data=None, track=False)
 
173
cdef class Frame:
 
174
    """Frame(data=None, track=False)
180
175
 
181
 
    A Message class for non-copy send/recvs.
 
176
    A zmq message Frame class for non-copy send/recvs.
182
177
 
183
178
    This class is only needed if you want to do non-copying send and recvs.
184
 
    When you pass a string to this class, like ``Message(s)``, the 
185
 
    ref-count of s is increased by two: once because Message saves s as 
 
179
    When you pass a string to this class, like ``Frame(s)``, the 
 
180
    ref-count of `s` is increased by two: once because the Frame saves `s` as 
186
181
    an instance attribute and another because a ZMQ message is created that
187
 
    points to the buffer of s. This second ref-count increase makes sure
188
 
    that s lives until all messages that use it have been sent. Once 0MQ
 
182
    points to the buffer of `s`. This second ref-count increase makes sure
 
183
    that `s` lives until all messages that use it have been sent. Once 0MQ
189
184
    sends all the messages and it doesn't need the buffer of s, 0MQ will call
190
 
    Py_DECREF(s).
 
185
    ``Py_DECREF(s)``.
191
186
 
192
187
    Parameters
193
188
    ----------
195
190
    data : object, optional
196
191
        any object that provides the buffer interface will be used to
197
192
        construct the 0MQ message data.
 
193
    track : bool [default: False]
 
194
        whether a MessageTracker_ should be created to track this object.
 
195
        Tracking a message has a cost at creation, because it creates a threadsafe
 
196
        Event object.
 
197
    
198
198
    """
199
199
 
200
200
    def __cinit__(self, object data=None, track=False, **kwargs):
203
203
        cdef Py_ssize_t data_len_c=0
204
204
        cdef object hint
205
205
 
 
206
        # init more as False
 
207
        self.more = False
 
208
 
206
209
        # Save the data object in case the user wants the the data as a str.
207
210
        self._data = data
208
211
        self._failed_init = True  # bool switch for dealloc
209
212
        self._buffer = None       # buffer view of data
210
213
        self._bytes = None        # bytes copy of data
211
214
 
212
 
        # Queue and MessageTracker for monitoring when zmq is done with data:
 
215
        # Event and MessageTracker for monitoring when zmq is done with data:
213
216
        if track:
214
 
            self.tracker_queue = Queue()
215
 
            self.tracker = MessageTracker(self.tracker_queue)
 
217
            evt = Event()
 
218
            self.tracker_event = evt
 
219
            self.tracker = MessageTracker(evt)
216
220
        else:
217
 
            self.tracker_queue = None
 
221
            self.tracker_event = None
218
222
            self.tracker = None
219
223
 
220
224
        if isinstance(data, unicode):
230
234
        else:
231
235
            asbuffer_r(data, <void **>&data_c, &data_len_c)
232
236
        # We INCREF the *original* Python object (not self) and pass it
233
 
        # as the hint below. This allows other copies of this Message
 
237
        # as the hint below. This allows other copies of this Frame
234
238
        # object to take over the ref counting of data properly.
235
 
        hint = (data, self.tracker_queue)
 
239
        hint = (data, self.tracker_event)
236
240
        Py_INCREF(hint)
237
241
        with nogil:
238
242
            rc = zmq_msg_init_data(
253
257
        if self._failed_init:
254
258
            return
255
259
        # This simply decreases the 0MQ ref-count of zmq_msg.
256
 
        rc = zmq_msg_close(&self.zmq_msg)
 
260
        with nogil:
 
261
            rc = zmq_msg_close(&self.zmq_msg)
257
262
        if rc != 0:
258
263
            raise ZMQError()
259
 
 
 
264
    
 
265
    # buffer interface code adapted from petsc4py by Lisandro Dalcin, a BSD project
 
266
    
 
267
    def __getbuffer__(self, Py_buffer* buffer, int flags):
 
268
        # new-style (memoryview) buffer interface
 
269
        with nogil:
 
270
            buffer.buf = zmq_msg_data(&self.zmq_msg)
 
271
            buffer.len = zmq_msg_size(&self.zmq_msg)
 
272
        
 
273
        buffer.obj = self
 
274
        buffer.readonly = 1
 
275
        buffer.format = "B"
 
276
        buffer.ndim = 0
 
277
        buffer.shape = NULL
 
278
        buffer.strides = NULL
 
279
        buffer.suboffsets = NULL
 
280
        buffer.itemsize = 1
 
281
        buffer.internal = NULL
 
282
    
 
283
    def __getsegcount__(self, Py_ssize_t *lenp):
 
284
        # required for getreadbuffer
 
285
        if lenp != NULL:
 
286
            with nogil:
 
287
                lenp[0] = zmq_msg_size(&self.zmq_msg)
 
288
        return 1
 
289
    
 
290
    def __getreadbuffer__(self, Py_ssize_t idx, void **p):
 
291
        # old-style (buffer) interface
 
292
        cdef char *data_c = NULL
 
293
        cdef Py_ssize_t data_len_c
 
294
        if idx != 0:
 
295
            raise SystemError("accessing non-existent buffer segment")
 
296
        # read-only, because we don't want to allow
 
297
        # editing of the message in-place
 
298
        with nogil:
 
299
            data_c = <char *>zmq_msg_data(&self.zmq_msg)
 
300
            data_len_c = zmq_msg_size(&self.zmq_msg)
 
301
        if p != NULL:
 
302
            p[0] = <void*>data_c
 
303
        return data_len_c
 
304
    
 
305
    # end buffer interface
 
306
    
260
307
    def __copy__(self):
261
308
        """Create a shallow copy of the message.
262
309
 
263
 
        This does not copy the contents of the Message, just the pointer.
 
310
        This does not copy the contents of the Frame, just the pointer.
264
311
        This will increment the 0MQ ref count of the message, but not
265
312
        the ref count of the Python object. That is only done once when
266
313
        the Python is first turned into a 0MQ message.
267
314
        """
268
315
        return self.fast_copy()
269
316
 
270
 
    cdef Message fast_copy(self):
271
 
        """Fast, cdef'd version of shallow copy of the message."""
272
 
        cdef Message new_msg
273
 
        new_msg = Message()
 
317
    cdef Frame fast_copy(self):
 
318
        """Fast, cdef'd version of shallow copy of the Frame."""
 
319
        cdef Frame new_msg
 
320
        new_msg = Frame()
274
321
        # This does not copy the contents, but just increases the ref-count 
275
322
        # of the zmq_msg by one.
276
 
        zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg)
 
323
        with nogil:
 
324
            zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg)
277
325
        # Copy the ref to data so the copy won't create a copy when str is
278
326
        # called.
279
327
        if self._data is not None:
283
331
        if self._bytes is not None:
284
332
            new_msg._bytes = self._bytes
285
333
 
286
 
        # Message copies share the tracker and tracker_queue
287
 
        new_msg.tracker_queue = self.tracker_queue
 
334
        # Frame copies share the tracker and tracker_event
 
335
        new_msg.tracker_event = self.tracker_event
288
336
        new_msg.tracker = self.tracker
289
337
 
290
338
        return new_msg
291
339
 
292
340
    def __len__(self):
293
341
        """Return the length of the message in bytes."""
294
 
        return <int>zmq_msg_size(&self.zmq_msg)
 
342
        cdef int sz
 
343
        with nogil:
 
344
            sz = zmq_msg_size(&self.zmq_msg)
 
345
        return sz
 
346
        # return <int>zmq_msg_size(&self.zmq_msg)
295
347
 
296
348
    def __str__(self):
297
349
        """Return the str form of the message."""
303
355
            return b.decode()
304
356
        else:
305
357
            return b
306
 
    
307
 
    @property
308
 
    def done(self):
309
 
        """Is 0MQ completely done with the message?"""
310
 
        if not self.tracker:
311
 
            raise ValueError("Not a tracked message")
312
 
        return self.tracker.done
313
 
    
314
 
    def wait(self, timeout=-1):
315
 
        """m.wait(timeout=-1)
316
 
 
317
 
        Wait for 0MQ to be done with the message, or until timeout.
318
 
        
319
 
        Parameters
320
 
        ----------
321
 
        timeout : int
322
 
            Maximum time in (s) to wait before raising NotDone.
323
 
        
324
 
        Raises NotDone if ``timeout`` reached before I am done.
325
 
        """
326
 
        if not self.tracker:
327
 
            raise ValueError("Not a tracked message")
328
 
        return self.tracker.wait(timeout=timeout)
329
 
 
330
 
    
331
 
    cdef object _getbuffer(self):
 
358
 
 
359
    cdef inline object _getbuffer(self):
332
360
        """Create a Python buffer/view of the message data.
333
361
 
334
 
        This will be called only once, the first time the ``buffer`` property
 
362
        This will be called only once, the first time the `buffer` property
335
363
        is accessed. Subsequent calls use a cached copy.
336
364
        """
337
 
        cdef char *data_c = NULL
338
 
        cdef Py_ssize_t data_len_c
339
 
        # read-only, because we don't want to allow
340
 
        # editing of the message in-place
341
365
        if self._data is None:
342
 
            # return buffer on input object, to preserve refcounting
343
 
            data_c = <char *>zmq_msg_data(&self.zmq_msg)
344
 
            data_len_c = zmq_msg_size(&self.zmq_msg)
345
 
            return frombuffer_r(data_c, data_len_c)
 
366
            return viewfromobject_r(self)
346
367
        else:
347
368
            return viewfromobject_r(self._data)
348
369
    
365
386
            self._bytes = copy_zmq_msg_bytes(&self.zmq_msg)
366
387
        return self._bytes
367
388
 
 
389
# legacy Message name
 
390
Message = Frame
368
391
 
369
 
__all__ = ['MessageTracker', 'Message']
 
392
__all__ = ['MessageTracker', 'Frame', 'Message']