~ubuntu-branches/ubuntu/wily/pyzmq/wily

« back to all changes in this revision

Viewing changes to zmq/tests/test_poll.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-24 19:23:15 UTC
  • mfrom: (1.2.1) (9 sid)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20130224192315-qhmwp3m3ymk8r60d
Tags: 2.2.0.1-1
* New upstream release
* relicense debian packaging to LGPL-3
* update watch file to use github directly
  thanks to Bart Martens for the file
* add autopkgtests
* drop obsolete DM-Upload-Allowed
* bump standard to 3.9.4, no changes required

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#
2
 
#    Copyright (c) 2010 Brian E. Granger
3
 
#
4
 
#    This file is part of pyzmq.
5
 
#
6
 
#    pyzmq is free software; you can redistribute it and/or modify it under
7
 
#    the terms of the Lesser GNU General Public License as published by
8
 
#    the Free Software Foundation; either version 3 of the License, or
9
 
#    (at your option) any later version.
10
 
#
11
 
#    pyzmq is distributed in the hope that it will be useful,
12
 
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
#    Lesser GNU General Public License for more details.
15
 
#
16
 
#    You should have received a copy of the Lesser GNU General Public License
17
 
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
 
#
 
1
#-----------------------------------------------------------------------------
 
2
#  Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
 
3
#
 
4
#  This file is part of pyzmq
 
5
#
 
6
#  Distributed under the terms of the New BSD License.  The full license is in
 
7
#  the file COPYING.BSD, distributed as part of this software.
 
8
#-----------------------------------------------------------------------------
19
9
 
20
10
#-----------------------------------------------------------------------------
21
11
# Imports
25
15
from unittest import TestCase
26
16
 
27
17
import zmq
28
 
from zmq.tests import PollZMQTestCase
 
18
 
 
19
from zmq.tests import PollZMQTestCase, have_gevent, GreenTest
29
20
 
30
21
#-----------------------------------------------------------------------------
31
22
# Tests
36
27
 
37
28
class TestPoll(PollZMQTestCase):
38
29
 
 
30
    Poller = zmq.Poller
 
31
 
39
32
    # This test is failing due to this issue:
40
33
    # http://github.com/sustrik/zeromq2/issues#issue/26
41
34
    def test_pair(self):
44
37
        # Sleep to allow sockets to connect.
45
38
        wait()
46
39
 
47
 
        poller = zmq.Poller()
 
40
        poller = self.Poller()
48
41
        poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
49
42
        poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
50
43
        # Poll result should contain both sockets
53
46
        self.assertEquals(socks[s1], zmq.POLLOUT)
54
47
        self.assertEquals(socks[s2], zmq.POLLOUT)
55
48
        # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
56
 
        s1.send('msg1'.encode())
57
 
        s2.send('msg2'.encode())
 
49
        s1.send(b'msg1')
 
50
        s2.send(b'msg2')
58
51
        wait()
59
52
        socks = dict(poller.poll())
60
53
        self.assertEquals(socks[s1], zmq.POLLOUT|zmq.POLLIN)
78
71
        # Sleep to allow sockets to connect.
79
72
        wait()
80
73
 
81
 
        poller = zmq.Poller()
 
74
        poller = self.Poller()
82
75
        poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
83
76
        poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
84
77
 
88
81
        self.assertEquals(socks[s2], zmq.POLLOUT)
89
82
 
90
83
        # Make sure that s2 goes immediately into state 0 after send.
91
 
        s2.send('msg1'.encode())
 
84
        s2.send(b'msg1')
92
85
        socks = dict(poller.poll())
93
86
        self.assertEquals(s2 in socks, 0)
94
87
 
103
96
        self.assertEquals(socks[s1], zmq.POLLOUT)
104
97
 
105
98
        # Make sure s1 goes into state 0 after send.
106
 
        s1.send('msg2'.encode())
 
99
        s1.send(b'msg2')
107
100
        socks = dict(poller.poll())
108
101
        self.assertEquals(s1 in socks, 0)
109
102
 
122
115
 
123
116
        # Wait for everything to finish.
124
117
        wait()
 
118
    
 
119
    def test_no_events(self):
 
120
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
 
