6
from test import support
11
from time import monotonic as time
13
from time import time as time
20
if hasattr(socket, 'socketpair'):
21
socketpair = socket.socketpair
23
def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
24
with socket.socket(family, type, proto) as l:
25
l.bind((support.HOST, 0))
27
c = socket.socket(family, type, proto)
29
c.connect(l.getsockname())
30
caddr = c.getsockname()
33
# check that we've got the correct client
42
def find_ready_matching(ready, flag):
44
for key, events in ready:
46
match.append(key.fileobj)
50
class BaseSelectorTestCase(unittest.TestCase):
52
def test_register(self):
54
self.addCleanup(s.close)
57
self.addCleanup(rd.close)
58
self.addCleanup(wr.close)
60
key = s.register(rd, selectors.EVENT_READ, "data")
61
self.assertIsInstance(key, selectors.SelectorKey)
62
self.assertEqual(key.fileobj, rd)
63
self.assertEqual(key.fd, rd.fileno())
64
self.assertEqual(key.events, selectors.EVENT_READ)
65
self.assertEqual(key.data, "data")
67
# register an unknown event
68
self.assertRaises(ValueError, s.register, 0, 999999)
70
# register an invalid FD
71
self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)
74
self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)
76
# register the same FD, but with a different object
77
self.assertRaises(KeyError, s.register, rd.fileno(),
80
def test_unregister(self):
82
self.addCleanup(s.close)
85
self.addCleanup(rd.close)
86
self.addCleanup(wr.close)
88
s.register(rd, selectors.EVENT_READ)
91
# unregister an unknown file obj
92
self.assertRaises(KeyError, s.unregister, 999999)
95
self.assertRaises(KeyError, s.unregister, rd)
97
def test_modify(self):
99
self.addCleanup(s.close)
101
rd, wr = socketpair()
102
self.addCleanup(rd.close)
103
self.addCleanup(wr.close)
105
key = s.register(rd, selectors.EVENT_READ)
108
key2 = s.modify(rd, selectors.EVENT_WRITE)
109
self.assertNotEqual(key.events, key2.events)
110
self.assertEqual(key2, s.get_key(rd))
118
key = s.register(rd, selectors.EVENT_READ, d1)
119
key2 = s.modify(rd, selectors.EVENT_READ, d2)
120
self.assertEqual(key.events, key2.events)
121
self.assertNotEqual(key.data, key2.data)
122
self.assertEqual(key2, s.get_key(rd))
123
self.assertEqual(key2.data, d2)
125
# modify unknown file obj
126
self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)
128
# modify uses a shortcut (no unregister()/register() round trip)
130
s.register = unittest.mock.Mock()
131
s.unregister = unittest.mock.Mock()
133
s.modify(rd, selectors.EVENT_READ, d3)
134
self.assertFalse(s.register.called)
135
self.assertFalse(s.unregister.called)
137
def test_close(self):
139
self.addCleanup(s.close)
141
rd, wr = socketpair()
142
self.addCleanup(rd.close)
143
self.addCleanup(wr.close)
145
s.register(rd, selectors.EVENT_READ)
146
s.register(wr, selectors.EVENT_WRITE)
149
self.assertRaises(KeyError, s.get_key, rd)
150
self.assertRaises(KeyError, s.get_key, wr)
152
def test_get_key(self):
154
self.addCleanup(s.close)
156
rd, wr = socketpair()
157
self.addCleanup(rd.close)
158
self.addCleanup(wr.close)
160
key = s.register(rd, selectors.EVENT_READ, "data")
161
self.assertEqual(key, s.get_key(rd))
164
self.assertRaises(KeyError, s.get_key, 999999)
166
def test_get_map(self):
168
self.addCleanup(s.close)
170
rd, wr = socketpair()
171
self.addCleanup(rd.close)
172
self.addCleanup(wr.close)
175
self.assertFalse(keys)
176
self.assertEqual(len(keys), 0)
177
self.assertEqual(list(keys), [])
178
key = s.register(rd, selectors.EVENT_READ, "data")
179
self.assertIn(rd, keys)
180
self.assertEqual(key, keys[rd])
181
self.assertEqual(len(keys), 1)
182
self.assertEqual(list(keys), [rd.fileno()])
183
self.assertEqual(list(keys.values()), [key])
186
with self.assertRaises(KeyError):
190
with self.assertRaises(TypeError):
193
def test_select(self):
195
self.addCleanup(s.close)
197
rd, wr = socketpair()
198
self.addCleanup(rd.close)
199
self.addCleanup(wr.close)
201
s.register(rd, selectors.EVENT_READ)
202
wr_key = s.register(wr, selectors.EVENT_WRITE)
205
for key, events in result:
206
self.assertTrue(isinstance(key, selectors.SelectorKey))
207
self.assertTrue(events)
208
self.assertFalse(events & ~(selectors.EVENT_READ |
209
selectors.EVENT_WRITE))
211
self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)
213
def test_context_manager(self):
215
self.addCleanup(s.close)
217
rd, wr = socketpair()
218
self.addCleanup(rd.close)
219
self.addCleanup(wr.close)
222
sel.register(rd, selectors.EVENT_READ)
223
sel.register(wr, selectors.EVENT_WRITE)
225
self.assertRaises(KeyError, s.get_key, rd)
226
self.assertRaises(KeyError, s.get_key, wr)
228
def test_fileno(self):
230
self.addCleanup(s.close)
232
if hasattr(s, 'fileno'):
234
self.assertTrue(isinstance(fd, int))
235
self.assertGreaterEqual(fd, 0)
237
def test_selector(self):
239
self.addCleanup(s.close)
242
MSG = b" This is a test."
249
for i in range(NUM_SOCKETS):
250
rd, wr = socketpair()
251
self.addCleanup(rd.close)
252
self.addCleanup(wr.close)
253
s.register(rd, selectors.EVENT_READ)
254
s.register(wr, selectors.EVENT_WRITE)
264
ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)
265
if not ready_writers:
266
self.fail("no sockets ready for writing")
267
wr = random.choice(ready_writers)
272
ready_readers = find_ready_matching(ready,
273
selectors.EVENT_READ)
276
# there might be a delay between the write to the write end and
277
# the read end being reported ready
280
self.fail("no sockets ready for reading")
281
self.assertEqual([w2r[wr]], ready_readers)
282
rd = ready_readers[0]
283
buf = rd.recv(MSG_LEN)
284
self.assertEqual(len(buf), MSG_LEN)
286
s.unregister(r2w[rd])
288
writers.remove(r2w[rd])
290
self.assertEqual(bufs, [MSG] * NUM_SOCKETS)
292
def test_timeout(self):
294
self.addCleanup(s.close)
296
rd, wr = socketpair()
297
self.addCleanup(rd.close)
298
self.addCleanup(wr.close)
300
s.register(wr, selectors.EVENT_WRITE)
302
self.assertEqual(1, len(s.select(0)))
303
self.assertEqual(1, len(s.select(-1)))
304
self.assertLess(time() - t, 0.5)
307
s.register(rd, selectors.EVENT_READ)
309
self.assertFalse(s.select(0))
310
self.assertFalse(s.select(-1))
311
self.assertLess(time() - t, 0.5)
314
self.assertFalse(s.select(1))
317
self.assertTrue(0.8 <= dt <= 1.6, dt)
319
@unittest.skipUnless(hasattr(signal, "alarm"),
320
"signal.alarm() required for this test")
321
def test_select_interrupt(self):
323
self.addCleanup(s.close)
325
rd, wr = socketpair()
326
self.addCleanup(rd.close)
327
self.addCleanup(wr.close)
329
orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
330
self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
331
self.addCleanup(signal.alarm, 0)
335
s.register(rd, selectors.EVENT_READ)
337
self.assertFalse(s.select(2))
338
self.assertLess(time() - t, 2.5)
341
class ScalableSelectorMixIn:
343
# see issue #18963 for why it's skipped on older OS X versions
344
@support.requires_mac_ver(10, 5)
345
@unittest.skipUnless(resource, "Test needs resource module")
346
def test_above_fd_setsize(self):
347
# A scalable implementation should have no problem with more than
348
# FD_SETSIZE file descriptors. Since we don't know the value, we just
349
# try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.
350
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
352
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
353
self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
356
except (OSError, ValueError):
359
# guard for already allocated FDs (stdin, stdout...)
363
self.addCleanup(s.close)
365
for i in range(NUM_FDS // 2):
367
rd, wr = socketpair()
369
# too many FDs, skip - note that we should only catch EMFILE
370
# here, but apparently *BSD and Solaris can fail upon connect()
371
# or bind() with EADDRNOTAVAIL, so let's be safe
372
self.skipTest("FD limit reached")
374
self.addCleanup(rd.close)
375
self.addCleanup(wr.close)
378
s.register(rd, selectors.EVENT_READ)
379
s.register(wr, selectors.EVENT_WRITE)
381
if e.errno == errno.ENOSPC:
382
# this can be raised by epoll if we go over
383
# fs.epoll.max_user_watches sysctl
384
self.skipTest("FD limit reached")
387
self.assertEqual(NUM_FDS // 2, len(s.select()))
390
class DefaultSelectorTestCase(BaseSelectorTestCase):
392
SELECTOR = selectors.DefaultSelector
395
class SelectSelectorTestCase(BaseSelectorTestCase):
397
SELECTOR = selectors.SelectSelector
400
@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
401
"Test needs selectors.PollSelector")
402
class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
404
SELECTOR = getattr(selectors, 'PollSelector', None)
407
@unittest.skipUnless(hasattr(selectors, 'EpollSelector'),
408
"Test needs selectors.EpollSelector")
409
class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
411
SELECTOR = getattr(selectors, 'EpollSelector', None)
414
@unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),
415
"Test needs selectors.KqueueSelector)")
416
class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
418
SELECTOR = getattr(selectors, 'KqueueSelector', None)
422
tests = [DefaultSelectorTestCase, SelectSelectorTestCase,
423
PollSelectorTestCase, EpollSelectorTestCase,
424
KqueueSelectorTestCase]
425
support.run_unittest(*tests)
426
support.reap_children()
429
if __name__ == "__main__":