~ubuntu-branches/ubuntu/quantal/python-gevent/quantal

« back to all changes in this revision

Viewing changes to greentest/test__queue.py

  • Committer: Bazaar Package Importer
  • Author(s): Örjan Persson
  • Date: 2011-05-17 16:43:20 UTC
  • mfrom: (6.1.1 sid)
  • Revision ID: james.westby@ubuntu.com-20110517164320-nwhld2xo6sz58p4m
Tags: 0.13.6-1
New upstream version (Closes: #601863).

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
 
16
16
    def test_send_last(self):
17
17
        q = queue.Queue()
 
18
 
18
19
        def waiter(q):
19
20
            timer = gevent.Timeout.start_new(0.1)
20
21
            try:
27
28
        gevent.sleep(0.01)
28
29
        q.put('hi2')
29
30
        gevent.sleep(0.01)
30
 
        assert p.get(timeout=0)=="OK"
 
31
        assert p.get(timeout=0) == "OK"
31
32
 
32
33
    def test_max_size(self):
33
34
        q = queue.Queue(2)
50
51
        self.assertEquals(results, ['a', 'b', 'c'])
51
52
        self.assertEquals(q.get(), 'b')
52
53
        self.assertEquals(q.get(), 'c')
53
 
        assert p.get(timeout=0)=="OK"
 
54
        assert p.get(timeout=0) == "OK"
54
55
 
55
56
    def test_zero_max_size(self):
56
57
        q = queue.Queue(0)
 
58
 
57
59
        def sender(evt, q):
58
60
            q.put('hi')
59
61
            evt.set('done')
69
71
        gevent.sleep(0.001)
70
72
        self.assert_(not e1.ready())
71
73
        p2 = gevent.spawn(receiver, e2, q)
72
 
        self.assertEquals(e2.get(),'hi')
73
 
        self.assertEquals(e1.get(),'done')
 
74
        self.assertEquals(e2.get(), 'hi')
 
75
        self.assertEquals(e1.get(), 'done')
74
76
        timeout = gevent.Timeout.start_new(0)
75
77
        try:
76
78
            gevent.joinall([p1, p2])
87
89
        sendings = ['1', '2', '3', '4']
88
90
        evts = [AsyncResult() for x in sendings]
89
91
        for i, x in enumerate(sendings):
90
 
            gevent.spawn(waiter, q, evts[i]) # use waitall for them
 
92
            gevent.spawn(waiter, q, evts[i])  # use waitall for them
91
93
 
92
 
        gevent.sleep(0.01) # get 'em all waiting
 
94
        gevent.sleep(0.01)  # get 'em all waiting
93
95
 
94
96
        results = set()
 
97
 
95
98
        def collect_pending_results():
96
99
            for i, e in enumerate(evts):
97
100
                timer = gevent.Timeout.start_new(0.001)
138
141
        self.assertEquals(q.get(), 'sent')
139
142
 
140
143
    def test_two_waiters_one_dies(self):
 
144
 
141
145
        def waiter(q, evt):
142
146
            evt.set(q.get())
 
147
 
143
148
        def do_receive(q, evt):
144
149
            timeout = gevent.Timeout.start_new(0, RuntimeError())
145
150
            try:
243
248
    def test_put_nowait_simple(self):
244
249
        result = []
245
250
        q = queue.Queue(1)
 
251
 
246
252
        def store_result(func, *args):
247
253
            result.append(func(*args))
 
254
 
248
255
        core.active_event(store_result, util.wrap_errors(Exception, q.put_nowait), 2)
249
256
        core.active_event(store_result, util.wrap_errors(Exception, q.put_nowait), 3)
250
257
        gevent.sleep(0)
251
 
        assert len(result)==2, result
252
 
        assert result[0]==None, result
 
258
        assert len(result) == 2, result
 
259
        assert result[0] == None, result
253
260
        assert isinstance(result[1], queue.Full), result
254
261
 
255
262
    def test_get_nowait_simple(self):
256
263
        result = []
257
264
        q = queue.Queue(1)
258
265
        q.put(4)
 
266
 
259
267
        def store_result(func, *args):
260
268
            result.append(func(*args))
 
269
 
261
270
        core.active_event(store_result, util.wrap_errors(Exception, q.get_nowait))
262
271
        core.active_event(store_result, util.wrap_errors(Exception, q.get_nowait))
263
272
        gevent.sleep(0)
264
 
        assert len(result)==2, result
265
 
        assert result[0]==4, result
 
273
        assert len(result) == 2, result
 
274
        assert result[0] == 4, result
266
275
        assert isinstance(result[1], queue.Empty), result
267
276
 
268
277
    # get_nowait must work from the mainloop
270
279
        result = []
271
280
        q = queue.Queue(0)
272
281
        p = gevent.spawn(q.put, 5)
 
282
 
273
283
        def store_result(func, *args):
274
284
            result.append(func(*args))
 
285
 
275
286
        assert q.empty(), q
276
287
        assert q.full(), q
277
288
        gevent.sleep(0)
291
302
        result = []
292
303
        q = queue.Queue(0)
293
304
        p = gevent.spawn(q.get)
 
305
 
294
306
        def store_result(func, *args):
295
307
            result.append(func(*args))
 
308
 
296
309
        assert q.empty(), q
297
310
        assert q.full(), q
298
311
        gevent.sleep(0)
307
320
        assert q.empty(), q
308
321
 
309
322
 
310
 
if __name__=='__main__':
 
323
class TestJoinEmpty(TestCase):
 
324
 
 
325
    def test_issue_45(self):
 
326
        """Test that join() exits immediatelly if not jobs were put into the queue"""
 
327
        self.switch_expected = False
 
328
        q = queue.JoinableQueue()
 
329
        q.join()
 
330
 
 
331
 
 
332
if __name__ == '__main__':
311
333
    main()