~ubuntu-branches/ubuntu/wily/pyzmq/wily

« back to all changes in this revision

Viewing changes to zmq/tests/test_socket.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-24 19:23:15 UTC
  • mfrom: (1.2.1) (9 sid)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20130224192315-qhmwp3m3ymk8r60d
Tags: 2.2.0.1-1
* New upstream release
* relicense debian packaging to LGPL-3
* update watch file to use github directly
  thanks to Bart Martens for the file
* add autopkgtests
* drop obsolete DM-Upload-Allowed
* bump standard to 3.9.4, no changes required

Show diffs side-by-side

added

removed

Lines of Context:
1
 
#!/usr/bin/env python
2
1
# -*- coding: utf8 -*-
3
 
#
4
 
#    Copyright (c) 2010 Brian E. Granger
5
 
#
6
 
#    This file is part of pyzmq.
7
 
#
8
 
#    pyzmq is free software; you can redistribute it and/or modify it under
9
 
#    the terms of the Lesser GNU General Public License as published by
10
 
#    the Free Software Foundation; either version 3 of the License, or
11
 
#    (at your option) any later version.
12
 
#
13
 
#    pyzmq is distributed in the hope that it will be useful,
14
 
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 
#    Lesser GNU General Public License for more details.
17
 
#
18
 
#    You should have received a copy of the Lesser GNU General Public License
19
 
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
 
#
 
2
#-----------------------------------------------------------------------------
 
3
#  Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
 
4
#
 
5
#  This file is part of pyzmq
 
6
#
 
7
#  Distributed under the terms of the New BSD License.  The full license is in
 
8
#  the file COPYING.BSD, distributed as part of this software.
 
9
#-----------------------------------------------------------------------------
21
10
 
22
11
#-----------------------------------------------------------------------------
23
12
# Imports
25
14
 
26
15
import sys
27
16
import time
 
17
import errno
28
18
 
29
19
import zmq
30
 
from zmq.tests import BaseZMQTestCase, SkipTest
 
20
from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest
31
21
from zmq.utils.strtypes import bytes, unicode
 
22
 
32
23
try:
33
24
    from queue import Queue
34
25
except:
41
32
class TestSocket(BaseZMQTestCase):
42
33
 
43
34
    def test_create(self):
44
 
        s = self.context.socket(zmq.PUB)
 
35
        ctx = self.Context()
 
36
        s = ctx.socket(zmq.PUB)
45
37
        # Superluminal protocol not yet implemented
46
 
        self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.bind, 'ftl://')
47
 
        self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.connect, 'ftl://')
 
38
        self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.bind, 'ftl://a')
 
39
        self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.connect, 'ftl://a')
 
40
        self.assertRaisesErrno(zmq.EINVAL, s.bind, 'tcp://')
48
41
        s.close()
49
 
        # del ctx
 
42
        del ctx
50
43
    
51
44
    def test_unicode_sockopts(self):
52
45
        """test setting/getting sockopts with unicode strings"""
58
51
        self.assertEquals(p.recv_unicode, p.recv_unicode)
59
52
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
60
53
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
 
54
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
61
55
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
62
56
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
63
 
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
64
57
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
65
 
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.SUBSCRIBE)
 
58
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)
 
59
        
66
60
        st = s.getsockopt(zmq.IDENTITY)
67
61
        self.assertEquals(st.decode('utf16'), s.getsockopt_unicode(zmq.IDENTITY, 'utf16'))
68
62
        time.sleep(0.1) # wait for connection/subscription
71
65
        self.assertEquals(topic, s.recv_unicode())
72
66
        self.assertEquals(topic*2, s.recv_unicode(encoding='latin-1'))
73
67
    
 
68
    def test_int_sockopts(self):
 
69
        "test non-uint64 sockopts"
 
70
        v = zmq.zmq_version_info()
 
71
        if not v >= (2,1):
 
72
            raise SkipTest("only on libzmq >= 2.1")
 
73
        elif v < (3,0):
 
74
            hwm = zmq.HWM
 
75
            default_hwm = 0
 
76
        else:
 
77
            hwm = zmq.SNDHWM
 
78
            default_hwm = 1000
 
79
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
 
80
        p.setsockopt(zmq.LINGER, 0)
 
81
        self.assertEquals(p.getsockopt(zmq.LINGER), 0)
 
82
        p.setsockopt(zmq.LINGER, -1)
 
83
        self.assertEquals(p.getsockopt(zmq.LINGER), -1)
 
84
        self.assertEquals(p.getsockopt(hwm), default_hwm)
 
85
        p.setsockopt(hwm, 11)
 
86
        self.assertEquals(p.getsockopt(hwm), 11)
 
