~ubuntu-branches/ubuntu/wily/pyzmq/wily

« back to all changes in this revision

Viewing changes to zmq/tests/test_monqueue.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-24 19:23:15 UTC
  • mfrom: (1.2.1) (9 sid)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20130224192315-qhmwp3m3ymk8r60d
Tags: 2.2.0.1-1
* New upstream release
* relicense debian packaging to LGPL-3
* update watch file to use github directly
  thanks to Bart Martens for the file
* add autopkgtests
* drop obsolete DM-Upload-Allowed
* bump standard to 3.9.4, no changes required

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#
2
 
#    Copyright (c) 2010 Min Ragan-Kelley
3
 
#
4
 
#    This file is part of pyzmq.
5
 
#
6
 
#    pyzmq is free software; you can redistribute it and/or modify it under
7
 
#    the terms of the Lesser GNU General Public License as published by
8
 
#    the Free Software Foundation; either version 3 of the License, or
9
 
#    (at your option) any later version.
10
 
#
11
 
#    pyzmq is distributed in the hope that it will be useful,
12
 
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
#    Lesser GNU General Public License for more details.
15
 
#
16
 
#    You should have received a copy of the Lesser GNU General Public License
17
 
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
 
#
 
1
#-----------------------------------------------------------------------------
 
2
#  Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
 
3
#
 
4
#  This file is part of pyzmq
 
5
#
 
6
#  Distributed under the terms of the New BSD License.  The full license is in
 
7
#  the file COPYING.BSD, distributed as part of this software.
 
8
#-----------------------------------------------------------------------------
19
9
 
20
10
#-----------------------------------------------------------------------------
21
11
# Imports
26
16
 
27
17
import zmq
28
18
from zmq import devices
29
 
from zmq.tests import BaseZMQTestCase
30
19
 
 
20
from zmq.tests import BaseZMQTestCase, SkipTest
31
21
 
32
22
#-----------------------------------------------------------------------------
33
23
# Tests
34
24
#-----------------------------------------------------------------------------
 
25
devices.ThreadMonitoredQueue.context_factory = zmq.Context
35
26
 
36
27
class TestMonitoredQueue(BaseZMQTestCase):
37
28
    sockets = []
38
 
    pass
39
29
    
40
 
    def build_device(self, mon_sub="".encode(), in_prefix='in'.encode(), out_prefix='out'.encode()):
41
 
        self.skip_if_pgm()
 
30
    def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
42
31
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
43
32
                                            in_prefix, out_prefix)
44
33
        alice = self.context.socket(zmq.PAIR)
53
42
        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
54
43
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
55
44
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
 
45
        self.device.start()
56
46
        time.sleep(.2)
57
 
        self.device.start()
 
47
        try:
 
48
            # this is currenlty necessary to ensure no dropped monitor messages
 
49
            # see LIBZMQ-248 for more info
 
50
            mon.recv_multipart(zmq.NOBLOCK)
 
51
        except zmq.ZMQError:
 
52
            pass
58
53
        self.sockets.extend([alice, bob, mon])
59
54
        return alice, bob, mon
60
55
        
67
62
        
68
63
    def test_reply(self):
69
64
        alice, bob, mon = self.build_device()
70
 
        alices = "hello bob".encode().split()
 
65
        alices = b"hello bob".split()
71
66
        alice.send_multipart(alices)
72
 
        bobs = bob.recv_multipart()
 
67
        bobs = self.recv_multipart(bob)
73
68
        self.assertEquals(alices, bobs)
74
 
        bobs = "hello alice".encode().split()
 
69
        bobs = b"hello alice".split()
75
70
        bob.send_multipart(bobs)
76
 
        alices = alice.recv_multipart()
 
71
        alices = self.recv_multipart(alice)
77
72
        self.assertEquals(alices, bobs)
78
73
        self.teardown_device()
79
74
    
80
75
    def test_queue(self):
81
76
        alice, bob, mon = self.build_device()
82
 
        alices = "hello bob".encode().split()
 
77
        alices = b"hello bob".split()
83
78
        alice.send_multipart(alices)
84
 
        alices2 = "hello again".encode().split()
 
79
        alices2 = b"hello again".split()
85
80
        alice.send_multipart(alices2)
86
 
        alices3 = "hello again and again".encode().split()
 
81
        alices3 = b"hello again and again".split()
87
82
        alice.send_multipart(alices3)
88
 
        bobs = bob.recv_multipart()
 
83
        bobs = self.recv_multipart(bob)
89
84
        self.assertEquals(alices, bobs)
90
 
        bobs = bob.recv_multipart()
 
