22
24
#-----------------------------------------------------------------------------
25
from zmq.tests import BaseZMQTestCase
30
from zmq.tests import BaseZMQTestCase, SkipTest
31
from zmq.utils.strtypes import bytes, unicode
33
from queue import Queue
35
from Queue import Queue
27
37
#-----------------------------------------------------------------------------
29
39
#-----------------------------------------------------------------------------
32
41
class TestSocket(BaseZMQTestCase):
34
43
def test_create(self):
    """Check that bind and connect reject an unknown transport.

    A PUB socket is created from ``self.context`` and both ``bind`` and
    ``connect`` are expected to fail with ``EPROTONOSUPPORT`` for the
    made-up ``ftl://`` scheme.
    """
    # Only the ``self.context`` socket is created: ``ctx`` is not defined
    # anywhere in this method.
    s = self.context.socket(zmq.PUB)
    try:
        # Superluminal protocol not yet implemented
        self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.bind, 'ftl://')
        self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.connect, 'ftl://')
    finally:
        # Release the socket even when an assertion fails.
        s.close()
51
def test_unicode_sockopts(self):
    """test setting/getting sockopts with unicode strings

    Covers three things on a PUB/SUB pair from ``create_bound_pair``:
    raw ``setsockopt`` rejects unicode with ``TypeError``; the
    ``*_unicode`` option helpers accept it (with an optional encoding);
    and ``send_unicode``/``recv_unicode`` round-trip text through an
    explicit encoding.
    """
    # NOTE(review): the statement that first assigns ``topic`` is not visible
    # in this view, so this literal is a stand-in -- confirm against the
    # real file. The steps below need non-ASCII text that is encodable as
    # utf8, utf16 and latin-1.
    topic = "tést"
    if str is not unicode:
        # Python 2: the native str literal is bytes, so decode it to unicode.
        topic = topic.decode('utf8')
    p, s = self.create_bound_pair(zmq.PUB, zmq.SUB)

    # NOTE(review): both assertions compare an attribute with itself;
    # presumably they once compared two aliases -- confirm intent.
    self.assertEqual(s.send_unicode, s.send_unicode)
    self.assertEqual(p.recv_unicode, p.recv_unicode)

    # The plain setsockopt is expected to refuse unicode values.
    self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
    self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
    self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)

    # The unicode-aware setter takes the same values, optionally with an
    # explicit encoding.
    s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
    s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')

    # These two options are expected to be refused by the unicode getter.
    self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
    self.assertRaises(TypeError, s.getsockopt_unicode, zmq.SUBSCRIBE)

    # The raw IDENTITY bytes, decoded by hand, must match the unicode getter.
    st = s.getsockopt(zmq.IDENTITY)
    self.assertEqual(st.decode('utf16'), s.getsockopt_unicode(zmq.IDENTITY, 'utf16'))

    time.sleep(0.1) # wait for connection/subscription
    p.send_unicode(topic, zmq.SNDMORE)
    p.send_unicode(topic*2, encoding='latin-1')
    self.assertEqual(topic, s.recv_unicode())
    self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