87
        # p.setsockopt(zmq.EVENTS, zmq.POLLIN)
 
88
        self.assertEquals(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
 
89
        self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
 
90
        self.assertEquals(p.getsockopt(zmq.TYPE), p.socket_type)
 
91
        self.assertEquals(p.getsockopt(zmq.TYPE), zmq.PUB)
 
92
        self.assertEquals(s.getsockopt(zmq.TYPE), s.socket_type)
 
93
        self.assertEquals(s.getsockopt(zmq.TYPE), zmq.SUB)
 
94
        
 
95
        # check for overflow / wrong type:
 
96
        errors = []
 
97
        backref = {}
 
98
        constants = zmq.core.constants
 
99
        for name in constants.__all__:
 
100
            value = getattr(constants, name)
 
101
            if isinstance(value, int):
 
102
                backref[value] = name
 
103
        for opt in zmq.core.constants.int_sockopts+zmq.core.constants.int64_sockopts:
 
104
            sopt = backref[opt]
 
105
            if sopt == 'ROUTER_BEHAVIOR' or 'TCP' in sopt:
 
106
                # fail_unroutable is write-only
 
107
                continue
 
108
            try:
 
109
                n = p.getsockopt(opt)
 
110
            except zmq.ZMQError as e:
 
111
                errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
 
112
            else:
 
113
                if n > 2**31:
 
114
                    errors.append("getsockopt(zmq.%s) returned a ridiculous value."
 
115
                                    " It is probably the wrong type."%sopt)
 
116
        if errors:
 
117
            self.fail('\n'.join([''] + errors))
 
118
    
 
119
    def test_bad_sockopts(self):
 
120
        """Test that appropriate errors are raised on bad socket options"""
 
121
        s = self.context.socket(zmq.PUB)
 
122
        self.sockets.append(s)
 
123
        s.setsockopt(zmq.LINGER, 0)
 
124
        # unrecognized int sockopts pass through to libzmq, and should raise EINVAL
 
125
        self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, 9999, 5)
 
126
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt, 9999)
 
127
        # but only int sockopts are allowed through this way, otherwise raise a TypeError
 
128
        self.assertRaises(TypeError, s.setsockopt, 9999, b"5")
 
129
        # some sockopts are valid in general, but not on every socket:
 
130
        self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, zmq.SUBSCRIBE, b'hi')
 
131
    
 
132
    def test_sockopt_roundtrip(self):
 
133
        "test set/getsockopt roundtrip."
 
134
        p = self.context.socket(zmq.PUB)
 
135
        self.sockets.append(p)
 
136
        self.assertEquals(p.getsockopt(zmq.LINGER), -1)
 
137
        p.setsockopt(zmq.LINGER, 11)
 
138
        self.assertEquals(p.getsockopt(zmq.LINGER), 11)
 
139
    
 
140
    def test_poll(self):
 
141
        """test Socket.poll()"""
 
142
        req, rep = self.create_bound_pair(zmq.REQ, zmq.REP)
 
143
        # default flag is POLLIN, nobody has anything to recv:
 
144
        self.assertEquals(req.poll(0), 0)
 
145
        self.assertEquals(rep.poll(0), 0)
 
146
        self.assertEquals(req.poll(0, zmq.POLLOUT), zmq.POLLOUT)
 
147
        self.assertEquals(rep.poll(0, zmq.POLLOUT), 0)
 
148
        self.assertEquals(req.poll(0, zmq.POLLOUT|zmq.POLLIN), zmq.POLLOUT)
 
149
        self.assertEquals(rep.poll(0, zmq.POLLOUT), 0)
 
150
        req.send('hi')
 
151
        self.assertEquals(req.poll(0), 0)
 
152
        self.assertEquals(rep.poll(1), zmq.POLLIN)
 
153
        self.assertEquals(req.poll(0, zmq.POLLOUT), 0)
 
154
        self.assertEquals(rep.poll(0, zmq.POLLOUT), 0)
 
155
        self.assertEquals(req.poll(0, zmq.POLLOUT|zmq.POLLIN), 0)
 
156
        self.assertEquals(rep.poll(0, zmq.POLLOUT), zmq.POLLIN)
 
157
    
74
158
    def test_send_unicode(self):
75
159
        "test sending unicode objects"
76
160
        a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
91
175
    def test_tracker(self):
92
176
        "test the MessageTracker object for tracking when zmq is done with a buffer"
93
177
        addr = 'tcp://127.0.0.1'
94
 
        a = self.context.socket(zmq.XREQ)
 
178
        a = self.context.socket(zmq.PUB)
95
179
        port = a.bind_to_random_port(addr)
96
180
        a.close()
97
 
        del a 
98
181
        iface = "%s:%i"%(addr,port)
