2
1
# -*- coding: utf8 -*-
4
# Copyright (c) 2010 Brian E. Granger
6
# This file is part of pyzmq.
8
# pyzmq is free software; you can redistribute it and/or modify it under
9
# the terms of the Lesser GNU General Public License as published by
10
# the Free Software Foundation; either version 3 of the License, or
11
# (at your option) any later version.
13
# pyzmq is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
# Lesser GNU General Public License for more details.
18
# You should have received a copy of the Lesser GNU General Public License
19
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2
#-----------------------------------------------------------------------------
3
# Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
5
# This file is part of pyzmq
7
# Distributed under the terms of the New BSD License. The full license is in
8
# the file COPYING.BSD, distributed as part of this software.
9
#-----------------------------------------------------------------------------
22
11
#-----------------------------------------------------------------------------
58
51
self.assertEquals(p.recv_unicode, p.recv_unicode)
59
52
self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
60
53
self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
54
s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
61
55
self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
62
56
s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
63
s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
64
57
self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
65
self.assertRaises(TypeError, s.getsockopt_unicode, zmq.SUBSCRIBE)
58
self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)
66
60
st = s.getsockopt(zmq.IDENTITY)
67
61
self.assertEquals(st.decode('utf16'), s.getsockopt_unicode(zmq.IDENTITY, 'utf16'))
68
62
time.sleep(0.1) # wait for connection/subscription
71
65
self.assertEquals(topic, s.recv_unicode())
72
66
self.assertEquals(topic*2, s.recv_unicode(encoding='latin-1'))
68
def test_int_sockopts(self):
69
"test non-uint64 sockopts"
70
v = zmq.zmq_version_info()
72
raise SkipTest("only on libzmq >= 2.1")
79
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
80
p.setsockopt(zmq.LINGER, 0)
81
self.assertEquals(p.getsockopt(zmq.LINGER), 0)
82
p.setsockopt(zmq.LINGER, -1)
83
self.assertEquals(p.getsockopt(zmq.LINGER), -1)
84
self.assertEquals(p.getsockopt(hwm), default_hwm)
86
self.assertEquals(p.getsockopt(hwm), 11)
87
# p.setsockopt(zmq.EVENTS, zmq.POLLIN)
88
self.assertEquals(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
89
self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
90
self.assertEquals(p.getsockopt(zmq.TYPE), p.socket_type)
91
self.assertEquals(p.getsockopt(zmq.TYPE), zmq.PUB)
92
self.assertEquals(s.getsockopt(zmq.TYPE), s.socket_type)
93
self.assertEquals(s.getsockopt(zmq.TYPE), zmq.SUB)
95
# check for overflow / wrong type:
98
constants = zmq.core.constants
99
for name in constants.__all__:
100
value = getattr(constants, name)
101
if isinstance(value, int):
102
backref[value] = name
103
for opt in zmq.core.constants.int_sockopts+zmq.core.constants.int64_sockopts:
105
if sopt == 'ROUTER_BEHAVIOR' or 'TCP' in sopt:
106
# fail_unroutable is write-only
109
n = p.getsockopt(opt)
110
except zmq.ZMQError as e:
111
errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
114
errors.append("getsockopt(zmq.%s) returned a ridiculous value."
115
" It is probably the wrong type."%sopt)
117
self.fail('\n'.join([''] + errors))
119
def test_bad_sockopts(self):
    """Invalid socket options must raise the appropriate error type.

    Exercises three failure modes on a PUB socket: an option number
    that is not recognized, a non-int value for such an option, and
    an option (SUBSCRIBE) applied to a socket type it is asserted to
    be invalid for.
    """
    pub = self.context.socket(zmq.PUB)
    # register the socket so the test fixture can clean it up
    self.sockets.append(pub)
    pub.setsockopt(zmq.LINGER, 0)

    bogus_opt = 9999

    # unrecognized int sockopts pass through to libzmq, and should raise EINVAL
    self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, bogus_opt, 5)
    self.assertRaisesErrno(zmq.EINVAL, pub.getsockopt, bogus_opt)

    # but only int sockopts are allowed through this way, otherwise raise a TypeError
    self.assertRaises(TypeError, pub.setsockopt, bogus_opt, b"5")

    # some sockopts are valid in general, but not on every socket:
    self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, zmq.SUBSCRIBE, b'hi')
