1
"""0MQ Socket pure Python methods."""
4
# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley
6
# This file is part of pyzmq.
8
# pyzmq is free software; you can redistribute it and/or modify it under
9
# the terms of the Lesser GNU General Public License as published by
10
# the Free Software Foundation; either version 3 of the License, or
11
# (at your option) any later version.
13
# pyzmq is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
# Lesser GNU General Public License for more details.
18
# You should have received a copy of the Lesser GNU General Public License
19
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
#-----------------------------------------------------------------------------
24
#-----------------------------------------------------------------------------
30
from zmq.core import constants
31
from zmq.core.constants import *
32
from zmq.core.error import ZMQError, ZMQBindError
33
from zmq.utils import jsonapi
34
from zmq.utils.strtypes import bytes,unicode,basestring
43
#-----------------------------------------------------------------------------
45
#-----------------------------------------------------------------------------
47
def setsockopt_string(self, option, optval, encoding='utf-8'):
    """s.setsockopt_string(option, optval, encoding='utf-8')

    Set a socket option from a unicode string.

    This is simply a wrapper for ``setsockopt`` that encodes `optval`
    explicitly, to protect from encoding ambiguity.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : 0MQ socket option constant
        The name of the option to set, e.g. SUBSCRIBE.
    optval : unicode string (unicode on py2, str on py3)
        The value of the option to set.
    encoding : str
        The encoding to be used, default is utf8

    Returns
    -------
    The result of ``self.setsockopt(option, encoded_value)``.

    Raises
    ------
    TypeError
        If `optval` is not a unicode string.
    """
    # Only text is accepted here; raw bytes go through setsockopt directly.
    if not isinstance(optval, unicode):
        raise TypeError("unicode strings only")
    return self.setsockopt(option, optval.encode(encoding))
91
def getsockopt_string(self, option, encoding='utf-8'):
    """s.getsockopt_string(option, encoding='utf-8')

    Get the value of a socket option as a unicode string.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The option to retrieve. Currently, IDENTITY is the only
        gettable option that can return a string.
    encoding : str
        The encoding used to decode the raw option value.

    Returns
    -------
    optval : unicode string (unicode on py2, str on py3)
        The value of the option as a unicode string.

    Raises
    ------
    TypeError
        If `option` is not listed in ``constants.bytes_sockopts``, i.e. its
        value is not bytes that could be decoded.
    """
    # Refuse options whose value is not bytes: there is nothing to decode.
    if option not in constants.bytes_sockopts:
        raise TypeError("option %i will not return a string to be decoded"%option)
    return self.getsockopt(option).decode(encoding)
114
def bind_to_random_port(self, addr, min_port=49152, max_port=65536, max_tries=100):
    """s.bind_to_random_port(addr, min_port=49152, max_port=65536, max_tries=100)

    Bind this socket to a random port in a range.

    Parameters
    ----------
    addr : str
        The address string without the port to pass to ``Socket.bind()``.
    min_port : int, optional
        The minimum port in the range of ports to try (inclusive).
    max_port : int, optional
        The maximum port in the range of ports to try (exclusive).
    max_tries : int, optional
        The maximum number of bind attempts to make.

    Returns
    -------
    port : int
        The port the socket was bound to.

    Raises
    ------
    ZMQBindError
        if `max_tries` reached before successful bind
    """
    for _ in range(max_tries):
        port = random.randrange(min_port, max_port)
        try:
            self.bind('%s:%s' % (addr, port))
        except ZMQError:
            # This port could not be bound; draw another one.
            continue
        # First successful bind wins.
        return port
    raise ZMQBindError("Could not bind socket to random port.")
150
#-------------------------------------------------------------------------
151
# Sending and receiving messages
152
#-------------------------------------------------------------------------
154
def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
    """s.send_multipart(msg_parts, flags=0, copy=True, track=False)

    Send a sequence of buffers as a multipart message.

    Parameters
    ----------
    msg_parts : sequence
        A sequence of objects to send as a multipart message. Each element
        can be any sendable object (Frame, bytes, buffer-providers).
        It must support slicing and negative indexing.
    flags : int, optional
        SNDMORE is handled automatically for frames before the last.
    copy : bool, optional
        Should the frame(s) be sent in a copying or non-copying manner.
    track : bool, optional
        Should the frame(s) be tracked for notification that ZMQ has
        finished with it (ignored if copy=True).

    Returns
    -------
    None : if copy or not track
    MessageTracker : if track and not copy
        a MessageTracker object, whose `pending` property will
        be True until the last send is completed.

    Raises
    ------
    IndexError
        If `msg_parts` is an empty list or tuple (there is no last part
        to send).
    """
    for msg in msg_parts[:-1]:
        self.send(msg, SNDMORE|flags, copy=copy, track=track)
    # Send the last part without the extra SNDMORE flag.
    return self.send(msg_parts[-1], flags, copy=copy, track=track)
184
def recv_multipart(self, flags=0, copy=True, track=False):
    """s.recv_multipart(flags=0, copy=True, track=False)

    Receive a multipart message as a list of bytes or Frame objects.

    Parameters
    ----------
    flags : int, optional
        Any supported flag: NOBLOCK. If NOBLOCK is set, this method
        will raise a ZMQError with EAGAIN if a message is not ready.
        If NOBLOCK is not set, then this method will block until a
        message arrives.
    copy : bool, optional
        Should the message frame(s) be received in a copying or non-copying manner?
        If False a Frame object is returned for each part, if True a copy of
        the bytes is made for each frame.
    track : bool, optional
        Should the message frame(s) be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)

    Returns
    -------
    msg_parts : list
        A list of frames in the multipart message; either Frames or bytes,
        depending on `copy`.
    """
    parts = [self.recv(flags, copy=copy, track=track)]
    # have first part already, only loop while more to receive
    while self.getsockopt(RCVMORE):
        parts.append(self.recv(flags, copy=copy, track=track))
    return parts
