1
1
from eventlet import event, spawn, sleep, patcher
2
2
from eventlet.hubs import get_hub, _threadlocal, use_hub
3
3
from nose.tools import *
4
from tests import mock, LimitedTestCase, skip_unless_zmq
4
from tests import mock, LimitedTestCase, using_pyevent, skip_unless
5
5
from unittest import TestCase
7
7
from threading import Thread
9
9
from eventlet.green import zmq
10
from eventlet.hubs.zeromq import Hub
11
10
except ImportError:
11
zmq = {} # for systems lacking zmq, skips tests instead of barfing
18
return not using_pyevent(_)
16
20
class TestUpstreamDownStream(LimitedTestCase):
45
49
self.fail("Function did not raise any error")
51
@skip_unless(zmq_supported)
48
52
def test_recv_spawned_before_send_is_non_blocking(self):
49
53
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
53
57
msg = dict(res=None)
54
58
done = event.Event()
63
67
self.assertEqual(msg['res'], 'test')
69
@skip_unless(zmq_supported)
66
70
def test_close_socket_raises_enotsup(self):
67
71
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
71
75
self.assertRaisesErrno(zmq.ENOTSUP, rep.recv)
72
76
self.assertRaisesErrno(zmq.ENOTSUP, req.send, 'test')
78
@skip_unless(zmq_supported)
75
79
def test_send_1k_req_rep(self):
76
80
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
98
102
final_i = done.wait()
99
103
self.assertEqual(final_i, 0)
105
@skip_unless(zmq_supported)
102
106
def test_send_1k_push_pull(self):
103
107
down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
122
126
final_i = done.wait()
123
127
self.assertEqual(final_i, 0)
129
@skip_unless(zmq_supported)
126
130
def test_send_1k_pub_sub(self):
127
131
pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
128
132
sub1 = self.context.socket(zmq.SUB)
171
175
self.assertEqual(sub2_count, 500)
172
176
self.assertEqual(sub_all_count, 1000)
178
@skip_unless(zmq_supported)
175
179
def test_change_subscription(self):
176
180
pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
177
181
sub.setsockopt(zmq.SUBSCRIBE, 'test')
210
214
rx_count = sub_done.wait()
211
215
self.assertEqual(rx_count, 50)
217
@skip_unless(zmq_supported)
214
218
def test_recv_multipart_bug68(self):
215
219
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
228
232
# but it's private __str__ appears to be the way to go
229
233
self.assertEqual([str(m) for m in recieved_msg], msg2)
235
@skip_unless(zmq_supported)
236
def test_recv_noblock_bug76(self):
237
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
238
self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
239
self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True)
232
243
class TestThreadedContextAccess(TestCase):
233
244
"""zmq's Context must be unique within a hub
241
252
in the same context
243
254
if zmq: # don't call decorators if zmq module unavailable
245
@mock.patch('eventlet.green.zmq.get_hub_name_from_instance')
246
@mock.patch('eventlet.green.zmq.get_hub', spec=Hub)
247
def test_context_factory_funtion(self, get_hub_mock, hub_name_mock):
248
hub_name_mock.return_value = 'zeromq'
255
@skip_unless(zmq_supported)
256
def test_context_factory_function(self):
249
257
ctx = zmq.Context()
250
self.assertTrue(get_hub_mock().get_context.called)
258
self.assertTrue(ctx is not None)
260
@skip_unless(zmq_supported)
253
261
def test_threadlocal_context(self):
255
262
context = zmq.Context()
256
263
self.assertEqual(context, _threadlocal.context)
257
next_context = hub.get_context()
264
next_context = zmq.Context()
258
265
self.assertTrue(context is next_context)
267
@skip_unless(zmq_supported)
261
268
def test_different_context_in_different_thread(self):
262
269
context = zmq.Context()
264
271
def assert_different(ctx):
267
273
this_thread_context = zmq.Context()
269
275
test_result.append('fail')
271
277
test_result.append(ctx is this_thread_context)
272
279
Thread(target=assert_different, args=(context,)).start()
273
280
while not test_result:
275
282
self.assertFalse(test_result[0])
278
class TestCheckingForZMQHub(TestCase):
282
self.orig_hub = zmq.get_hub_name_from_instance(get_hub())
286
use_hub(self.orig_hub)
288
def test_assertionerror_raise_by_context(self):
289
self.assertRaises(RuntimeError, zmq.Context)