132
def test_sockopt_roundtrip(self):
    """A value written with setsockopt is read back by getsockopt.

    Checks that a fresh PUB socket reports LINGER == -1, then sets
    LINGER to 11 and verifies the same value is returned.
    """
    p = self.context.socket(zmq.PUB)
    # register the socket so the test fixture can clean it up
    self.sockets.append(p)
    # assertEqual is used instead of the deprecated assertEquals alias,
    # which was removed from unittest in Python 3.12.
    self.assertEqual(p.getsockopt(zmq.LINGER), -1)
    p.setsockopt(zmq.LINGER, 11)
    self.assertEqual(p.getsockopt(zmq.LINGER), 11)
141
"""test Socket.poll()"""
142
req, rep = self.create_bound_pair(zmq.REQ, zmq.REP)
143
# default flag is POLLIN, nobody has anything to recv:
144
self.assertEquals(req.poll(0), 0)
145
self.assertEquals(rep.poll(0), 0)
146
self.assertEquals(req.poll(0, zmq.POLLOUT), zmq.POLLOUT)
147
self.assertEquals(rep.poll(0, zmq.POLLOUT), 0)
148
self.assertEquals(req.poll(0, zmq.POLLOUT|zmq.POLLIN), zmq.POLLOUT)
149
self.assertEquals(rep.poll(0, zmq.POLLOUT), 0)
151
self.assertEquals(req.poll(0), 0)
152
self.assertEquals(rep.poll(1), zmq.POLLIN)
153
self.assertEquals(req.poll(0, zmq.POLLOUT), 0)
154
self.assertEquals(rep.poll(0, zmq.POLLOUT), 0)
155
self.assertEquals(req.poll(0, zmq.POLLOUT|zmq.POLLIN), 0)
156
self.assertEquals(rep.poll(0, zmq.POLLOUT), zmq.POLLIN)
74
158
def test_send_unicode(self):
75
159
"test sending unicode objects"
76
160
a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
91
175
def test_tracker(self):
92
176
"test the MessageTracker object for tracking when zmq is done with a buffer"
93
177
addr = 'tcp://127.0.0.1'
94
a = self.context.socket(zmq.XREQ)
178
a = self.context.socket(zmq.PUB)
95
179
port = a.bind_to_random_port(addr)
98
181
iface = "%s:%i"%(addr,port)
99
a = self.context.socket(zmq.XREQ)
100
a.setsockopt(zmq.IDENTITY, "a".encode())
101
b = self.context.socket(zmq.XREP)
182
a = self.context.socket(zmq.PAIR)
183
# a.setsockopt(zmq.IDENTITY, b"a")
184
b = self.context.socket(zmq.PAIR)
102
185
self.sockets.extend([a,b])
105
p1 = a.send('something'.encode(), copy=False, track=True)
188
p1 = a.send(b'something', copy=False, track=True)
106
189
self.assertTrue(isinstance(p1, zmq.MessageTracker))
107
190
self.assertFalse(p1.done)
108
p2 = a.send_multipart(list(map(str.encode, ['something', 'else'])), copy=False, track=True)
191
p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
109
192
self.assert_(isinstance(p2, zmq.MessageTracker))
110
193
self.assertEquals(p2.done, False)
111
194
self.assertEquals(p1.done, False)
114
197
msg = b.recv_multipart()
115
198
self.assertEquals(p1.done, True)
116
self.assertEquals(msg, (list(map(str.encode, ['a', 'something']))))
199
self.assertEquals(msg, [b'something'])
117
200
msg = b.recv_multipart()
118
201
self.assertEquals(p2.done, True)
119
self.assertEquals(msg, list(map(str.encode, ['a', 'something', 'else'])))
120
m = zmq.Message("again".encode(), track=True)
121
self.assertEquals(m.done, False)
202
self.assertEquals(msg, [b'something', b'else'])
203
m = zmq.Frame(b"again", track=True)
204
self.assertEquals(m.tracker.done, False)
122
205
p1 = a.send(m, copy=False)
123
206
p2 = a.send(m, copy=False)
124
self.assertEquals(m.done, False)
207
self.assertEquals(m.tracker.done, False)
125
208
self.assertEquals(p1.done, False)
126
209
self.assertEquals(p2.done, False)
127
210
msg = b.recv_multipart()
128
self.assertEquals(m.done, False)
129
self.assertEquals(msg, list(map(str.encode, ['a', 'again'])))
211
self.assertEquals(m.tracker.done, False)
212
self.assertEquals(msg, [b'again'])
130
213
msg = b.recv_multipart()
131
self.assertEquals(m.done, False)
132
self.assertEquals(msg, list(map(str.encode, ['a', 'again'])))
214
self.assertEquals(m.tracker.done, False)
215
self.assertEquals(msg, [b'again'])
133
216
self.assertEquals(p1.done, False)
134
217
self.assertEquals(p2.done, False)
138
221
self.assertEquals(p1.done, True)
139
222
self.assertEquals(p2.done, True)
140
m = zmq.Message('something'.encode(), track=False)
223
m = zmq.Frame(b'something', track=False)
141
224
self.assertRaises(ValueError, a.send, m, copy=False, track=True)
144
227
def test_close(self):
145
s = self.context.socket(zmq.PUB)
229
s = ctx.socket(zmq.PUB)
147
self.assertRaises(zmq.ZMQError, s.bind, ''.encode())
148
self.assertRaises(zmq.ZMQError, s.connect, ''.encode())
149
self.assertRaises(zmq.ZMQError, s.setsockopt, zmq.SUBSCRIBE, ''.encode())
150
self.assertRaises(zmq.ZMQError, s.send, 'asdf'.encode())
231
self.assertRaises(zmq.ZMQError, s.bind, b'')
232
self.assertRaises(zmq.ZMQError, s.connect, b'')
233
self.assertRaises(zmq.ZMQError, s.setsockopt, zmq.SUBSCRIBE, b'')
234
self.assertRaises(zmq.ZMQError, s.send, b'asdf')
151
235
self.assertRaises(zmq.ZMQError, s.recv)
239
"""set setting/getting sockopts as attributes"""
240
s = self.context.socket(zmq.DEALER)
241
self.sockets.append(s)
244
self.assertEquals(linger, s.linger)
245
self.assertEquals(linger, s.getsockopt(zmq.LINGER))
246
self.assertEquals(s.fd, s.getsockopt(zmq.FD))
248
def test_bad_attr(self):
249
s = self.context.socket(zmq.DEALER)
250
self.sockets.append(s)
253
except AttributeError:
256
self.fail("bad setattr should have raised AttributeError")
259
except AttributeError:
262
self.fail("bad getattr should have raised AttributeError")
264
def test_subclass(self):
265
"""subclasses can assign attributes"""
267
def __init__(self, *a, **kw):
269
s = S(self.context, zmq.REP)
270
self.sockets.append(s)
271
self.assertEquals(s.a, -1)
273
self.assertEquals(s.a, 1)
275
self.assertEquals(a, 1)
277
def test_recv_multipart(self):
278
a,b = self.create_bound_pair()
284
self.assertEquals(b.recv_multipart(), [msg])
286
def test_close_after_destroy(self):
287
"""s.close() after ctx.destroy() should be fine"""
289
s = ctx.socket(zmq.REP)
291
# reaper is not instantaneous
294
self.assertTrue(s.closed)
297
a,b = self.create_bound_pair()
300
self.assertEquals(evt, 0)
301
evt = a.poll(50, zmq.POLLOUT)
302
self.assertEquals(evt, zmq.POLLOUT)
306
self.assertEquals(evt, zmq.POLLIN)
309
self.assertEquals(evt, 0)
310
self.assertEquals(msg2, msg)
312
def test_ipc_path_max_length(self):
    """zmq.IPC_PATH_MAX_LEN lies in a plausible range.

    Skipped when the constant is 0, which this test treats as
    "undefined". Otherwise the value must be strictly greater than
    30 and strictly less than 1025.
    """
    max_len = zmq.IPC_PATH_MAX_LEN
    if max_len == 0:
        raise SkipTest("IPC_PATH_MAX_LEN undefined")

    complaint = "Surprising value for IPC_PATH_MAX_LEN: %s" % max_len
    self.assertTrue(max_len > 30, complaint)
    self.assertTrue(max_len < 1025, complaint)
321
def test_ipc_path_max_length_msg(self):
322
if zmq.IPC_PATH_MAX_LEN == 0:
323
raise SkipTest("IPC_PATH_MAX_LEN undefined")
325
s = self.context.socket(zmq.PUB)
326
self.sockets.append(s)
328
s.bind('ipc://{0}'.format('a' * (zmq.IPC_PATH_MAX_LEN + 1)))
329
except zmq.ZMQError as e:
330
self.assertTrue(str(zmq.IPC_PATH_MAX_LEN) in e.strerror)
336
class TestSocketGreen(GreenTest, TestSocket):
337
test_bad_attr = GreenTest.skip_green
338
test_close_after_destroy = GreenTest.skip_green
340
def test_timeout(self):
341
a,b = self.create_bound_pair()
342
g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
343
timeout = gevent.Timeout(0.1)
345
self.assertRaises(gevent.Timeout, b.recv)