~ubuntu-branches/debian/jessie/python-eventlet/jessie

« back to all changes in this revision

Viewing changes to tests/zmq_test.py

  • Committer: Bazaar Package Importer
  • Author(s): Stefano Rivera
  • Date: 2011-06-02 16:18:16 UTC
  • mfrom: (1.1.4 upstream)
  • Revision ID: james.westby@ubuntu.com-20110602161816-c888ncsqx70pfvfu
Tags: 0.9.15-1
* New upstream release.
  - Drop all patches, accepted upstream.
* Correct DEP3 headers (first line of Description is the subject)
* Bump Standards-Version to 3.9.2, no changes needed.
* Drop Breaks: ${python:Breaks}, no longer used by dh_python2.
* debian/copyright: Update to DEP5 Format r174.
* Restore doc/modules/zmq.rst and BD on Sphinx 1.0.
* reuseaddr.patch: The logic for deciding whether to use SO_REUSEADDR was
  inverted.
* retry-on-timeout.patch: If an operation times out, try one last time.
  (LP: #771512)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
from eventlet import event, spawn, sleep, patcher
2
2
from eventlet.hubs import get_hub, _threadlocal, use_hub
3
3
from nose.tools import *
4
 
from tests import mock, LimitedTestCase, skip_unless_zmq
 
4
from tests import mock, LimitedTestCase, using_pyevent, skip_unless
5
5
from unittest import TestCase
6
6
 
7
7
from threading import Thread
8
8
try:
9
9
    from eventlet.green import zmq
10
 
    from eventlet.hubs.zeromq import Hub
11
10
except ImportError:
12
 
    zmq = None
13
 
    Hub = None
 
11
    zmq = {}    # for systems lacking zmq, skips tests instead of barfing
14
12
 
 
13
def zmq_supported(_):
 
14
    try:
 
15
        import zmq
 
16
    except ImportError:
 
17
        return False
 
18
    return not using_pyevent(_)
15
19
 
16
20
class TestUpstreamDownStream(LimitedTestCase):
17
21
 
44
48
        else:
45
49
            self.fail("Function did not raise any error")
46
50
 
47
 
    @skip_unless_zmq
 
51
    @skip_unless(zmq_supported)
48
52
    def test_recv_spawned_before_send_is_non_blocking(self):
49
53
        req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
50
 
#        req.connect(ipc)
51
 
#        rep.bind(ipc)
 
54
#       req.connect(ipc)
 
55
#       rep.bind(ipc)
52
56
        sleep()
53
57
        msg = dict(res=None)
54
58
        done = event.Event()
62
66
        done.wait()
63
67
        self.assertEqual(msg['res'], 'test')
64
68
 
65
 
    @skip_unless_zmq
 
69
    @skip_unless(zmq_supported)
66
70
    def test_close_socket_raises_enotsup(self):
67
71
        req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
68
72
 
71
75
        self.assertRaisesErrno(zmq.ENOTSUP, rep.recv)
72
76
        self.assertRaisesErrno(zmq.ENOTSUP, req.send, 'test')
73
77
 
74
 
    @skip_unless_zmq
 
78
    @skip_unless(zmq_supported)
75
79
    def test_send_1k_req_rep(self):
76
80
        req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
77
81
        sleep()
98
102
        final_i = done.wait()
99
103
        self.assertEqual(final_i, 0)
100
104
 
101
 
    @skip_unless_zmq
 
105
    @skip_unless(zmq_supported)
102
106
    def test_send_1k_push_pull(self):
103
107
        down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
104
108
        sleep()
122
126
        final_i = done.wait()
123
127
        self.assertEqual(final_i, 0)
124
128
 
125
 
    @skip_unless_zmq
 
129
    @skip_unless(zmq_supported)
126
130
    def test_send_1k_pub_sub(self):
127
131
        pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
128
132
        sub1 = self.context.socket(zmq.SUB)
171
175
        self.assertEqual(sub2_count, 500)
172
176
        self.assertEqual(sub_all_count, 1000)
173
177
 
174
 
    @skip_unless_zmq
 
178
    @skip_unless(zmq_supported)
175
179
    def test_change_subscription(self):
176
180
        pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
177
181
        sub.setsockopt(zmq.SUBSCRIBE, 'test')
210
214
        rx_count = sub_done.wait()
211
215
        self.assertEqual(rx_count, 50)
212
216
 
213
 
    @skip_unless_zmq
 
217
    @skip_unless(zmq_supported)
214
218
    def test_recv_multipart_bug68(self):
215
219
        req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
216
220
        msg = ['']
228
232
        # but its private __str__ appears to be the way to go
229
233
        self.assertEqual([str(m) for m in recieved_msg], msg2)
230
234
 
 
235
    @skip_unless(zmq_supported)
 
236
    def test_recv_noblock_bug76(self):
 
237
        req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
 
238
        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
 
239
        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True)
 
240
 
 
241
 
231
242
 
232
243
class TestThreadedContextAccess(TestCase):
233
244
    """zmq's Context must be unique within a hub
241
252
    in the same context
242
253
    """
243
254
    if zmq:  # don't call decorators if zmq module unavailable
244
 
        @skip_unless_zmq
245
 
        @mock.patch('eventlet.green.zmq.get_hub_name_from_instance')
246
 
        @mock.patch('eventlet.green.zmq.get_hub', spec=Hub)
247
 
        def test_context_factory_funtion(self, get_hub_mock, hub_name_mock):
248
 
            hub_name_mock.return_value = 'zeromq'
 
255
        @skip_unless(zmq_supported)
 
256
        def test_context_factory_function(self):
249
257
            ctx = zmq.Context()
250
 
            self.assertTrue(get_hub_mock().get_context.called)
 
258
            self.assertTrue(ctx is not None)
251
259
 
252
 
        @skip_unless_zmq
 
260
        @skip_unless(zmq_supported)
253
261
        def test_threadlocal_context(self):
254
 
            hub = get_hub()
255
262
            context = zmq.Context()
256
263
            self.assertEqual(context, _threadlocal.context)
257
 
            next_context = hub.get_context()
 
264
            next_context = zmq.Context()
258
265
            self.assertTrue(context is next_context)
259
266
 
260
 
        @skip_unless_zmq
 
267
        @skip_unless(zmq_supported)
261
268
        def test_different_context_in_different_thread(self):
262
269
            context = zmq.Context()
263
270
            test_result = []
264
271
            def assert_different(ctx):
265
 
                hub = get_hub()
266
272
                try:
267
273
                    this_thread_context = zmq.Context()
268
274
                except:
269
275
                    test_result.append('fail')
270
276
                    raise
271
277
                test_result.append(ctx is this_thread_context)
 
278
 
272
279
            Thread(target=assert_different, args=(context,)).start()
273
280
            while not test_result:
274
281
                sleep(0.1)
275
282
            self.assertFalse(test_result[0])
276
283
 
277
 
 
278
 
class TestCheckingForZMQHub(TestCase):
279
 
 
280
 
    @skip_unless_zmq
281
 
    def setUp(self):
282
 
        self.orig_hub = zmq.get_hub_name_from_instance(get_hub())
283
 
        use_hub('selects')
284
 
 
285
 
    def tearDown(self):
286
 
        use_hub(self.orig_hub)
287
 
 
288
 
    def test_assertionerror_raise_by_context(self):
289
 
        self.assertRaises(RuntimeError, zmq.Context)
290
 
 
291
 
 
292
 
 
293
 
 
294