85
        bobs = self.recv_multipart(bob)
91
86
        self.assertEquals(alices2, bobs)
92
 
        bobs = bob.recv_multipart()
 
87
        bobs = self.recv_multipart(bob)
93
88
        self.assertEquals(alices3, bobs)
94
 
        bobs = "hello alice".encode().split()
 
89
        bobs = b"hello alice".split()
95
90
        bob.send_multipart(bobs)
96
 
        alices = alice.recv_multipart()
 
91
        alices = self.recv_multipart(alice)
97
92
        self.assertEquals(alices, bobs)
98
93
        self.teardown_device()
99
94
    
100
95
    def test_monitor(self):
101
96
        alice, bob, mon = self.build_device()
102
 
        alices = "hello bob".encode().split()
 
97
        alices = b"hello bob".split()
103
98
        alice.send_multipart(alices)
104
 
        alices2 = "hello again".encode().split()
 
99
        alices2 = b"hello again".split()
105
100
        alice.send_multipart(alices2)
106
 
        alices3 = "hello again and again".encode().split()
 
101
        alices3 = b"hello again and again".split()
107
102
        alice.send_multipart(alices3)
108
 
        bobs = bob.recv_multipart()
 
103
        bobs = self.recv_multipart(bob)
109
104
        self.assertEquals(alices, bobs)
110
 
        mons = mon.recv_multipart()
111
 
        self.assertEquals(['in'.encode()]+bobs, mons)
112
 
        bobs = bob.recv_multipart()
 
105
        mons = self.recv_multipart(mon)
 
106
        self.assertEquals([b'in']+bobs, mons)
 
107
        bobs = self.recv_multipart(bob)
113
108
        self.assertEquals(alices2, bobs)
114
 
        bobs = bob.recv_multipart()
 
109
        bobs = self.recv_multipart(bob)
115
110
        self.assertEquals(alices3, bobs)
116
 
        mons = mon.recv_multipart()
117
 
        self.assertEquals(['in'.encode()]+alices2, mons)
118
 
        bobs = "hello alice".encode().split()
 
111
        mons = self.recv_multipart(mon)
 
112
        self.assertEquals([b'in']+alices2, mons)
 
113
        bobs = b"hello alice".split()
119
114
        bob.send_multipart(bobs)
120
 
        alices = alice.recv_multipart()
 
115
        alices = self.recv_multipart(alice)
121
116
        self.assertEquals(alices, bobs)
122
 
        mons = mon.recv_multipart()
123
 
        self.assertEquals(['in'.encode()]+alices3, mons)
124
 
        mons = mon.recv_multipart()
125
 
        self.assertEquals(['out'.encode()]+bobs, mons)
 
117
        mons = self.recv_multipart(mon)
 
118
        self.assertEquals([b'in']+alices3, mons)
 
119
        mons = self.recv_multipart(mon)
 
120
        self.assertEquals([b'out']+bobs, mons)
126
121
        self.teardown_device()
127
122
    
128
123
    def test_prefix(self):
129
 
        alice, bob, mon = self.build_device("".encode(), 'foo'.encode(), 'bar'.encode())
130
 
        alices = "hello bob".encode().split()
 
124
        alice, bob, mon = self.build_device(b"", b'foo', b'bar')
 
125
        alices = b"hello bob".split()
131
126
        alice.send_multipart(alices)
132
 
        alices2 = "hello again".encode().split()
 
127
        alices2 = b"hello again".split()
133
128
        alice.send_multipart(alices2)
134
 
        alices3 = "hello again and again".encode().split()
 
129
        alices3 = b"hello again and again".split()
135
130
        alice.send_multipart(alices3)
136
 
        bobs = bob.recv_multipart()
 
131
        bobs = self.recv_multipart(bob)
137
132
        self.assertEquals(alices, bobs)
138
 
        mons = mon.recv_multipart()
139
 
        self.assertEquals(['foo'.encode()]+bobs, mons)
140
 
        bobs = bob.recv_multipart()
 
133
        mons = self.recv_multipart(mon)
 
134
        self.assertEquals([b'foo']+bobs, mons)
 
135
        bobs = self.recv_multipart(bob)
141
136
        self.assertEquals(alices2, bobs)
142
 
        bobs = bob.recv_multipart()
 
137
        bobs = self.recv_multipart(bob)
143
138
        self.assertEquals(alices3, bobs)
144
 
        mons = mon.recv_multipart()
145
 
        self.assertEquals(['foo'.encode()]+alices2, mons)
146
 
        bobs = "hello alice".encode().split()
 
139
        mons = self.recv_multipart(mon)
 
