58
59
self.assertRaises(greenlet.LinkedCompleted, receiver2.get)
60
61
def test_link_to_inactive_greenlet(self):
61
p = gevent.spawn(lambda : 100)
62
p = gevent.spawn(lambda: 100)
62
63
receiver = gevent.spawn_later(10000, sleep, 1)
65
66
self.assertRaises(greenlet.LinkedCompleted, receiver.get)
67
receiver.kill(block=True)
69
70
def test_link_to_asyncresult(self):
70
p = gevent.spawn(lambda : 100)
71
p = gevent.spawn(lambda: 100)
71
72
event = AsyncResult()
73
74
self.assertEqual(event.get(), 100)
100
101
self.assertEqual(q.get().get(), 100)
102
103
def test_link_to_channel(self):
103
p1 = gevent.spawn(lambda : 101)
104
p2 = gevent.spawn(lambda : 102)
105
p3 = gevent.spawn(lambda : 103)
104
p1 = gevent.spawn(lambda: 101)
105
p2 = gevent.spawn(lambda: 102)
106
p3 = gevent.spawn(lambda: 103)
111
112
assert sorted(results) == [101, 102, 103], results
113
114
def test_link_to_current(self):
114
p = gevent.spawn(lambda : 100)
115
p = gevent.spawn(lambda: 100)
116
117
self.assertRaises(greenlet.LinkedCompleted, sleep, 0.1)
117
118
self.assertRaises(greenlet.LinkedCompleted, p.link)
127
128
def _test_func(self, link):
130
assert len(p._links)==1, p._links
131
assert len(p._links) == 1, p._links
131
132
p.unlink(test_func)
132
133
assert not p._links, p._links
135
assert len(p._links)==1, p._links
136
assert len(p._links) == 1, p._links
136
137
p.unlink(self.setUp)
137
138
assert not p._links, p._links
148
149
def _test_greenlet(self, link):
150
151
link(getcurrent())
151
assert len(p._links)==1, p._links
152
assert len(p._links) == 1, p._links
152
153
p.unlink(getcurrent())
153
154
assert not p._links, p._links
155
156
g = gevent.Greenlet()
157
assert len(p._links)==1, p._links
158
assert len(p._links) == 1, p._links
159
160
assert not p._links, p._links
173
174
def link(self, p, listener=None):
174
175
getattr(p, self.link_method)(listener)
177
greentest.TestCase.tearDown(self)
177
def receiverf(self, proc_flag):
179
proc_flag.append('finished')
180
181
def set_links(self, p, first_time, kill_exc_type):
181
182
event = AsyncResult()
182
183
self.link(p, event)
187
proc_flag.append('finished')
188
receiver = gevent.spawn(receiver)
187
receiver = gevent.spawn(self.receiverf, proc_flag)
189
188
self.link(p, receiver)
205
204
for _ in range(10):
206
205
self.link(p, AsyncResult())
207
206
self.link(p, Queue(1).put)
208
208
return event, receiver, proc_flag, queue, callback_flag
210
def myprocf(self, proc_finished_flag):
212
proc_finished_flag.append('finished')
210
215
def set_links_timeout(self, link):
211
216
# stuff that won't be touched
212
217
event = AsyncResult()
215
220
proc_finished_flag = []
218
proc_finished_flag.append('finished')
220
myproc = gevent.spawn(myproc)
221
myproc = gevent.spawn(self.myprocf, proc_finished_flag)
229
230
assert with_timeout(DELAY, queue.get, timeout_value=X) is X, queue.get()
230
231
assert with_timeout(DELAY, gevent.joinall, [myproc], timeout_value=X) is X
231
232
assert proc_finished_flag == [], proc_finished_flag
234
244
class TestReturn_link(LinksTestCase):
235
245
link_method = 'link'
237
251
def test_return(self):
240
p = self.p = gevent.spawn(return25)
241
self._test_return(p, True, 25, greenlet.LinkedCompleted, lambda : sleep(0))
252
self.p = gevent.spawn(return25)
253
self._test_return(self.p, True, 25, greenlet.LinkedCompleted, sleep0)
242
254
# repeating the same with dead process
243
255
for _ in xrange(3):
244
self._test_return(p, False, 25, greenlet.LinkedCompleted, lambda : sleep(0))
256
self._test_return(self.p, False, 25, greenlet.LinkedCompleted, sleep0)
246
260
def _test_return(self, p, first_time, result, kill_exc_type, action):
247
261
event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
333
347
self.check_timed_out(*xxxxx)
335
349
def test_raise(self):
336
p = self.p = gevent.spawn(lambda : getcurrent().throw(ExpectedError('test_raise')))
350
p = self.p = gevent.spawn(lambda: getcurrent().throw(ExpectedError('test_raise')))
337
351
self._test_raise(p, True, greenlet.LinkedFailed)
338
352
# repeating the same with dead process
339
353
for _ in xrange(3):
340
354
self._test_raise(p, False, greenlet.LinkedFailed)
342
357
class TestRaise_link_exception(TestRaise_link):
343
358
link_method = 'link_exception'
346
361
class TestStuff(greentest.TestCase):
348
363
def test_wait_noerrors(self):
349
x = gevent.spawn(lambda : 1)
350
y = gevent.spawn(lambda : 2)
351
z = gevent.spawn(lambda : 3)
364
x = gevent.spawn(lambda: 1)
365
y = gevent.spawn(lambda: 2)
366
z = gevent.spawn(lambda: 3)
352
367
gevent.joinall([x, y, z], raise_error=True)
353
368
self.assertEqual([x.value, y.value, z.value], [1, 2, 3])
354
369
e = AsyncResult()
376
391
self.assertEqual(z.get(), 3)
377
392
self.assertRaises(ExpectedError, gevent.joinall, [y], raise_error=True)
379
def test_wait_all_exception_order(self):
394
def test_joinall_exception_order(self):
380
395
# if there're several exceptions raised, the earliest one must be raised by joinall
383
398
raise ExpectedError('first')
384
399
a = gevent.spawn(first)
385
b = gevent.spawn(lambda : getcurrent().throw(ExpectedError('second')))
400
b = gevent.spawn(lambda: getcurrent().throw(ExpectedError('second')))
387
402
gevent.joinall([a, b], raise_error=True)
388
403
except ExpectedError, ex:
389
404
assert 'second' in str(ex), repr(str(ex))
405
gevent.joinall([a, b])
391
407
def test_multiple_listeners_error(self):
392
408
# if there was an error while calling a callback
393
409
# it should not prevent the other listeners from being called
394
410
# also, all of the errors should be logged, check the output
395
411
# manually that they are
396
p = gevent.spawn(lambda : 5)
412
p = gevent.spawn(lambda: 5)
398
415
def listener1(*args):
399
416
results.append(10)
400
417
raise ExpectedError('listener1')
401
419
def listener2(*args):
402
420
results.append(20)
403
421
raise ExpectedError('listener2')
404
423
def listener3(*args):
405
424
raise ExpectedError('listener3')
406
426
p.link(listener1)
407
427
p.link(listener2)
408
428
p.link(listener3)
410
430
assert results in [[10, 20], [20, 10]], results
412
p = gevent.spawn(lambda : getcurrent().throw(ExpectedError('test_multiple_listeners_error')))
432
p = gevent.spawn(lambda: getcurrent().throw(ExpectedError('test_multiple_listeners_error')))
414
434
p.link(listener1)
415
435
p.link(listener2)
416
436
p.link(listener3)
418
438
assert results in [[10, 20], [20, 10]], results
440
class Results(object):
445
def listener1(self, p):
446
p.unlink(self.listener2)
447
self.results.append(5)
448
raise ExpectedError('listener1')
450
def listener2(self, p):
451
p.unlink(self.listener1)
452
self.results.append(5)
453
raise ExpectedError('listener2')
455
def listener3(self, p):
456
raise ExpectedError('listener3')
420
458
def _test_multiple_listeners_error_unlink(self, p, link):
421
459
# notification must not happen after unlink even
422
460
# though notification process has been already started
424
def listener1(*args):
427
raise ExpectedError('listener1')
428
def listener2(*args):
431
raise ExpectedError('listener2')
432
def listener3(*args):
433
raise ExpectedError('listener3')
438
assert results == [5], results
461
results = self.Results()
463
link(results.listener1)
464
link(results.listener2)
465
link(results.listener3)
467
assert results.results == [5], results.results
440
469
def test_multiple_listeners_error_unlink_Greenlet_link(self):
441
p = gevent.spawn(lambda : 5)
470
p = gevent.spawn(lambda: 5)
442
471
self._test_multiple_listeners_error_unlink(p, p.link)
444
474
def test_multiple_listeners_error_unlink_Greenlet_rawlink(self):
445
p = gevent.spawn(lambda : 5)
475
p = gevent.spawn(lambda: 5)
446
476
self._test_multiple_listeners_error_unlink(p, p.rawlink)
448
478
def test_multiple_listeners_error_unlink_AsyncResult_rawlink(self):
503
538
class TestJoin(greentest.GenericWaitTestCase):
505
540
def wait(self, timeout):
506
gevent.spawn(gevent.sleep, 10).join(timeout=timeout)
541
self.g = gevent.spawn(gevent.sleep, 10)
542
return self.g.join(timeout=timeout)
508
548
class TestGet(greentest.GenericGetTestCase):
510
550
def wait(self, timeout):
511
gevent.spawn(gevent.sleep, 10).get(timeout=timeout)
551
self.g = gevent.spawn(gevent.sleep, 10)
552
return self.g.get(timeout=timeout)
513
558
class TestJoinAll(greentest.GenericWaitTestCase):
515
560
def wait(self, timeout):
516
gevent.joinall([gevent.spawn(gevent.sleep, 10)], timeout=timeout)
561
self.g = gevent.spawn(gevent.sleep, 10)
562
gevent.joinall([self.g], timeout=timeout)
519
568
class TestBasic(greentest.TestCase):
521
570
def test_simple_exit(self):
523
573
def func(delay, return_value=4):
524
574
gevent.sleep(delay)
525
575
return return_value
526
577
g = gevent.Greenlet(func, 0.01, return_value=5)
527
578
g.link(lambda x: link_test.append(x))
560
611
assert g.successful()
561
612
assert g.value == 5
562
assert g.exception is None # not changed
563
assert link_test == [g] # changed
613
assert g.exception is None # not changed
614
assert link_test == [g] # changed
565
616
def test_error_exit(self):
567
619
def func(delay, return_value=4):
568
620
gevent.sleep(delay)
569
621
error = ExpectedError('test_error_exit')
570
622
error.myattr = return_value
572
625
g = gevent.Greenlet(func, 0.001, return_value=5)
573
626
g.link(lambda x: link_test.append(x))