218
def send_string(self, u, flags=0, copy=False, encoding='utf-8'):
    """s.send_string(u, flags=0, copy=False, encoding='utf-8')

    Send a Python unicode string as a message with an encoding.

    0MQ communicates with raw bytes, so you must encode/decode
    text (unicode on py2, str on py3) around 0MQ.

    Parameters
    ----------
    u : Python unicode string (unicode on py2, str on py3)
        The unicode string to send.
    flags : int, optional
        Flags passed through to ``send``.
    copy : bool, optional
        Passed through to ``send``; defaults to False here.
    encoding : str [default: 'utf-8']
        The encoding to be used

    Returns
    -------
    The result of ``self.send`` for the encoded message.

    Raises
    ------
    TypeError
        If `u` is not a ``basestring`` instance.
    """
    if not isinstance(u, basestring):
        raise TypeError("unicode/str objects only")
    return self.send(u.encode(encoding), flags=flags, copy=copy)
239
def recv_string(self, flags=0, encoding='utf-8'):
    """s.recv_string(flags=0, encoding='utf-8')

    Receive a unicode string, as sent by send_string.

    Parameters
    ----------
    flags : int
        Flags passed through to ``recv``.
    encoding : str [default: 'utf-8']
        The encoding to be used

    Returns
    -------
    s : unicode string (unicode on py2, str on py3)
        The Python unicode string that arrives as encoded bytes.
    """
    # Receive without copying; the payload is read from the result's
    # ``bytes`` attribute and decoded once.
    msg = self.recv(flags=flags, copy=False)
    return codecs.decode(msg.bytes, encoding)
259
def send_pyobj(self, obj, flags=0, protocol=-1):
    """s.send_pyobj(obj, flags=0, protocol=-1)

    Send a Python object as a message using pickle to serialize.

    Parameters
    ----------
    obj : Python object
        The Python object to send.
    flags : int
        Flags passed through to ``send``.
    protocol : int
        The pickle protocol number to use. Default of -1 will select
        the highest supported number. Use 0 for multiple platform
        support.

    Returns
    -------
    The result of ``self.send`` for the pickled message.
    """
    msg = pickle.dumps(obj, protocol)
    return self.send(msg, flags)
278
def recv_pyobj(self, flags=0):
    """s.recv_pyobj(flags=0)

    Receive a Python object as a message using pickle to serialize.

    Parameters
    ----------
    flags : int
        Flags passed through to ``recv``.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.
    """
    s = self.recv(flags)
    # SECURITY: unpickling can execute arbitrary code. Only use this with
    # peers you trust; prefer recv_json for untrusted input.
    return pickle.loads(s)
296
def send_json(self, obj, flags=0):
    """s.send_json(obj, flags=0)

    Send a Python object as a message using json to serialize.

    Parameters
    ----------
    obj : Python object
        The Python object to send.
    flags : int
        Flags passed through to ``send``.

    Returns
    -------
    The result of ``self.send`` for the serialized message.

    Raises
    ------
    ImportError
        If ``jsonapi`` found no usable json module.
    """
    if jsonapi.jsonmod is None:
        raise ImportError('jsonlib{1,2}, json or simplejson library is required.')

    msg = jsonapi.dumps(obj)
    return self.send(msg, flags)
314
def recv_json(self, flags=0):
    """s.recv_json(flags=0)

    Receive a Python object as a message using json to serialize.

    Parameters
    ----------
    flags : int
        Flags passed through to ``recv``.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ImportError
        If ``jsonapi`` found no usable json module.
    """
    if jsonapi.jsonmod is None:
        raise ImportError('jsonlib{1,2}, json or simplejson library is required.')

    msg = self.recv(flags)
    return jsonapi.loads(msg)
335
def poll(self, timeout=None, flags=POLLIN):
    """s.poll(timeout=None, flags=POLLIN)

    Poll the socket for events. The default is to poll forever for incoming
    events. Timeout is in milliseconds, if specified.

    Parameters
    ----------
    timeout : int [default: None]
        The timeout (in milliseconds) to wait for an event. If unspecified
        (or specified None), will wait forever for an event.
    flags : bitfield (int) [default: POLLIN]
        The event flags to poll for (any combination of POLLIN|POLLOUT).
        The default is to check for incoming events (POLLIN).

    Returns
    -------
    events : bitfield (int)
        The events that are ready and waiting. Will be 0 if no events were ready
        by the time timeout was reached.

    Raises
    ------
    ZMQError
        With ENOTSUP when the socket cannot be polled (see the guard below).
    """
    # NOTE(review): the guard condition was missing from the reviewed text;
    # a closed socket is the assumed trigger for ENOTSUP -- confirm.
    if self.closed:
        raise ZMQError(ENOTSUP)

    # NOTE(review): the poller construction was also missing; zmq.Poller is
    # assumed. Imported locally so this function needs no module-level name.
    import zmq
    p = zmq.Poller()
    p.register(self, flags)
    evts = dict(p.poll(timeout))
    # return 0 if no events, otherwise return event bitfield
    return evts.get(self, 0)