~ubuntu-branches/ubuntu/precise/pyzmq/precise

« back to all changes in this revision

Viewing changes to zmq/tests/test_socket.py

  • Committer: Package Import Robot
  • Author(s): Debian Python Modules Team
  • Date: 2011-09-23 00:16:39 UTC
  • mfrom: (1.1.4 upstream)
  • Revision ID: package-import@ubuntu.com-20110923001639-girjqodpb7uv17yu
Tags: 2.1.9-1
* New upstream version
  - should build on kFreeBSD without patches (Closes: #637777).
* Build-depend on zeromq 2.1.9

Show diffs side-by-side

added

removed

Lines of Context:
1
1
#!/usr/bin/env python
2
2
# -*- coding: utf8 -*-
3
3
#
4
 
#    Copyright (c) 2010 Brian E. Granger
 
4
#    Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley
5
5
#
6
6
#    This file is part of pyzmq.
7
7
#
59
59
        self.assertEquals(s.send_unicode, s.send_unicode)
60
60
        self.assertEquals(p.recv_unicode, p.recv_unicode)
61
61
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
62
 
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
 
62
        if zmq.zmq_version() < '4.0.0':
 
63
            self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
 
64
            s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
63
65
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
64
66
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
65
 
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
66
67
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
67
68
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.SUBSCRIBE)
 
69
        if zmq.zmq_version() >= '4.0.0':
 
70
            # skip the rest on 4.0, because IDENTITY was removed
 
71
            return
68
72
        st = s.getsockopt(zmq.IDENTITY)
69
73
        self.assertEquals(st.decode('utf16'), s.getsockopt_unicode(zmq.IDENTITY, 'utf16'))
70
74
        time.sleep(0.1) # wait for connection/subscription
77
81
        "test non-uint64 sockopts"
78
82
        v = zmq.zmq_version()
79
83
        if not v >= '2.1':
80
 
            raise SkipTest
 
84
            raise SkipTest("only on libzmq >= 2.1")
81
85
        elif v < '3.0':
82
86
            hwm = zmq.HWM
83
87
        else:
99
103
        self.assertEquals(s.getsockopt(zmq.TYPE), zmq.SUB)
100
104
        
101
105
        # check for overflow / wrong type:
 
106
        errors = []
 
107
        backref = {}
 
108
        constants = zmq.core.constants
 
109
        for name in constants.__all__:
 
110
            value = getattr(constants, name)
 
111
            if isinstance(value, int):
 
112
                backref[value] = name
102
113
        for opt in zmq.core.constants.int_sockopts+zmq.core.constants.int64_sockopts:
103
 
            n = p.getsockopt(opt)
104
 
            self.assertTrue(n < 2**31)
 
114
            sopt = backref[opt]
 
115
            try:
 
116
                n = p.getsockopt(opt)
 
117
            except zmq.ZMQError:
 
118
                e = sys.exc_info()[1]
 
119
                errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
 
120
            else:
 
121
                if n > 2**31:
 
122
                    errors.append("getsockopt(zmq.%s) returned a ridiculous value."
 
123
                                    " It is probably the wrong type."%sopt)
 
124
        if errors:
 
125
            self.fail('\n'.join(errors))
105
126
    
106
127
    def test_sockopt_roundtrip(self):
107
128
        "test set/getsockopt roundtrip."
131
152
    def test_tracker(self):
132
153
        "test the MessageTracker object for tracking when zmq is done with a buffer"
133
154
        addr = 'tcp://127.0.0.1'
134
 
        a = self.context.socket(zmq.XREQ)
 
155
        a = self.context.socket(zmq.PUB)
135
156
        port = a.bind_to_random_port(addr)
136
157
        a.close()
137
158
        iface = "%s:%i"%(addr,port)
138
 
        a = self.context.socket(zmq.XREQ)
139
 
        a.setsockopt(zmq.IDENTITY, asbytes("a"))
140
 
        b = self.context.socket(zmq.XREP)
 