99
 
        a = self.context.socket(zmq.XREQ)
100
 
        a.setsockopt(zmq.IDENTITY, "a".encode())
101
 
        b = self.context.socket(zmq.XREP)
 
182
        a = self.context.socket(zmq.PAIR)
 
183
        # a.setsockopt(zmq.IDENTITY, b"a")
 
184
        b = self.context.socket(zmq.PAIR)
102
185
        self.sockets.extend([a,b])
103
186
        a.connect(iface)
104
187
        time.sleep(0.1)
105
 
        p1 = a.send('something'.encode(), copy=False, track=True)
 
188
        p1 = a.send(b'something', copy=False, track=True)
106
189
        self.assertTrue(isinstance(p1, zmq.MessageTracker))
107
190
        self.assertFalse(p1.done)
108
 
        p2 = a.send_multipart(list(map(str.encode, ['something', 'else'])), copy=False, track=True)
 
191
        p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
109
192
        self.assert_(isinstance(p2, zmq.MessageTracker))
110
193
        self.assertEquals(p2.done, False)
111
194
        self.assertEquals(p1.done, False)
113
196
        b.bind(iface)
114
197
        msg = b.recv_multipart()
115
198
        self.assertEquals(p1.done, True)
116
 
        self.assertEquals(msg, (list(map(str.encode, ['a', 'something']))))
 
199
        self.assertEquals(msg, [b'something'])
117
200
        msg = b.recv_multipart()
118
201
        self.assertEquals(p2.done, True)
119
 
        self.assertEquals(msg, list(map(str.encode, ['a', 'something', 'else'])))
120
 
        m = zmq.Message("again".encode(), track=True)
121
 
        self.assertEquals(m.done, False)
 
202
        self.assertEquals(msg, [b'something', b'else'])
 
203
        m = zmq.Frame(b"again", track=True)
 
204
        self.assertEquals(m.tracker.done, False)
122
205
        p1 = a.send(m, copy=False)
123
206
        p2 = a.send(m, copy=False)
124
 
        self.assertEquals(m.done, False)
 
207
        self.assertEquals(m.tracker.done, False)
125
208
        self.assertEquals(p1.done, False)
126
209
        self.assertEquals(p2.done, False)
127
210
        msg = b.recv_multipart()
128
 
        self.assertEquals(m.done, False)
129
 
        self.assertEquals(msg, list(map(str.encode, ['a', 'again'])))
 
211
        self.assertEquals(m.tracker.done, False)
 
212
        self.assertEquals(msg, [b'again'])
130
213
        msg = b.recv_multipart()
131
 
        self.assertEquals(m.done, False)
132
 
        self.assertEquals(msg, list(map(str.encode, ['a', 'again'])))
 
214
        self.assertEquals(m.tracker.done, False)
 
215
        self.assertEquals(msg, [b'again'])
133
216
        self.assertEquals(p1.done, False)
134
217
        self.assertEquals(p2.done, False)
135
218
        pm = m.tracker
137
220
        time.sleep(0.1)
138
221
        self.assertEquals(p1.done, True)
139
222
        self.assertEquals(p2.done, True)
140
 
        m = zmq.Message('something'.encode(), track=False)
 
223
        m = zmq.Frame(b'something', track=False)
141
224
        self.assertRaises(ValueError, a.send, m, copy=False, track=True)
142
225
        
143
226
 
144
227
    def test_close(self):
145
 
        s = self.context.socket(zmq.PUB)
 
228
        ctx = self.Context()
 
229
        s = ctx.socket(zmq.PUB)
146
230
        s.close()
147
 
        self.assertRaises(zmq.ZMQError, s.bind, ''.encode())
148
 
        self.assertRaises(zmq.ZMQError, s.connect, ''.encode())
149
 
        self.assertRaises(zmq.ZMQError, s.setsockopt, zmq.SUBSCRIBE, ''.encode())
150
 
        self.assertRaises(zmq.ZMQError, s.send, 'asdf'.encode())
 
231
        self.assertRaises(zmq.ZMQError, s.bind, b'')
 
232
        self.assertRaises(zmq.ZMQError, s.connect, b'')
 
233
        self.assertRaises(zmq.ZMQError, s.setsockopt, zmq.SUBSCRIBE, b'')
 
234
        self.assertRaises(zmq.ZMQError, s.send, b'asdf')
151
235
        self.assertRaises(zmq.ZMQError, s.recv)
152
 
    
 
236
        del ctx
 
237
    
 
238
    def test_attr(self):
 
239
        """set setting/getting sockopts as attributes"""
 
240
        s = self.context.socket(zmq.DEALER)
 
241
        self.sockets.append(s)
 
242
        linger = 10
 
243
        s.linger = linger
 
