68
63
def test_reply(self):
    """Send one multipart message each way and check it arrives unchanged.

    alice sends to bob and bob's received frames must equal what was sent;
    bob then replies and alice must receive that reply unchanged.
    """
    # build_device() yields three endpoints; ``mon`` is not inspected here.
    alice, bob, mon = self.build_device()
    try:
        alices = b"hello bob".split()
        alice.send_multipart(alices)
        # Receive through the test-case helper rather than the raw socket.
        bobs = self.recv_multipart(bob)
        self.assertEqual(alices, bobs)

        bobs = b"hello alice".split()
        bob.send_multipart(bobs)
        alices = self.recv_multipart(alice)
        self.assertEqual(alices, bobs)
    finally:
        # Tear the device down even when an assertion above fails.
        self.teardown_device()
80
75
def test_queue(self):
    """Queue three messages before reading any and check delivery order.

    alice sends three multipart messages back to back; bob must then
    receive them one at a time in the order they were sent. Finally bob
    replies once and alice must receive that reply unchanged.
    """
    # build_device() yields three endpoints; ``mon`` is not inspected here.
    alice, bob, mon = self.build_device()
    try:
        outbound = [
            b"hello bob".split(),
            b"hello again".split(),
            b"hello again and again".split(),
        ]
        # Send everything first so the messages sit queued before bob reads.
        for frames in outbound:
            alice.send_multipart(frames)
        for frames in outbound:
            bobs = self.recv_multipart(bob)
            self.assertEqual(frames, bobs)

        bobs = b"hello alice".split()
        bob.send_multipart(bobs)
        alices = self.recv_multipart(alice)
        self.assertEqual(alices, bobs)
    finally:
        # Tear the device down even when an assertion above fails.
        self.teardown_device()
100
95
def test_monitor(self):
    """Check that the monitor socket sees every message with its prefix.

    With a default device, each message alice sends must show up on ``mon``
    as ``[b'in'] + frames`` and bob's reply as ``[b'out'] + frames``. The
    reads on bob and mon are interleaved in the same order as before.
    """
    alice, bob, mon = self.build_device()
    try:
        alices = b"hello bob".split()
        alice.send_multipart(alices)
        alices2 = b"hello again".split()
        alice.send_multipart(alices2)
        alices3 = b"hello again and again".split()
        alice.send_multipart(alices3)

        # First message: delivered to bob and reported on the monitor.
        bobs = self.recv_multipart(bob)
        self.assertEqual(alices, bobs)
        mons = self.recv_multipart(mon)
        self.assertEqual([b'in'] + bobs, mons)

        # Drain the remaining two messages from bob before reading mon again.
        bobs = self.recv_multipart(bob)
        self.assertEqual(alices2, bobs)
        bobs = self.recv_multipart(bob)
        self.assertEqual(alices3, bobs)
        mons = self.recv_multipart(mon)
        self.assertEqual([b'in'] + alices2, mons)

        # Reply path: bob -> alice.
        bobs = b"hello alice".split()
        bob.send_multipart(bobs)
        alices = self.recv_multipart(alice)
        self.assertEqual(alices, bobs)

        # The monitor still holds the third inbound message, then the reply.
        mons = self.recv_multipart(mon)
        self.assertEqual([b'in'] + alices3, mons)
        mons = self.recv_multipart(mon)
        self.assertEqual([b'out'] + bobs, mons)
    finally:
        # Tear the device down even when an assertion above fails.
        self.teardown_device()
128
123
def test_prefix(self):
    """Check that custom monitor prefixes replace the default ones.

    The device is built with ``(b"", b'foo', b'bar')``; messages from alice
    must then appear on ``mon`` as ``[b'foo'] + frames`` and bob's reply as
    ``[b'bar'] + frames``. The reads on bob and mon are interleaved in the
    same order as before.
    """
    # NOTE(review): the first argument is presumably the monitor
    # subscription filter (empty = everything) - confirm in build_device.
    alice, bob, mon = self.build_device(b"", b'foo', b'bar')
    try:
        alices = b"hello bob".split()
        alice.send_multipart(alices)
        alices2 = b"hello again".split()
        alice.send_multipart(alices2)
        alices3 = b"hello again and again".split()
        alice.send_multipart(alices3)

        # First message: delivered to bob and reported on the monitor.
        bobs = self.recv_multipart(bob)
        self.assertEqual(alices, bobs)
        mons = self.recv_multipart(mon)
        self.assertEqual([b'foo'] + bobs, mons)

        # Drain the remaining two messages from bob before reading mon again.
        bobs = self.recv_multipart(bob)
        self.assertEqual(alices2, bobs)
        bobs = self.recv_multipart(bob)
        self.assertEqual(alices3, bobs)
        mons = self.recv_multipart(mon)
        self.assertEqual([b'foo'] + alices2, mons)

        # Reply path: bob -> alice.
        bobs = b"hello alice".split()
        bob.send_multipart(bobs)
        alices = self.recv_multipart(alice)
        self.assertEqual(alices, bobs)

        # The monitor still holds the third inbound message, then the reply.
        mons = self.recv_multipart(mon)
        self.assertEqual([b'foo'] + alices3, mons)
        mons = self.recv_multipart(mon)
        self.assertEqual([b'bar'] + bobs, mons)
    finally:
        # Tear the device down even when an assertion above fails.
        self.teardown_device()
