~ubuntu-branches/ubuntu/trusty/python3.4/trusty-proposed

« back to all changes in this revision

Viewing changes to Lib/test/test_selectors.py

  • Committer: Package Import Robot
  • Author(s): Matthias Klose
  • Date: 2013-11-25 09:44:27 UTC
  • Revision ID: package-import@ubuntu.com-20131125094427-lzxj8ap5w01lmo7f
Tags: upstream-3.4~b1
Import upstream version 3.4~b1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import errno
 
2
import random
 
3
import selectors
 
4
import signal
 
5
import socket
 
6
from test import support
 
7
from time import sleep
 
8
import unittest
 
9
import unittest.mock
 
10
try:
 
11
    from time import monotonic as time
 
12
except ImportError:
 
13
    from time import time as time
 
14
try:
 
15
    import resource
 
16
except ImportError:
 
17
    resource = None
 
18
 
 
19
 
 
20
# Use the platform's native socketpair() when the socket module provides one;
# otherwise emulate it with a loopback connection through a listening socket.
if not hasattr(socket, 'socketpair'):
    def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
        """Return a connected ``(client, accepted)`` pair of sockets.

        A temporary listening socket is bound to ``support.HOST`` on an
        ephemeral port, a client socket connects to it, and connections are
        accepted until the one originating from that client shows up.  The
        listening socket is closed on exit; on ``OSError`` the client socket
        is closed as well before the error is re-raised.
        """
        with socket.socket(family, type, proto) as listener:
            listener.bind((support.HOST, 0))
            listener.listen(3)
            client = socket.socket(family, type, proto)
            try:
                client.connect(listener.getsockname())
                client_addr = client.getsockname()
                while True:
                    accepted, peer_addr = listener.accept()
                    # keep only the connection made by our own client
                    if peer_addr == client_addr:
                        return client, accepted
                    accepted.close()
            except OSError:
                client.close()
                raise
else:
    socketpair = socket.socketpair
 
40
 
 
41
 
 
42
def find_ready_matching(ready, flag):
    """Return the file objects from *ready* whose event mask includes *flag*.

    *ready* is an iterable of ``(key, events)`` pairs, where each key has a
    ``fileobj`` attribute and *events* is a bit mask.  The result is a list
    of the matching ``key.fileobj`` values, in the order they appear.
    """
    return [key.fileobj for key, events in ready if events & flag]
 
