27
27
cdef extern from "pyversion_compat.h":
30
from cpython cimport PyBytes_FromStringAndSize
31
30
from cpython cimport Py_DECREF, Py_INCREF
33
from buffers cimport asbuffer_r, frombuffer_r, viewfromobject_r
32
from buffers cimport asbuffer_r, viewfromobject_r
35
34
cdef extern from "Python.h":
36
35
ctypedef int Py_ssize_t
43
from queue import Queue, Empty
45
from Queue import Queue, Empty
43
from threading import _Event as Event
44
except (ImportError, AttributeError):
45
# python throws ImportError, cython throws AttributeError
46
from threading import Event
47
48
from zmq.core.error import ZMQError, NotDone
48
49
from zmq.utils.strtypes import bytes,unicode,basestring
55
56
cdef void free_python_msg(void *data, void *hint) with gil:
56
57
"""A function for DECREF'ing Python based messages."""
58
tracker_queue = (<tuple>hint)[1]
59
tracker_event = (<tuple>hint)[1]
59
60
Py_DECREF(<object>hint)
60
if isinstance(tracker_queue, Queue):
61
if isinstance(tracker_event, Event):
61
62
# don't assert before DECREF:
62
63
# assert tracker_queue.empty(), "somebody else wrote to my Queue!"
66
cdef inline object copy_zmq_msg_bytes(zmq_msg_t *zmq_msg):
67
""" Copy the data from a zmq_msg_t """
68
cdef char *data_c = NULL
69
cdef Py_ssize_t data_len_c
71
data_c = <char *>zmq_msg_data(zmq_msg)
72
data_len_c = zmq_msg_size(zmq_msg)
73
return PyBytes_FromStringAndSize(data_c, data_len_c)
76
67
cdef class MessageTracker(object):
77
68
"""MessageTracker(*towatch)
79
70
A class for tracking if 0MQ is done using one or more messages.
81
When you send a 0MQ mesage, it is not sent immeidately. The 0MQ IO thread
82
send the message at some later time. Often you want to know when 0MQ has
72
When you send a 0MQ message, it is not sent immediately. The 0MQ IO thread
73
sends the message at some later time. Often you want to know when 0MQ has
83
74
actually sent the message though. This is complicated by the fact that
84
a single 0MQ message can be sent multiple times using differen sockets.
75
a single 0MQ message can be sent multiple times using different sockets.
85
76
This class allows you to track all of the 0MQ usages of a message.
89
*towatch : tuple of Queue, MessageTracker, Message instances.
80
*towatch : tuple of Event, MessageTracker, Message instances.
90
81
The list of objects to track. This class can track the low-level
91
Queues used by the Message class, other MessageTrackers or
82
Events used by the Message class, other MessageTrackers or
95
86
def __init__(self, *towatch):
102
*towatch : tuple of Queue, MessageTracker, Message instances.
93
*towatch : tuple of Event, MessageTracker, Message instances.
103
94
The list of objects to track. This class can track the low-level
104
Queues used by the Message class, other MessageTrackers or
95
Events used by the Message class, other MessageTrackers or
108
99
self.peers = set()
109
100
for obj in towatch:
110
if isinstance(obj, Queue):
101
if isinstance(obj, Event):
112
103
elif isinstance(obj, MessageTracker):
113
104
self.peers.add(obj)
114
elif isinstance(obj, Message):
105
elif isinstance(obj, Frame):
115
106
if not obj.tracker:
116
107
raise ValueError("Not a tracked message")
117
108
self.peers.add(obj.tracker)
119
raise TypeError("Require Queues or Messages, not %s"%type(obj))
110
raise TypeError("Require Events or Message Frames, not %s"%type(obj))
123
"""Is 0MQ completely done with the messages being tracked."""
124
for queue in self.queues:
114
"""Is 0MQ completely done with the message(s) being tracked?"""
115
for evt in self.events:
127
118
for pm in self.peers:
132
123
def wait(self, timeout=-1):
133
124
"""mt.wait(timeout=-1)
135
Wait until 0MQ is completely done with the messages, then return.
126
Wait for 0MQ to be done with the message or until `timeout`.
130
timeout : float [default: -1, wait forever]
140
131
Maximum time in (s) to wait before raising NotDone.
144
Raises NotDone if `timeout` reached before I am done.
136
if done before `timeout`
141
if `timeout` reached before I am done.
146
143
tic = time.time()
147
144
if timeout is False or timeout < 0:
179
"""Message(data=None, track=False)
174
"""Frame(data=None, track=False)
181
A Message class for non-copy send/recvs.
176
A zmq message Frame class for non-copy send/recvs.
183
178
This class is only needed if you want to do non-copying send and recvs.
184
When you pass a string to this class, like ``Message(s)``, the
185
ref-count of s is increased by two: once because Message saves s as
179
When you pass a string to this class, like ``Frame(s)``, the
180
ref-count of `s` is increased by two: once because the Frame saves `s` as
186
181
an instance attribute and another because a ZMQ message is created that
187
points to the buffer of s. This second ref-count increase makes sure
188
that s lives until all messages that use it have been sent. Once 0MQ
182
points to the buffer of `s`. This second ref-count increase makes sure
183
that `s` lives until all messages that use it have been sent. Once 0MQ
189
184
sends all the messages and it doesn't need the buffer of s, 0MQ will call
195
190
data : object, optional
196
191
any object that provides the buffer interface will be used to
197
192
construct the 0MQ message data.
193
track : bool [default: False]
194
whether a MessageTracker_ should be created to track this object.
195
Tracking a message has a cost at creation, because it creates a threadsafe
200
200
def __cinit__(self, object data=None, track=False, **kwargs):
203
203
cdef Py_ssize_t data_len_c=0
206
209
# Save the data object in case the user wants the data as a str.
207
210
self._data = data
208
211
self._failed_init = True # bool switch for dealloc
209
212
self._buffer = None # buffer view of data
210
213
self._bytes = None # bytes copy of data
212
# Queue and MessageTracker for monitoring when zmq is done with data:
215
# Event and MessageTracker for monitoring when zmq is done with data:
214
self.tracker_queue = Queue()
215
self.tracker = MessageTracker(self.tracker_queue)
218
self.tracker_event = evt
219
self.tracker = MessageTracker(evt)
217
self.tracker_queue = None
221
self.tracker_event = None
218
222
self.tracker = None
220
224
if isinstance(data, unicode):
231
235
asbuffer_r(data, <void **>&data_c, &data_len_c)
232
236
# We INCREF the *original* Python object (not self) and pass it
233
# as the hint below. This allows other copies of this Message
237
# as the hint below. This allows other copies of this Frame
234
238
# object to take over the ref counting of data properly.
235
hint = (data, self.tracker_queue)
239
hint = (data, self.tracker_event)
238
242
rc = zmq_msg_init_data(
253
257
if self._failed_init:
255
259
# This simply decreases the 0MQ ref-count of zmq_msg.
256
rc = zmq_msg_close(&self.zmq_msg)
261
rc = zmq_msg_close(&self.zmq_msg)
265
# buffer interface code adapted from petsc4py by Lisandro Dalcin, a BSD project
267
def __getbuffer__(self, Py_buffer* buffer, int flags):
268
# new-style (memoryview) buffer interface
270
buffer.buf = zmq_msg_data(&self.zmq_msg)
271
buffer.len = zmq_msg_size(&self.zmq_msg)
278
buffer.strides = NULL
279
buffer.suboffsets = NULL
281
buffer.internal = NULL
283
def __getsegcount__(self, Py_ssize_t *lenp):
284
# required for getreadbuffer
287
lenp[0] = zmq_msg_size(&self.zmq_msg)
290
def __getreadbuffer__(self, Py_ssize_t idx, void **p):
291
# old-style (buffer) interface
292
cdef char *data_c = NULL
293
cdef Py_ssize_t data_len_c
295
raise SystemError("accessing non-existent buffer segment")
296
# read-only, because we don't want to allow
297
# editing of the message in-place
299
data_c = <char *>zmq_msg_data(&self.zmq_msg)
300
data_len_c = zmq_msg_size(&self.zmq_msg)
305
# end buffer interface
260
307
def __copy__(self):
261
308
"""Create a shallow copy of the message.
263
This does not copy the contents of the Message, just the pointer.
310
This does not copy the contents of the Frame, just the pointer.
264
311
This will increment the 0MQ ref count of the message, but not
265
312
the ref count of the Python object. That is only done once when
266
313
the Python object is first turned into a 0MQ message.
268
315
return self.fast_copy()
270
cdef Message fast_copy(self):
271
"""Fast, cdef'd version of shallow copy of the message."""
317
cdef Frame fast_copy(self):
318
"""Fast, cdef'd version of shallow copy of the Frame."""
274
321
# This does not copy the contents, but just increases the ref-count
275
322
# of the zmq_msg by one.
276
zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg)
324
zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg)
277
325
# Copy the ref to data so the copy won't create a copy when str is
279
327
if self._data is not None:
283
331
if self._bytes is not None:
284
332
new_msg._bytes = self._bytes
286
# Message copies share the tracker and tracker_queue
287
new_msg.tracker_queue = self.tracker_queue
334
# Frame copies share the tracker and tracker_event
335
new_msg.tracker_event = self.tracker_event
288
336
new_msg.tracker = self.tracker
292
340
def __len__(self):
293
341
"""Return the length of the message in bytes."""
294
return <int>zmq_msg_size(&self.zmq_msg)
344
sz = zmq_msg_size(&self.zmq_msg)
346
# return <int>zmq_msg_size(&self.zmq_msg)
296
348
def __str__(self):
297
349
"""Return the str form of the message."""
303
355
return b.decode()
309
"""Is 0MQ completely done with the message?"""
311
raise ValueError("Not a tracked message")
312
return self.tracker.done
314
def wait(self, timeout=-1):
315
"""m.wait(timeout=-1)
317
Wait for 0MQ to be done with the message, or until timeout.
322
Maximum time in (s) to wait before raising NotDone.
324
Raises NotDone if ``timeout`` reached before I am done.
327
raise ValueError("Not a tracked message")
328
return self.tracker.wait(timeout=timeout)
331
cdef object _getbuffer(self):
359
cdef inline object _getbuffer(self):
332
360
"""Create a Python buffer/view of the message data.
334
This will be called only once, the first time the ``buffer`` property
362
This will be called only once, the first time the `buffer` property
335
363
is accessed. Subsequent calls use a cached copy.
337
cdef char *data_c = NULL
338
cdef Py_ssize_t data_len_c
339
# read-only, because we don't want to allow
340
# editing of the message in-place
341
365
if self._data is None:
342
# return buffer on input object, to preserve refcounting
343
data_c = <char *>zmq_msg_data(&self.zmq_msg)
344
data_len_c = zmq_msg_size(&self.zmq_msg)
345
return frombuffer_r(data_c, data_len_c)
366
return viewfromobject_r(self)
347
368
return viewfromobject_r(self._data)