140
        self.assertEquals([b'foo']+alices2, mons)
 
141
        bobs = b"hello alice".split()
147
142
        bob.send_multipart(bobs)
148
 
        alices = alice.recv_multipart()
 
143
        alices = self.recv_multipart(alice)
149
144
        self.assertEquals(alices, bobs)
150
 
        mons = mon.recv_multipart()
151
 
        self.assertEquals(['foo'.encode()]+alices3, mons)
152
 
        mons = mon.recv_multipart()
153
 
        self.assertEquals(['bar'.encode()]+bobs, mons)
 
145
        mons = self.recv_multipart(mon)
 
146
        self.assertEquals([b'foo']+alices3, mons)
 
147
        mons = self.recv_multipart(mon)
 
148
        self.assertEquals([b'bar']+bobs, mons)
154
149
        self.teardown_device()
155
150
    
156
151
    def test_monitor_subscribe(self):
157
 
        alice, bob, mon = self.build_device("out".encode())
158
 
        alices = "hello bob".encode().split()
 
152
        alice, bob, mon = self.build_device(b"out")
 
153
        alices = b"hello bob".split()
159
154
        alice.send_multipart(alices)
160
 
        alices2 = "hello again".encode().split()
 
155
        alices2 = b"hello again".split()
161
156
        alice.send_multipart(alices2)
162
 
        alices3 = "hello again and again".encode().split()
 
157
        alices3 = b"hello again and again".split()
163
158
        alice.send_multipart(alices3)
164
 
        bobs = bob.recv_multipart()
 
159
        bobs = self.recv_multipart(bob)
165
160
        self.assertEquals(alices, bobs)
166
 
        bobs = bob.recv_multipart()
 
161
        bobs = self.recv_multipart(bob)
167
162
        self.assertEquals(alices2, bobs)
168
 
        bobs = bob.recv_multipart()
 
163
        bobs = self.recv_multipart(bob)
169
164
        self.assertEquals(alices3, bobs)
170
 
        bobs = "hello alice".encode().split()
 
165
        bobs = b"hello alice".split()
171
166
        bob.send_multipart(bobs)
172
 
        alices = alice.recv_multipart()
 
167
        alices = self.recv_multipart(alice)
173
168
        self.assertEquals(alices, bobs)
174
 
        mons = mon.recv_multipart()
175
 
        self.assertEquals(['out'.encode()]+bobs, mons)
 
169
        mons = self.recv_multipart(mon)
 
170
        self.assertEquals([b'out']+bobs, mons)
176
171
        self.teardown_device()
177
172
    
 
173
    def test_router_router(self):
 
174
        """test router-router MQ devices"""
 
175
        dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
 
176
        dev.setsockopt_in(zmq.LINGER, 0)
 
177
        dev.setsockopt_out(zmq.LINGER, 0)
 
178
        dev.setsockopt_mon(zmq.LINGER, 0)
 
179
        
 
180
        binder = self.context.socket(zmq.DEALER)
 
181
        porta = binder.bind_to_random_port('tcp://127.0.0.1')
 
182
        portb = binder.bind_to_random_port('tcp://127.0.0.1')
 
183
        binder.close()
 
184
        time.sleep(0.1)
 
185
        a = self.context.socket(zmq.DEALER)
 
186
        a.identity = b'a'
 
187
        b = self.context.socket(zmq.DEALER)
 
188
        b.identity = b'b'
 
189
        
 
190
        a.connect('tcp://127.0.0.1:%i'%porta)
 
191
        dev.bind_in('tcp://127.0.0.1:%i'%porta)
 
192
        b.connect('tcp://127.0.0.1:%i'%portb)
 
193
        dev.bind_out('tcp://127.0.0.1:%i'%portb)
 
194
        dev.start()
 
195
        time.sleep(0.2)
 
196
        if zmq.zmq_version_info() >= (3,1,0):
 
197
            # flush erroneous poll state, due to LIBZMQ-280
 
198
            ping_msg = [ b'ping', b'pong' ]
 
199
            for s in (a,b):
 
200
                s.send_multipart(ping_msg)
 
201
                try:
 
202
                    s.recv(zmq.NOBLOCK)
 
203
                except zmq.ZMQError:
 
204
                    pass
 
205
        msg = [ b'hello', b'there' ]
 
206
        a.send_multipart([b'b']+msg)
 
207
        bmsg = self.recv_multipart(b)
 
208
        self.assertEquals(bmsg, [b'a']+msg)
 
209
        b.send_multipart(bmsg)
 
210
        amsg = self.recv_multipart(a)
 
211
        self.assertEquals(amsg, [b'b']+msg)