159
        a = self.context.socket(zmq.PAIR)
 
160
        # a.setsockopt(zmq.IDENTITY, asbytes("a"))
 
161
        b = self.context.socket(zmq.PAIR)
141
162
        self.sockets.extend([a,b])
142
163
        a.connect(iface)
143
164
        time.sleep(0.1)
152
173
        b.bind(iface)
153
174
        msg = b.recv_multipart()
154
175
        self.assertEquals(p1.done, True)
155
 
        self.assertEquals(msg, (list(map(asbytes, ['a', 'something']))))
 
176
        self.assertEquals(msg, (list(map(asbytes, ['something']))))
156
177
        msg = b.recv_multipart()
157
178
        self.assertEquals(p2.done, True)
158
 
        self.assertEquals(msg, list(map(asbytes, ['a', 'something', 'else'])))
 
179
        self.assertEquals(msg, list(map(asbytes, ['something', 'else'])))
159
180
        m = zmq.Message(asbytes("again"), track=True)
160
 
        self.assertEquals(m.done, False)
 
181
        self.assertEquals(m.tracker.done, False)
161
182
        p1 = a.send(m, copy=False)
162
183
        p2 = a.send(m, copy=False)
163
 
        self.assertEquals(m.done, False)
 
184
        self.assertEquals(m.tracker.done, False)
164
185
        self.assertEquals(p1.done, False)
165
186
        self.assertEquals(p2.done, False)
166
187
        msg = b.recv_multipart()
167
 
        self.assertEquals(m.done, False)
168
 
        self.assertEquals(msg, list(map(asbytes, ['a', 'again'])))
 
188
        self.assertEquals(m.tracker.done, False)
 
189
        self.assertEquals(msg, list(map(asbytes, ['again'])))
169
190
        msg = b.recv_multipart()
170
 
        self.assertEquals(m.done, False)
171
 
        self.assertEquals(msg, list(map(asbytes, ['a', 'again'])))
 
191
        self.assertEquals(m.tracker.done, False)
 
192
        self.assertEquals(msg, list(map(asbytes, ['again'])))
172
193
        self.assertEquals(p1.done, False)
173
194
        self.assertEquals(p2.done, False)
174
195
        pm = m.tracker
191
212
        self.assertRaises(zmq.ZMQError, s.recv)
192
213
        del ctx
193
214
    
 
215
    def test_attr(self):
 
216
        """test setting/getting sockopts as attributes"""
 
217
        s = self.context.socket(zmq.DEALER)
 
218
        self.sockets.append(s)
 
219
        linger = 10
 
220
        s.linger = linger
 
221
        self.assertEquals(linger, s.linger)
 
222
        self.assertEquals(linger, s.getsockopt(zmq.LINGER))
 
223
        self.assertEquals(s.fd, s.getsockopt(zmq.FD))
 
224
    
 
225
    def test_bad_attr(self):
 
226
        s = self.context.socket(zmq.DEALER)
 
227
        self.sockets.append(s)
 
228
        try:
 
229
            s.apple='foo'
 
230
        except AttributeError:
 
231
            pass
 
232
        else:
 
233
            self.fail("bad setattr should have raised AttributeError")
 
234
        try:
 
235
            s.apple
 
236
        except AttributeError:
 
237
            pass
 
238
        else:
 
239
            self.fail("bad getattr should have raised AttributeError")
 
240
 
 
241
    def test_subclass(self):
 
242
        """subclasses can assign attributes"""
 
243
        class S(zmq.Socket):
 
244
            def __init__(self, *a, **kw):
 
245
                self.a=-1
 
246
        s = S(self.context, zmq.REP)
 
247
        self.sockets.append(s)
 
248
        self.assertEquals(s.a, -1)
 
249
        s.a=1
 
250
        self.assertEquals(s.a, 1)
 
251
        a=s.a
 
252
        self.assertEquals(a, 1)
 
253
        
 
254
        
 
255
 
 
256
    
194
257