2
# Copyright (c) 2010 Brian E. Granger
4
# This file is part of pyzmq.
6
# pyzmq is free software; you can redistribute it and/or modify it under
7
# the terms of the Lesser GNU General Public License as published by
8
# the Free Software Foundation; either version 3 of the License, or
9
# (at your option) any later version.
11
# pyzmq is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# Lesser GNU General Public License for more details.
16
# You should have received a copy of the Lesser GNU General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1
#-----------------------------------------------------------------------------
2
# Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
4
# This file is part of pyzmq
6
# Distributed under the terms of the New BSD License. The full license is in
7
# the file COPYING.BSD, distributed as part of this software.
8
#-----------------------------------------------------------------------------
20
10
#-----------------------------------------------------------------------------
25
15
from unittest import TestCase
28
from zmq.tests import PollZMQTestCase
19
from zmq.tests import PollZMQTestCase, have_gevent, GreenTest
30
21
#-----------------------------------------------------------------------------
44
37
# Sleep to allow sockets to connect.
40
poller = self.Poller()
48
41
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
49
42
poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
50
43
# Poll result should contain both sockets
53
46
self.assertEquals(socks[s1], zmq.POLLOUT)
54
47
self.assertEquals(socks[s2], zmq.POLLOUT)
55
48
# Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
56
s1.send('msg1'.encode())
57
s2.send('msg2'.encode())
59
52
socks = dict(poller.poll())
60
53
self.assertEquals(socks[s1], zmq.POLLOUT|zmq.POLLIN)
88
81
self.assertEquals(socks[s2], zmq.POLLOUT)
90
83
# Make sure that s2 goes immediately into state 0 after send.
91
s2.send('msg1'.encode())
92
85
socks = dict(poller.poll())
93
86
self.assertEquals(s2 in socks, 0)
103
96
self.assertEquals(socks[s1], zmq.POLLOUT)
105
98
# Make sure s1 goes into state 0 after send.
106
s1.send('msg2'.encode())
107
100
socks = dict(poller.poll())
108
101
self.assertEquals(s1 in socks, 0)
123
116
# Wait for everything to finish.
119
def test_no_events(self):
120
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
121
poller = self.Poller()
122
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
123
poller.register(s2, 0)
124
self.assertTrue(s1 in poller.sockets)
125
self.assertFalse(s2 in poller.sockets)
126
poller.register(s1, 0)
127
self.assertFalse(s1 in poller.sockets)
126
129
def test_pubsub(self):
127
130
s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
128
s2.setsockopt(zmq.SUBSCRIBE, ''.encode())
131
s2.setsockopt(zmq.SUBSCRIBE, b'')
130
133
# Sleep to allow sockets to connect.
133
poller = zmq.Poller()
136
poller = self.Poller()
134
137
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
135
poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
138
poller.register(s2, zmq.POLLIN)
137
140
# Now make sure that both are send ready.
138
141
socks = dict(poller.poll())
139
142
self.assertEquals(socks[s1], zmq.POLLOUT)
140
143
self.assertEquals(s2 in socks, 0)
141
144
# Make sure that s1 stays in POLLOUT after a send.
142
s1.send('msg1'.encode())
143
146
socks = dict(poller.poll())
144
147
self.assertEquals(socks[s1], zmq.POLLOUT)
159
162
# Wait for everything to finish.
164
def test_timeout(self):
165
"""make sure Poller.poll timeout has the right units (milliseconds)."""
166
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
167
poller = self.Poller()
168
poller.register(s1, zmq.POLLIN)
170
evt = poller.poll(.005)
172
self.assertTrue(toc-tic < 0.1)
176
self.assertTrue(toc-tic < 0.1)
177
self.assertTrue(toc-tic > .001)
179
evt = poller.poll(500)
181
self.assertTrue(toc-tic < 1)
182
self.assertTrue(toc-tic > 0.1)
162
184
class TestSelect(PollZMQTestCase):
164
# This test is failing due to this issue:
165
# http://github.com/sustrik/zeromq2/issues#issue/26
166
186
def test_pair(self):
167
187
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
175
195
self.assert_(s1 not in rlist)
176
196
self.assert_(s2 not in rlist)
198
def test_timeout(self):
199
"""make sure select timeout has the right units (seconds)."""
200
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
202
r,w,x = zmq.select([s1,s2],[],[],.005)
204
self.assertTrue(toc-tic < 1)
205
self.assertTrue(toc-tic > 0.001)
207
r,w,x = zmq.select([s1,s2],[],[],.25)
209
self.assertTrue(toc-tic < 1)
210
self.assertTrue(toc-tic > 0.1)
215
from zmq import green as gzmq
217
class TestPollGreen(GreenTest, TestPoll):
220
def test_wakeup(self):
221
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
222
poller = self.Poller()
223
poller.register(s2, zmq.POLLIN)
226
r = gevent.spawn(lambda: poller.poll(10000))
227
s = gevent.spawn(lambda: s1.send(b'msg1'))
230
self.assertTrue(toc-tic < 1)