74
def test_send_unicode(self):
75
"test sending unicode objects"
76
# NOTE(review): this method is incomplete in this view. The bare integers
# are line-number residue, and the numbering skips 78, 80 and 83-84, so the
# statements that assign `u`, fill the `if` body, and first assign `s` are
# not visible here.
a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
77
# Register both sockets on self.sockets (presumably for cleanup by the base
# class -- confirm).
self.sockets.extend([a,b])
79
# Python 2 branch (str and unicode are different types); its body is missing.
if str is not unicode:
81
# Plain send() is expected to refuse a unicode object in both copy modes.
self.assertRaises(TypeError, a.send, u,copy=False)
82
self.assertRaises(TypeError, a.send, u,copy=True)
85
# `s` is compared against the utf8 encoding of `u`; the send/recv pair that
# produces it is not visible (see note above).
self.assertEquals(s,u.encode('utf8'))
86
self.assertEquals(s.decode('utf8'),u)
87
# Round trip with an explicit utf16 encoding on both ends.
a.send_unicode(u,encoding='utf16')
88
s = b.recv_unicode(encoding='utf16')
89
self.assertEquals(s,u)
91
def test_tracker(self):
92
"test the MessageTracker object for tracking when zmq is done with a buffer"
93
# NOTE(review): this method is incomplete in this view. The bare integers
# are line-number residue, and the numbering skips 96-97, 103-104, 112-113
# and 135-137, so some statements between the visible ones are missing.
addr = 'tcp://127.0.0.1'
94
# A first XREQ socket is used only to obtain a port number.
a = self.context.socket(zmq.XREQ)
95
port = a.bind_to_random_port(addr)
98
# NOTE(review): whatever releases the first socket (skipped 96-97) is not
# visible; `a` is rebound to a new socket just below.
iface = "%s:%i"%(addr,port)
99
a = self.context.socket(zmq.XREQ)
100
a.setsockopt(zmq.IDENTITY, "a".encode())
101
b = self.context.socket(zmq.XREP)
102
self.sockets.extend([a,b])
105
# NOTE(review): no connect/bind of `a` or `b` to `iface` is visible
# (skipped 103-104).
# Non-copying sends with track=True are expected to return a MessageTracker
# that is not yet done.
p1 = a.send('something'.encode(), copy=False, track=True)
106
self.assertTrue(isinstance(p1, zmq.MessageTracker))
107
self.assertFalse(p1.done)
108
p2 = a.send_multipart(list(map(str.encode, ['something', 'else'])), copy=False, track=True)
109
self.assert_(isinstance(p2, zmq.MessageTracker))
110
self.assertEquals(p2.done, False)
111
self.assertEquals(p1.done, False)
114
# After each receive on `b` the matching tracker is expected to be done.
# Each received message starts with the sender identity 'a'.
msg = b.recv_multipart()
115
self.assertEquals(p1.done, True)
116
self.assertEquals(msg, (list(map(str.encode, ['a', 'something']))))
117
msg = b.recv_multipart()
118
self.assertEquals(p2.done, True)
119
self.assertEquals(msg, list(map(str.encode, ['a', 'something', 'else'])))
120
# The same tracked Message object is sent twice; m.done and both returned
# trackers are asserted to stay False through both receives.
m = zmq.Message("again".encode(), track=True)
121
self.assertEquals(m.done, False)
122
p1 = a.send(m, copy=False)
123
p2 = a.send(m, copy=False)
124
self.assertEquals(m.done, False)
125
self.assertEquals(p1.done, False)
126
self.assertEquals(p2.done, False)
127
msg = b.recv_multipart()
128
self.assertEquals(m.done, False)
129
self.assertEquals(msg, list(map(str.encode, ['a', 'again'])))
130
msg = b.recv_multipart()
131
self.assertEquals(m.done, False)
132
self.assertEquals(msg, list(map(str.encode, ['a', 'again'])))
133
self.assertEquals(p1.done, False)
134
self.assertEquals(p2.done, False)
138
# NOTE(review): the step that makes the trackers flip to done (skipped
# 135-137) is not visible in this view.
self.assertEquals(p1.done, True)
139
self.assertEquals(p2.done, True)
140
# Asking for tracking on a Message created with track=False is expected to
# raise ValueError.
m = zmq.Message('something'.encode(), track=False)
141
self.assertRaises(ValueError, a.send, m, copy=False, track=True)
42
144
def test_close(self):
    """Check that operations on a closed socket raise ``zmq.ZMQError``.

    A PUB socket is created and closed; ``bind``, ``connect``,
    ``setsockopt``, ``send`` and ``recv`` are then each expected to fail.
    """
    s = self.context.socket(zmq.PUB)
    # NOTE(review): the close() call is not visible in this view (the line
    # numbering skips one statement here). It is restored because the
    # assertions below cannot all hold on an open socket -- e.g. send() on
    # an open PUB socket does not raise. Confirm against the real file.
    s.close()

    # Arguments are byte strings (``.encode()``) so the same calls are valid
    # on both Python 2 and Python 3. The plain-str duplicates of these
    # assertions were dropped as redundant.
    self.assertRaises(zmq.ZMQError, s.bind, ''.encode())
    self.assertRaises(zmq.ZMQError, s.connect, ''.encode())
    self.assertRaises(zmq.ZMQError, s.setsockopt, zmq.SUBSCRIBE, ''.encode())
    self.assertRaises(zmq.ZMQError, s.send, 'asdf'.encode())
    self.assertRaises(zmq.ZMQError, s.recv)