244
        self.assertEquals(linger, s.linger)
 
245
        self.assertEquals(linger, s.getsockopt(zmq.LINGER))
 
246
        self.assertEquals(s.fd, s.getsockopt(zmq.FD))
 
247
    
 
248
    def test_bad_attr(self):
 
249
        s = self.context.socket(zmq.DEALER)
 
250
        self.sockets.append(s)
 
251
        try:
 
252
            s.apple='foo'
 
253
        except AttributeError:
 
254
            pass
 
255
        else:
 
256
            self.fail("bad setattr should have raised AttributeError")
 
257
        try:
 
258
            s.apple
 
259
        except AttributeError:
 
260
            pass
 
261
        else:
 
262
            self.fail("bad getattr should have raised AttributeError")
 
263
 
 
264
    def test_subclass(self):
 
265
        """subclasses can assign attributes"""
 
266
        class S(zmq.Socket):
 
267
            def __init__(self, *a, **kw):
 
268
                self.a=-1
 
269
        s = S(self.context, zmq.REP)
 
270
        self.sockets.append(s)
 
271
        self.assertEquals(s.a, -1)
 
272
        s.a=1
 
273
        self.assertEquals(s.a, 1)
 
274
        a=s.a
 
275
        self.assertEquals(a, 1)
 
276
    
 
277
    def test_recv_multipart(self):
 
278
        a,b = self.create_bound_pair()
 
279
        msg = b'hi'
 
280
        for i in range(3):
 
281
            a.send(msg)
 
282
        time.sleep(0.1)
 
283
        for i in range(3):
 
284
            self.assertEquals(b.recv_multipart(), [msg])
 
285
    
 
286
    def test_close_after_destroy(self):
 
287
        """s.close() after ctx.destroy() should be fine"""
 
288
        ctx = self.Context()
 
289
        s = ctx.socket(zmq.REP)
 
290
        ctx.destroy()
 
291
        # reaper is not instantaneous
 
292
        time.sleep(1e-2)
 
293
        s.close()
 
294
        self.assertTrue(s.closed)
 
295
    
 
296
    def test_poll(self):
 
297
        a,b = self.create_bound_pair()
 
298
        tic = time.time()
 
299
        evt = a.poll(50)
 
300
        self.assertEquals(evt, 0)
 
301
        evt = a.poll(50, zmq.POLLOUT)
 
302
        self.assertEquals(evt, zmq.POLLOUT)
 
303
        msg = b'hi'
 
304
        a.send(msg)
 
305
        evt = b.poll(50)
 
306
        self.assertEquals(evt, zmq.POLLIN)
 
307
        msg2 = self.recv(b)
 
308
        evt = b.poll(50)
 
309
        self.assertEquals(evt, 0)
 
310
        self.assertEquals(msg2, msg)
 
311
    
 
312
    def test_ipc_path_max_length(self):
 
313
        """IPC_PATH_MAX_LEN is a sensible value"""
 
314
        if zmq.IPC_PATH_MAX_LEN == 0:
 
315
            raise SkipTest("IPC_PATH_MAX_LEN undefined")
 
316
        
 
317
        msg = "Surprising value for IPC_PATH_MAX_LEN: %s" % zmq.IPC_PATH_MAX_LEN
 
318
        self.assertTrue(zmq.IPC_PATH_MAX_LEN > 30, msg)
 
319
        self.assertTrue(zmq.IPC_PATH_MAX_LEN < 1025, msg)
 
320
 
 
321
    def test_ipc_path_max_length_msg(self):
 
322
        if zmq.IPC_PATH_MAX_LEN == 0:
 
323
            raise SkipTest("IPC_PATH_MAX_LEN undefined")
 
324
        
 
325
        s = self.context.socket(zmq.PUB)
 
326
        self.sockets.append(s)
 
327
        try:
 
328
            s.bind('ipc://{0}'.format('a' * (zmq.IPC_PATH_MAX_LEN + 1)))
 
329
        except zmq.ZMQError as e:
 
330
            self.assertTrue(str(zmq.IPC_PATH_MAX_LEN) in e.strerror)
 
331
 
 
332
 
 
333
if have_gevent:
 
334
    import gevent
 
335
    
 
336
    class TestSocketGreen(GreenTest, TestSocket):
 
337
        test_bad_attr = GreenTest.skip_green
 
338
        test_close_after_destroy = GreenTest.skip_green
 
339
        
 
340
        def test_timeout(self):
 
341
            a,b = self.create_bound_pair()
 
342
            g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
 
343
            timeout = gevent.Timeout(0.1)
 
344
            timeout.start()
 
345
            self.assertRaises(gevent.Timeout, b.recv)
 
346
            g.kill()
 
347
 
153
348