48
 
 
49
 
 
50
class BaseSelectorTestCase(unittest.TestCase):
    """Tests shared by every selector implementation.

    Concrete subclasses set the ``SELECTOR`` class attribute to the selector
    class to exercise; this base class does not define it.
    """

    def make_socketpair(self):
        """Create a connected socket pair that is closed at test cleanup.

        Returns the ``(rd, wr)`` tuple produced by ``socketpair()``.
        """
        rd, wr = socketpair()
        self.addCleanup(rd.close)
        self.addCleanup(wr.close)
        return rd, wr

    def test_register(self):
        # register() returns a SelectorKey describing the registration and
        # rejects bad events, bad FDs and duplicate registrations.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        key = s.register(rd, selectors.EVENT_READ, "data")
        self.assertIsInstance(key, selectors.SelectorKey)
        self.assertEqual(key.fileobj, rd)
        self.assertEqual(key.fd, rd.fileno())
        self.assertEqual(key.events, selectors.EVENT_READ)
        self.assertEqual(key.data, "data")

        # register an unknown event
        self.assertRaises(ValueError, s.register, 0, 999999)

        # register an invalid FD
        self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)

        # register twice
        self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)

        # register the same FD, but with a different object
        self.assertRaises(KeyError, s.register, rd.fileno(),
                          selectors.EVENT_READ)

    def test_unregister(self):
        # unregister() raises KeyError for unknown or already removed objects.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        s.register(rd, selectors.EVENT_READ)
        s.unregister(rd)

        # unregister an unknown file obj
        self.assertRaises(KeyError, s.unregister, 999999)

        # unregister twice
        self.assertRaises(KeyError, s.unregister, rd)

    def test_modify(self):
        # modify() can change the events or the attached data of a key.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        key = s.register(rd, selectors.EVENT_READ)

        # modify events
        key2 = s.modify(rd, selectors.EVENT_WRITE)
        self.assertNotEqual(key.events, key2.events)
        self.assertEqual(key2, s.get_key(rd))

        s.unregister(rd)

        # modify data
        d1 = object()
        d2 = object()

        key = s.register(rd, selectors.EVENT_READ, d1)
        key2 = s.modify(rd, selectors.EVENT_READ, d2)
        self.assertEqual(key.events, key2.events)
        self.assertNotEqual(key.data, key2.data)
        self.assertEqual(key2, s.get_key(rd))
        self.assertEqual(key2.data, d2)

        # modify unknown file obj
        self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)

        # modify use a shortcut: when only the data changes, neither
        # register() nor unregister() may be called, so both are replaced
        # by mocks that record any call
        d3 = object()
        s.register = unittest.mock.Mock()
        s.unregister = unittest.mock.Mock()

        s.modify(rd, selectors.EVENT_READ, d3)
        self.assertFalse(s.register.called)
        self.assertFalse(s.unregister.called)

    def test_close(self):
        # close() drops every registration.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        s.register(rd, selectors.EVENT_READ)
        s.register(wr, selectors.EVENT_WRITE)

        s.close()
        self.assertRaises(KeyError, s.get_key, rd)
        self.assertRaises(KeyError, s.get_key, wr)

    def test_get_key(self):
        # get_key() returns the key created by register().
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        key = s.register(rd, selectors.EVENT_READ, "data")
        self.assertEqual(key, s.get_key(rd))

        # unknown file obj
        self.assertRaises(KeyError, s.get_key, 999999)

    def test_get_map(self):
        # get_map() returns a live, read-only mapping of the registrations.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        keys = s.get_map()
        self.assertFalse(keys)
        self.assertEqual(len(keys), 0)
        self.assertEqual(list(keys), [])
        key = s.register(rd, selectors.EVENT_READ, "data")
        self.assertIn(rd, keys)
        self.assertEqual(key, keys[rd])
        self.assertEqual(len(keys), 1)
        self.assertEqual(list(keys), [rd.fileno()])
        self.assertEqual(list(keys.values()), [key])

        # unknown file obj
        with self.assertRaises(KeyError):
            keys[999999]

        # Read-only mapping
        with self.assertRaises(TypeError):
            del keys[rd]

    def test_select(self):
        # With nothing written yet, only the write end is reported ready.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        s.register(rd, selectors.EVENT_READ)
        wr_key = s.register(wr, selectors.EVENT_WRITE)

        result = s.select()
        for key, events in result:
            self.assertIsInstance(key, selectors.SelectorKey)
            self.assertTrue(events)
            self.assertFalse(events & ~(selectors.EVENT_READ |
                                        selectors.EVENT_WRITE))

        self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)

    def test_context_manager(self):
        # Leaving the "with" block drops the registrations made inside it.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        with s as sel:
            sel.register(rd, selectors.EVENT_READ)
            sel.register(wr, selectors.EVENT_WRITE)

        self.assertRaises(KeyError, s.get_key, rd)
        self.assertRaises(KeyError, s.get_key, wr)

    def test_fileno(self):
        # Selectors exposing fileno() must return a non-negative int.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        if hasattr(s, 'fileno'):
            fd = s.fileno()
            self.assertIsInstance(fd, int)
            self.assertGreaterEqual(fd, 0)

    def test_selector(self):
        # Send one message through each socket pair, picking a random ready
        # writer each round and checking that exactly its peer becomes
        # readable.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        NUM_SOCKETS = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        readers = []
        writers = []
        r2w = {}
        w2r = {}

        for _ in range(NUM_SOCKETS):
            rd, wr = self.make_socketpair()
            s.register(rd, selectors.EVENT_READ)
            s.register(wr, selectors.EVENT_WRITE)
            readers.append(rd)
            writers.append(wr)
            r2w[rd] = wr
            w2r[wr] = rd

        bufs = []

        while writers:
            ready = s.select()
            ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)
            if not ready_writers:
                self.fail("no sockets ready for writing")
            wr = random.choice(ready_writers)
            wr.send(MSG)

            for _ in range(10):
                ready = s.select()
                ready_readers = find_ready_matching(ready,
                                                    selectors.EVENT_READ)
                if ready_readers:
                    break
                # there might be a delay between the write to the write end and
                # the read end is reported ready
                sleep(0.1)
            else:
                self.fail("no sockets ready for reading")
            self.assertEqual([w2r[wr]], ready_readers)
            rd = ready_readers[0]
            buf = rd.recv(MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)
            # this pair is done: stop watching both ends
            s.unregister(r2w[rd])
            s.unregister(rd)
            writers.remove(r2w[rd])

        self.assertEqual(bufs, [MSG] * NUM_SOCKETS)

    def test_timeout(self):
        # Zero and negative timeouts must not block; a positive timeout
        # must wait roughly that long when nothing is ready.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        s.register(wr, selectors.EVENT_WRITE)
        t = time()
        self.assertEqual(1, len(s.select(0)))
        self.assertEqual(1, len(s.select(-1)))
        self.assertLess(time() - t, 0.5)

        s.unregister(wr)
        s.register(rd, selectors.EVENT_READ)
        t = time()
        self.assertFalse(s.select(0))
        self.assertFalse(s.select(-1))
        self.assertLess(time() - t, 0.5)

        t0 = time()
        self.assertFalse(s.select(1))
        t1 = time()
        dt = t1 - t0
        self.assertTrue(0.8 <= dt <= 1.6, dt)

    @unittest.skipUnless(hasattr(signal, "alarm"),
                         "signal.alarm() required for this test")
    def test_select_interrupt(self):
        # A SIGALRM delivered while select() is waiting must make it return
        # an empty result instead of raising.
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        # install a no-op handler; cleanups restore the previous handler
        # and cancel any pending alarm
        orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
        self.addCleanup(signal.alarm, 0)

        signal.alarm(1)

        s.register(rd, selectors.EVENT_READ)
        t = time()
        self.assertFalse(s.select(2))
        self.assertLess(time() - t, 2.5)
 
