~ubuntu-branches/ubuntu/utopic/python-gevent/utopic-proposed

« back to all changes in this revision

Viewing changes to greentest/test__greenletset.py

  • Committer: Bazaar Package Importer
  • Author(s): Örjan Persson
  • Date: 2011-05-17 16:43:20 UTC
  • mfrom: (1.1.2 upstream)
  • Revision ID: james.westby@ubuntu.com-20110517164320-844bxblhlra65dml
Tags: 0.13.6-1
New upstream version (Closes: #601863).

Show diffs side-by-side

added added

removed removed

Lines of Context:
5
5
 
6
6
DELAY = 0.1
7
7
 
 
8
 
 
9
class SpecialError(Exception):
 
10
    pass
 
11
 
 
12
 
8
13
class Undead(object):
9
14
 
10
15
    def __init__(self):
14
19
        while True:
15
20
            try:
16
21
                gevent.sleep(1)
 
22
            except SpecialError:
 
23
                break
17
24
            except:
18
25
                self.shot_count += 1
19
26
 
 
27
 
20
28
class Test(greentest.TestCase):
21
29
 
22
30
    def test_basic(self):
23
31
        DELAY = 0.05
24
 
        s = pool.GreenletSet()
 
32
        s = pool.Group()
25
33
        s.spawn(gevent.sleep, DELAY)
26
 
        assert len(s)==1, s
27
 
        s.spawn(gevent.sleep, DELAY*2.)
28
 
        assert len(s)==2, s
29
 
        gevent.sleep(DELAY*3./2.)
30
 
        assert len(s)==1, s
 
34
        assert len(s) == 1, s
 
35
        s.spawn(gevent.sleep, DELAY * 2.)
 
36
        assert len(s) == 2, s
 
37
        gevent.sleep(DELAY * 3. / 2.)
 
38
        assert len(s) == 1, s
31
39
        gevent.sleep(DELAY)
32
40
        assert not s, s
33
41
 
34
42
    def test_waitall(self):
35
 
        s = pool.GreenletSet()
 
43
        s = pool.Group()
36
44
        s.spawn(gevent.sleep, DELAY)
37
 
        s.spawn(gevent.sleep, DELAY*2)
38
 
        assert len(s)==2, s
 
45
        s.spawn(gevent.sleep, DELAY * 2)
 
46
        assert len(s) == 2, s
39
47
        start = time.time()
40
48
        s.join(raise_error=True)
41
49
        delta = time.time() - start
42
50
        assert not s, s
43
 
        assert len(s)==0, s
44
 
        assert DELAY*1.9 <= delta <= DELAY*2.5, (delta, DELAY)
 
51
        assert len(s) == 0, s
 
52
        assert DELAY * 1.9 <= delta <= DELAY * 2.5, (delta, DELAY)
45
53
 
46
54
    def test_kill_block(self):
47
 
        s = pool.GreenletSet()
 
55
        s = pool.Group()
48
56
        s.spawn(gevent.sleep, DELAY)
49
 
        s.spawn(gevent.sleep, DELAY*2)
50
 
        assert len(s)==2, s
 
57
        s.spawn(gevent.sleep, DELAY * 2)
 
58
        assert len(s) == 2, s
51
59
        start = time.time()
52
 
        s.kill(block=True)
 
60
        s.kill()
53
61
        assert not s, s
54
 
        assert len(s)==0, s
 
62
        assert len(s) == 0, s
55
63
        delta = time.time() - start
56
 
        assert delta < DELAY*0.5, delta
 
64
        assert delta < DELAY * 0.8, delta
57
65
 
58
66
    def test_kill_noblock(self):
59
 
        s = pool.GreenletSet()
 
67
        s = pool.Group()
60
68
        s.spawn(gevent.sleep, DELAY)
61
 
        s.spawn(gevent.sleep, DELAY*2)
62
 
        assert len(s)==2, s
 
69
        s.spawn(gevent.sleep, DELAY * 2)
 
70
        assert len(s) == 2, s
63
71
        s.kill(block=False)
64
 
        assert len(s)==2, s
 
72
        assert len(s) == 2, s
65
73
        gevent.sleep(0)
66
74
        assert not s, s
67
 
        assert len(s)==0, s
 
75
        assert len(s) == 0, s
68
76
 
69
77
    def test_kill_fires_once(self):
70
78
        u1 = Undead()
71
79
        u2 = Undead()
72
80
        p1 = gevent.spawn(u1)
73
81
        p2 = gevent.spawn(u2)
 
82
 
74
83
        def check(count1, count2):
75
84
            assert p1, p1
76
85
            assert p2, p2
78
87
            assert not p2.dead, p2
79
88
            self.assertEqual(u1.shot_count, count1)
80
89
            self.assertEqual(u2.shot_count, count2)
 
90
 
81
91
        gevent.sleep(0.01)
82
 
        s = pool.GreenletSet([p1, p2])
 
92
        s = pool.Group([p1, p2])
83
93
        assert len(s) == 2, s
84
94
        check(0, 0)
85
95
        s.killone(p1, block=False)
102
112
        assert len(s) == 2, s
103
113
        check(1, 1)
104
114
 
 
115
        p1.kill(SpecialError)
 
116
        p2.kill(SpecialError)
 
117
 
105
118
    def test_killall_subclass(self):
106
 
        p1 = GreenletSubclass.spawn(lambda : 1/0)
107
 
        p2 = GreenletSubclass.spawn(lambda : gevent.sleep(10))
108
 
        s = pool.GreenletSet([p1, p2])
109
 
        s.kill(block=True)
 
119
        p1 = GreenletSubclass.spawn(lambda: 1 / 0)
 
120
        p2 = GreenletSubclass.spawn(lambda: gevent.sleep(10))
 
121
        s = pool.Group([p1, p2])
 
122
        s.kill()
110
123
 
111
124
 
112
125
class GreenletSubclass(gevent.Greenlet):
113
126
    pass
114
127
 
115
128
 
116
 
if __name__=='__main__':
 
129
if __name__ == '__main__':
117
130
    greentest.main()
118