~ubuntu-branches/ubuntu/maverick/python3.1/maverick

« back to all changes in this revision

Viewing changes to Lib/test/test_poll.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-03-23 00:01:27 UTC
  • Revision ID: james.westby@ubuntu.com-20090323000127-5fstfxju4ufrhthq
Tags: upstream-3.1~a1+20090322
ImportĀ upstreamĀ versionĀ 3.1~a1+20090322

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Test case for the os.poll() function
 
2
 
 
3
import os, select, random, unittest
 
4
from test.support import TestSkipped, TESTFN, run_unittest
 
5
 
 
6
try:
 
7
    select.poll
 
8
except AttributeError:
 
9
    raise TestSkipped("select.poll not defined -- skipping test_poll")
 
10
 
 
11
 
 
12
def find_ready_matching(ready, flag):
 
13
    match = []
 
14
    for fd, mode in ready:
 
15
        if mode & flag:
 
16
            match.append(fd)
 
17
    return match
 
18
 
 
19
class PollTests(unittest.TestCase):
 
20
 
 
21
    def test_poll1(self):
 
22
        # Basic functional test of poll object
 
23
        # Create a bunch of pipe and test that poll works with them.
 
24
 
 
25
        p = select.poll()
 
26
 
 
27
        NUM_PIPES = 12
 
28
        MSG = b" This is a test."
 
29
        MSG_LEN = len(MSG)
 
30
        readers = []
 
31
        writers = []
 
32
        r2w = {}
 
33
        w2r = {}
 
34
 
 
35
        for i in range(NUM_PIPES):
 
36
            rd, wr = os.pipe()
 
37
            p.register(rd)
 
38
            p.modify(rd, select.POLLIN)
 
39
            p.register(wr, select.POLLOUT)
 
40
            readers.append(rd)
 
41
            writers.append(wr)
 
42
            r2w[rd] = wr
 
43
            w2r[wr] = rd
 
44
 
 
45
        bufs = []
 
46
 
 
47
        while writers:
 
48
            ready = p.poll()
 
49
            ready_writers = find_ready_matching(ready, select.POLLOUT)
 
50
            if not ready_writers:
 
51
                raise RuntimeError("no pipes ready for writing")
 
52
            wr = random.choice(ready_writers)
 
53
            os.write(wr, MSG)
 
54
 
 
55
            ready = p.poll()
 
56
            ready_readers = find_ready_matching(ready, select.POLLIN)
 
57
            if not ready_readers:
 
58
                raise RuntimeError("no pipes ready for reading")
 
59
            rd = random.choice(ready_readers)
 
60
            buf = os.read(rd, MSG_LEN)
 
61
            self.assertEqual(len(buf), MSG_LEN)
 
62
            bufs.append(buf)
 
63
            os.close(r2w[rd]) ; os.close( rd )
 
64
            p.unregister( r2w[rd] )
 
65
            p.unregister( rd )
 
66
            writers.remove(r2w[rd])
 
67
 
 
68
        self.assertEqual(bufs, [MSG] * NUM_PIPES)
 
69
 
 
70
    def poll_unit_tests(self):
 
71
        # returns NVAL for invalid file descriptor
 
72
        FD = 42
 
73
        try:
 
74
            os.close(FD)
 
75
        except OSError:
 
76
            pass
 
77
        p = select.poll()
 
78
        p.register(FD)
 
79
        r = p.poll()
 
80
        self.assertEqual(r[0], (FD, select.POLLNVAL))
 
81
 
 
82
        f = open(TESTFN, 'w')
 
83
        fd = f.fileno()
 
84
        p = select.poll()
 
85
        p.register(f)
 
86
        r = p.poll()
 
87
        self.assertEqual(r[0][0], fd)
 
88
        f.close()
 
89
        r = p.poll()
 
90
        self.assertEqual(r[0], (fd, select.POLLNVAL))
 
91
        os.unlink(TESTFN)
 
92
 
 
93
        # type error for invalid arguments
 
94
        p = select.poll()
 
95
        self.assertRaises(TypeError, p.register, p)
 
96
        self.assertRaises(TypeError, p.unregister, p)
 
97
 
 
98
        # can't unregister non-existent object
 
99
        p = select.poll()
 
100
        self.assertRaises(KeyError, p.unregister, 3)
 
101
 
 
102
        # Test error cases
 
103
        pollster = select.poll()
 
104
        class Nope:
 
105
            pass
 
106
 
 
107
        class Almost:
 
108
            def fileno(self):
 
109
                return 'fileno'
 
110
 
 
111
        self.assertRaises(TypeError, pollster.register, Nope(), 0)
 
112
        self.assertRaises(TypeError, pollster.register, Almost(), 0)
 
113
 
 
114
    # Another test case for poll().  This is copied from the test case for
 
115
    # select(), modified to use poll() instead.
 
116
 
 
117
    def test_poll2(self):
 
118
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
 
119
        p = os.popen(cmd, 'r')
 
120
        pollster = select.poll()
 
121
        pollster.register( p, select.POLLIN )
 
122
        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
 
123
            fdlist = pollster.poll(tout)
 
124
            if (fdlist == []):
 
125
                continue
 
126
            fd, flags = fdlist[0]
 
127
            if flags & select.POLLHUP:
 
128
                line = p.readline()
 
129
                if line != "":
 
130
                    self.fail('error: pipe seems to be closed, but still returns data')
 
131
                continue
 
132
 
 
133
            elif flags & select.POLLIN:
 
134
                line = p.readline()
 
135
                if not line:
 
136
                    break
 
137
                continue
 
138
            else:
 
139
                self.fail('Unexpected return value from select.poll: %s' % fdlist)
 
140
        p.close()
 
141
 
 
142
    def test_poll3(self):
 
143
        # test int overflow
 
144
        pollster = select.poll()
 
145
        pollster.register(1)
 
146
 
 
147
        self.assertRaises(OverflowError, pollster.poll, 1 << 64)
 
148
 
 
149
        x = 2 + 3
 
150
        if x != 5:
 
151
            self.fail('Overflow must have occurred')
 
152
 
 
153
def test_main():
 
154
    run_unittest(PollTests)
 
155
 
 
156
if __name__ == '__main__':
 
157
    test_main()