156
151
def test_monitor_subscribe(self):
    """Check the monitor when the device is built with ``b"out"``.

    alice sends three messages and bob receives them in order; bob then
    replies. The first and only frame set read from ``mon`` must be
    ``[b'out'] + reply``, i.e. none of the inbound messages is read from it.
    """
    # NOTE(review): b"out" is presumably the monitor's subscription filter -
    # confirm in build_device.
    alice, bob, mon = self.build_device(b"out")
    try:
        outbound = [
            b"hello bob".split(),
            b"hello again".split(),
            b"hello again and again".split(),
        ]
        # Send everything first so the messages sit queued before bob reads.
        for frames in outbound:
            alice.send_multipart(frames)
        for frames in outbound:
            bobs = self.recv_multipart(bob)
            self.assertEqual(frames, bobs)

        bobs = b"hello alice".split()
        bob.send_multipart(bobs)
        alices = self.recv_multipart(alice)
        self.assertEqual(alices, bobs)

        # The next thing on the monitor must be the outbound reply.
        mons = self.recv_multipart(mon)
        self.assertEqual([b'out'] + bobs, mons)
    finally:
        # Tear the device down even when an assertion above fails.
        self.teardown_device()
173
def test_router_router(self):
174
"""test router-router MQ devices"""
# NOTE(review): the bare integers between statements look like line numbers
# left over from a diff view, and the indentation has been lost. The numbers
# skip values (e.g. 178 -> 180, 182 -> 185, 198 -> 200, 200 -> 205), so some
# statements of this test are probably missing from this view - restore them
# from version control before editing the logic.
175
# Monitored queue with ROUTER sockets on both sides and a PUB monitor;
# b'in' / b'out' are presumably the monitor prefixes - confirm.
dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
176
# LINGER is set to 0 on all three device sockets.
dev.setsockopt_in(zmq.LINGER, 0)
177
dev.setsockopt_out(zmq.LINGER, 0)
178
dev.setsockopt_mon(zmq.LINGER, 0)
180
# A throwaway DEALER is bound twice only to obtain two port numbers, which
# are reused below for the device's in/out endpoints.
# NOTE(review): no statement releasing these ports is visible here - it may
# be among the missing lines.
binder = self.context.socket(zmq.DEALER)
181
porta = binder.bind_to_random_port('tcp://127.0.0.1')
182
portb = binder.bind_to_random_port('tcp://127.0.0.1')
185
# NOTE(review): the assertions at the end expect routing frames b'a' and
# b'b'; presumably the sockets' identities are set on lines not shown.
a = self.context.socket(zmq.DEALER)
187
b = self.context.socket(zmq.DEALER)
190
# Each client connects to the port the device side then binds.
a.connect('tcp://127.0.0.1:%i'%porta)
191
dev.bind_in('tcp://127.0.0.1:%i'%porta)
192
b.connect('tcp://127.0.0.1:%i'%portb)
193
dev.bind_out('tcp://127.0.0.1:%i'%portb)
196
if zmq.zmq_version_info() >= (3,1,0):
197
# flush erroneous poll state, due to LIBZMQ-280
198
ping_msg = [ b'ping', b'pong' ]
200
# NOTE(review): `s` is not bound anywhere in the visible lines; presumably a
# loop over the client sockets preceded this statement - confirm.
s.send_multipart(ping_msg)
205
# a addresses b by putting b'b' as the first frame; b is expected to see the
# same payload with b'a' as the first frame instead.
msg = [ b'hello', b'there' ]
206
a.send_multipart([b'b']+msg)
207
bmsg = self.recv_multipart(b)
208
self.assertEquals(bmsg, [b'a']+msg)
209
# Echoing what b received back through the device must reach a with b'b'
# as the first frame.
b.send_multipart(bmsg)
210
amsg = self.recv_multipart(a)
211
self.assertEquals(amsg, [b'b']+msg)