49
56
def test_str(self):
50
57
"""Test the str representations of the Messages."""
51
58
for i in range(16):
54
self.assertEquals(s, str(s))
55
self.assert_(s is str(s))
61
self.assertEquals(s, str(m).encode())
64
"""Test the Message.bytes property."""
69
self.assertEquals(s, m.bytes)
70
# check that it copies
71
self.assert_(b is not s)
72
# check that it copies only once
73
self.assert_(b is m.bytes)
75
def test_unicode(self):
76
"""Test the unicode representations of the Messages."""
78
self.assertRaises(TypeError, zmq.Message, s)
80
if str is not unicode:
84
m = zmq.Message(s.encode('utf8'))
85
self.assertEquals(s, unicode(m.bytes,'utf8'))
57
87
def test_len(self):
58
88
"""Test the len of the Messages."""
59
89
for i in range(16):
62
self.assertEquals(len(s), len(s))
92
self.assertEquals(len(s), len(m))
64
94
def test_lifecycle1(self):
65
95
"""Run through a ref counting cycle with a copy."""
66
100
for i in range(5, 16): # 32, 64,..., 65536
68
self.assertEquals(grc(s), 2)
103
self.assertEquals(grc(s), rc)
69
104
m = zmq.Message(s)
70
self.assertEquals(grc(s), 4)
106
self.assertEquals(grc(s), rc)
72
self.assertEquals(grc(s), 5)
73
self.assertEquals(s, str(m))
74
self.assertEquals(s, str(m2))
75
self.assert_(s is str(m))
76
self.assert_(s is str(m2))
109
self.assertEquals(grc(s), rc)
111
extra = int(isinstance(b,view))
112
# memoryview incs by 2
115
self.assertEquals(grc(s), rc)
117
self.assertEquals(s, str(m).encode())
118
self.assertEquals(s, str(m2).encode())
119
self.assertEquals(s, m.bytes)
120
# self.assert_(s is str(m))
121
# self.assert_(s is str(m2))
78
self.assertEquals(grc(s), 4)
124
self.assertEquals(grc(s), rc)
127
self.assertEquals(grc(s), rc)
80
self.assertEquals(grc(s), 2)
130
self.assertEquals(grc(s), rc)
131
self.assertEquals(rc, 2)
83
134
def test_lifecycle2(self):
84
135
"""Run through a different ref counting cycle with a copy."""
85
140
for i in range(5, 16): # 32, 64,..., 65536
87
self.assertEquals(grc(s), 2)
143
self.assertEquals(grc(s), rc)
88
144
m = zmq.Message(s)
89
self.assertEquals(grc(s), 4)
146
self.assertEquals(grc(s), rc)
91
self.assertEquals(grc(s), 5)
92
self.assertEquals(s, str(m))
93
self.assertEquals(s, str(m2))
94
self.assert_(s is str(m))
95
self.assert_(s is str(m2))
149
self.assertEquals(grc(s), rc)
151
extra = int(isinstance(b,view))
153
self.assertEquals(grc(s), rc)
154
self.assertEquals(s, str(m).encode())
155
self.assertEquals(s, str(m2).encode())
156
self.assertEquals(s, m2.bytes)
157
self.assertEquals(s, m.bytes)
158
# self.assert_(s is str(m))
159
# self.assert_(s is str(m2))
161
self.assertEquals(grc(s), rc)
97
self.assertEquals(grc(s), 4)
163
# m.buffer is kept until m is del'd
166
self.assertEquals(grc(s), rc)
99
self.assertEquals(grc(s), 2)
169
self.assertEquals(grc(s), rc)
170
self.assertEquals(rc, 2)
173
def test_tracker(self):
174
m = zmq.Message('asdf'.encode(), track=True)
175
self.assertFalse(m.done)
176
pm = zmq.MessageTracker(m)
177
self.assertFalse(pm.done)
179
self.assertTrue(pm.done)
181
def test_no_tracker(self):
182
m = zmq.Message('asdf'.encode(), track=False)
183
self.assertRaises(ValueError, getattr, m, 'done')
185
self.assertRaises(ValueError, getattr, m2, 'done')
186
self.assertRaises(ValueError, zmq.MessageTracker, m)
188
def test_multi_tracker(self):
189
m = zmq.Message('asdf'.encode(), track=True)
190
m2 = zmq.Message('whoda'.encode(), track=True)
191
mt = zmq.MessageTracker(m,m2)
192
self.assertFalse(m.done)
193
self.assertFalse(mt.done)
194
self.assertRaises(zmq.NotDone, mt.wait, 0.1)
197
self.assertRaises(zmq.NotDone, mt.wait, 0.1)
198
self.assertFalse(mt.done)
200
self.assertTrue(mt.wait() is None)
201
self.assertTrue(mt.done)
204
def test_buffer_in(self):
205
"""test using a buffer as input"""
211
ins = "§§¶•ªº˜µ¬˚…∆˙åß∂©œ∑´†≈ç√".encode('utf8')
213
ins = "§§¶•ªº˜µ¬˚…∆˙åß∂©œ∑´†≈ç√"
214
m = zmq.Message(view(ins))
216
def test_bad_buffer_in(self):
217
"""test using a bad object"""
218
self.assertRaises(TypeError, zmq.Message, 5)
219
self.assertRaises(TypeError, zmq.Message, object())
221
def test_buffer_out(self):
222
"""receiving buffered output"""
228
ins = "§§¶•ªº˜µ¬˚…∆˙åß∂©œ∑´†≈ç√".encode('utf8')
230
ins = "§§¶•ªº˜µ¬˚…∆˙åß∂©œ∑´†≈ç√"
233
self.assertTrue(isinstance(outb, view))
234
self.assert_(outb is m.buffer)
235
self.assert_(m.buffer is m.buffer)
237
def test_multisend(self):
238
"""ensure that a message remains intact after multiple sends"""
239
a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
240
s = "message".encode()
242
self.assertEquals(s, m.bytes)
244
a.send(m, copy=False)
246
self.assertEquals(s, m.bytes)
247
a.send(m, copy=False)
249
self.assertEquals(s, m.bytes)
252
self.assertEquals(s, m.bytes)
255
self.assertEquals(s, m.bytes)
258
self.assertEquals(s,r)
259
self.assertEquals(s, m.bytes)
261
def test_buffer_numpy(self):
262
"""test non-copying numpy array messages"""
266
raise SkipTest("NumPy unavailable")
267
shapes = map(numpy.random.randint, [2]*5,[16]*5)
268
for i in range(1,len(shapes)+1):
270
A = numpy.random.random(shape)
272
self.assertEquals(A.data, m.buffer)
273
B = numpy.frombuffer(m.buffer,dtype=A.dtype).reshape(A.shape)
274
self.assertEquals((A==B).all(), True)
276
def test_memoryview(self):
277
"""test messages from memoryview (only valid for python >= 2.7)"""
278
major,minor = sys.version_info[:2]
279
if not (major >= 3 or (major == 2 and minor >= 7)):
282
s = 'carrotjuice'.encode()
287
self.assertEquals(s2,s)
288
self.assertEquals(m.bytes,s)