121
        poller = self.Poller()
 
122
        poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
 
123
        poller.register(s2, 0)
 
124
        self.assertTrue(s1 in poller.sockets)
 
125
        self.assertFalse(s2 in poller.sockets)
 
126
        poller.register(s1, 0)
 
127
        self.assertFalse(s1 in poller.sockets)
125
128
 
126
129
    def test_pubsub(self):
127
130
        s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
128
 
        s2.setsockopt(zmq.SUBSCRIBE, ''.encode())
 
131
        s2.setsockopt(zmq.SUBSCRIBE, b'')
129
132
 
130
133
        # Sleep to allow sockets to connect.
131
134
        wait()
132
135
 
133
 
        poller = zmq.Poller()
 
136
        poller = self.Poller()
134
137
        poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
135
 
        poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
 
138
        poller.register(s2, zmq.POLLIN)
136
139
 
137
140
        # Now make sure that both are send ready.
138
141
        socks = dict(poller.poll())
139
142
        self.assertEquals(socks[s1], zmq.POLLOUT)
140
143
        self.assertEquals(s2 in socks, 0)
141
144
        # Make sure that s1 stays in POLLOUT after a send.
142
 
        s1.send('msg1'.encode())
 
145
        s1.send(b'msg1')
143
146
        socks = dict(poller.poll())
144
147
        self.assertEquals(socks[s1], zmq.POLLOUT)
145
148
 
158
161
 
159
162
        # Wait for everything to finish.
160
163
        wait()
 
164
    def test_timeout(self):
 
165
        """make sure Poller.poll timeout has the right units (milliseconds)."""
 
166
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
 
167
        poller = self.Poller()
 
168
        poller.register(s1, zmq.POLLIN)
 
169
        tic = time.time()
 
170
        evt = poller.poll(.005)
 
171
        toc = time.time()
 
172
        self.assertTrue(toc-tic < 0.1)
 
173
        tic = time.time()
 
174
        evt = poller.poll(5)
 
175
        toc = time.time()
 
176
        self.assertTrue(toc-tic < 0.1)
 
177
        self.assertTrue(toc-tic > .001)
 
178
        tic = time.time()
 
179
        evt = poller.poll(500)
 
180
        toc = time.time()
 
181
        self.assertTrue(toc-tic < 1)
 
182
        self.assertTrue(toc-tic > 0.1)
161
183
 
162
184
class TestSelect(PollZMQTestCase):
163
185
 
164
 
    # This test is failing due to this issue:
165
 
    # http://github.com/sustrik/zeromq2/issues#issue/26
166
186
    def test_pair(self):
167
187
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
168
188
 
175
195
        self.assert_(s1 not in rlist)
176
196
        self.assert_(s2 not in rlist)
177
197
 
 
198
    def test_timeout(self):
 
199
        """make sure select timeout has the right units (seconds)."""
 
200
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
 
201
        tic = time.time()
 
202
        r,w,x = zmq.select([s1,s2],[],[],.005)
 
203
        toc = time.time()
 
204
        self.assertTrue(toc-tic < 1)
 
205
        self.assertTrue(toc-tic > 0.001)
 
206
        tic = time.time()
 
207
        r,w,x = zmq.select([s1,s2],[],[],.25)
 
208
        toc = time.time()
 
209
        self.assertTrue(toc-tic < 1)
 
210
        self.assertTrue(toc-tic > 0.1)
 
211
 
 
212
 
 
213
if have_gevent:
 
214
    import gevent
 
215
    from zmq import green as gzmq
 
216
 
 
217
    class TestPollGreen(GreenTest, TestPoll):
 
218
        Poller = gzmq.Poller
 
219
 
 
220
        def test_wakeup(self):
 
221
            s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
 
222
            poller = self.Poller()
 
223
            poller.register(s2, zmq.POLLIN)
 
224
 
 
225
            tic = time.time()
 
226
            r = gevent.spawn(lambda: poller.poll(10000))
 
227
            s = gevent.spawn(lambda: s1.send(b'msg1'))
 
228
            r.join()
 
229
            toc = time.time()
 
230
            self.assertTrue(toc-tic < 1)
 
231
            
 
232