339
 
 
340
 
 
341
class ScalableSelectorMixIn:
    """Extra test for selectors expected to handle many file descriptors.

    Meant to be mixed into a test case class that defines ``SELECTOR``.
    """

    # see issue #18963 for why it's skipped on older OS X versions
    @support.requires_mac_ver(10, 5)
    @unittest.skipUnless(resource, "Test needs resource module")
    def test_above_fd_setsize(self):
        # A scalable implementation should have no problem with more than
        # FD_SETSIZE file descriptors. Since we don't know the value, we just
        # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
            self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
                            (soft, hard))
            NUM_FDS = hard
        except (OSError, ValueError):
            NUM_FDS = soft

        # Cap the number of descriptors: an unlimited or very large limit
        # would otherwise make the test open an enormous number of sockets
        # (or, when RLIM_INFINITY is negative, compute a negative count).
        if NUM_FDS == resource.RLIM_INFINITY or NUM_FDS > 2**16:
            NUM_FDS = 2**16

        # guard for already allocated FDs (stdin, stdout...)
        NUM_FDS -= 32

        s = self.SELECTOR()
        self.addCleanup(s.close)

        for _ in range(NUM_FDS // 2):
            try:
                rd, wr = socketpair()
            except OSError:
                # too many FDs, skip - note that we should only catch EMFILE
                # here, but apparently *BSD and Solaris can fail upon connect()
                # or bind() with EADDRNOTAVAIL, so let's be safe
                self.skipTest("FD limit reached")

            self.addCleanup(rd.close)
            self.addCleanup(wr.close)

            try:
                s.register(rd, selectors.EVENT_READ)
                s.register(wr, selectors.EVENT_WRITE)
            except OSError as e:
                if e.errno == errno.ENOSPC:
                    # this can be raised by epoll if we go over
                    # fs.epoll.max_user_watches sysctl
                    self.skipTest("FD limit reached")
                raise

        # only the write ends are ready, i.e. one per socket pair
        self.assertEqual(NUM_FDS // 2, len(s.select()))
 
388
 
 
389
 
 
390
class DefaultSelectorTestCase(BaseSelectorTestCase):
    """Run the shared selector tests against ``selectors.DefaultSelector``."""

    SELECTOR = selectors.DefaultSelector
 
393
 
 
394
 
 
395
class SelectSelectorTestCase(BaseSelectorTestCase):
    """Run the shared selector tests against ``selectors.SelectSelector``."""

    SELECTOR = selectors.SelectSelector
 
398
 
 
399
 
 
400
@unittest.skipUnless(
    hasattr(selectors, 'PollSelector'),
    "Test needs selectors.PollSelector",
)
class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
    """Shared and scalability tests for ``selectors.PollSelector``."""

    # None when the selector is missing; the class is skipped in that case
    SELECTOR = getattr(selectors, 'PollSelector', None)
 
405
 
 
406
 
 
407
@unittest.skipUnless(
    hasattr(selectors, 'EpollSelector'),
    "Test needs selectors.EpollSelector",
)
class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
    """Shared and scalability tests for ``selectors.EpollSelector``."""

    # None when the selector is missing; the class is skipped in that case
    SELECTOR = getattr(selectors, 'EpollSelector', None)
 
412
 
 
413
 
 
414
@unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),
                     "Test needs selectors.KqueueSelector")
class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
    """Shared and scalability tests for ``selectors.KqueueSelector``."""

    # None when the selector is missing; the class is skipped in that case
    SELECTOR = getattr(selectors, 'KqueueSelector', None)
 
419
 
 
420
 
 
421
def test_main():
    """Run every selector test case, then reap leftover child processes."""
    support.run_unittest(
        DefaultSelectorTestCase,
        SelectSelectorTestCase,
        PollSelectorTestCase,
        EpollSelectorTestCase,
        KqueueSelectorTestCase,
    )
    support.reap_children()


if __name__ == "__main__":
    test_main()