10
from test import test_support
11
from test.test_support import TESTFN, run_unittest, unlink
12
from StringIO import StringIO
14
HOST = test_support.HOST
28
self.socket = dummysocket()
34
def handle_read_event(self):
35
raise asyncore.ExitNow()
37
handle_write_event = handle_read_event
38
handle_expt_event = handle_read_event
42
self.error_handled = False
44
def handle_read_event(self):
47
handle_write_event = handle_read_event
48
handle_expt_event = handle_read_event
50
def handle_error(self):
51
self.error_handled = True
53
# used when testing senders; just collects what it gets until newline is sent
54
def capture_server(evt, buf, serv):
57
conn, addr = serv.accept()
58
except socket.timeout:
63
r, w, e = select.select([conn], [], [])
66
# keep everything except for the newline terminator
67
buf.write(data.replace('\n', ''))
79
class HelperFunctionTests(unittest.TestCase):
80
def test_readwriteexc(self):
81
# Check exception handling behavior of read, write and _exception
83
# check that ExitNow exceptions in the object handler method
84
# bubbles all the way up through asyncore read/write/_exception calls
86
self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
87
self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
88
self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
90
# check that an exception other than ExitNow in the object handler
91
# method causes the handle_error method to get called
94
self.assertEqual(tr2.error_handled, True)
98
self.assertEqual(tr2.error_handled, True)
100
tr2 = crashingdummy()
101
asyncore._exception(tr2)
102
self.assertEqual(tr2.error_handled, True)
104
# asyncore.readwrite uses constants in the select module that
105
# are not present in Windows systems (see this thread:
106
# http://mail.python.org/pipermail/python-list/2001-October/109973.html)
107
# These constants should be present as long as poll is available
109
if hasattr(select, 'poll'):
110
def test_readwrite(self):
111
# Check that correct methods are called by readwrite()
119
def handle_read_event(self):
122
def handle_write_event(self):
125
def handle_expt_event(self):
128
def handle_error(self):
129
self.error_handled = True
131
for flag in (select.POLLIN, select.POLLPRI):
133
self.assertEqual(tobj.read, False)
134
asyncore.readwrite(tobj, flag)
135
self.assertEqual(tobj.read, True)
137
# check that ExitNow exceptions in the object handler method
138
# bubbles all the way up through asyncore readwrite call
140
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
142
# check that an exception other than ExitNow in the object handler
143
# method causes the handle_error method to get called
144
tr2 = crashingdummy()
145
asyncore.readwrite(tr2, flag)
146
self.assertEqual(tr2.error_handled, True)
149
self.assertEqual(tobj.write, False)
150
asyncore.readwrite(tobj, select.POLLOUT)
151
self.assertEqual(tobj.write, True)
153
# check that ExitNow exceptions in the object handler method
154
# bubbles all the way up through asyncore readwrite call
156
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1,
159
# check that an exception other than ExitNow in the object handler
160
# method causes the handle_error method to get called
161
tr2 = crashingdummy()
162
asyncore.readwrite(tr2, select.POLLOUT)
163
self.assertEqual(tr2.error_handled, True)
165
for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL):
167
self.assertEqual(tobj.expt, False)
168
asyncore.readwrite(tobj, flag)
169
self.assertEqual(tobj.expt, True)
171
# check that ExitNow exceptions in the object handler method
172
# bubbles all the way up through asyncore readwrite calls
174
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
176
# check that an exception other than ExitNow in the object handler
177
# method causes the handle_error method to get called
178
tr2 = crashingdummy()
179
asyncore.readwrite(tr2, flag)
180
self.assertEqual(tr2.error_handled, True)
182
def test_closeall(self):
183
self.closeall_check(False)
185
def test_closeall_default(self):
186
self.closeall_check(True)
188
def closeall_check(self, usedefault):
189
# Check that close_all() closes everything in a given map
196
self.assertEqual(c.socket.closed, False)
200
socketmap = asyncore.socket_map
202
asyncore.socket_map = testmap
205
testmap, asyncore.socket_map = asyncore.socket_map, socketmap
207
asyncore.close_all(testmap)
209
self.assertEqual(len(testmap), 0)
212
self.assertEqual(c.socket.closed, True)
214
def test_compact_traceback(self):
216
raise Exception("I don't like spam!")
218
real_t, real_v, real_tb = sys.exc_info()
219
r = asyncore.compact_traceback()
221
self.fail("Expected exception")
223
(f, function, line), t, v, info = r
224
self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
225
self.assertEqual(function, 'test_compact_traceback')
226
self.assertEqual(t, real_t)
227
self.assertEqual(v, real_v)
228
self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
231
class DispatcherTests(unittest.TestCase):
238
def test_basic(self):
239
d = asyncore.dispatcher()
240
self.assertEqual(d.readable(), True)
241
self.assertEqual(d.writable(), True)
244
d = asyncore.dispatcher()
245
self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
248
d = asyncore.dispatcher()
250
# capture output of dispatcher.log() (to stderr)
253
l1 = "Lovely spam! Wonderful spam!"
254
l2 = "I don't like spam!"
262
lines = fp.getvalue().splitlines()
263
self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2])
265
def test_log_info(self):
266
d = asyncore.dispatcher()
268
# capture output of dispatcher.log_info() (to stdout via print)
271
l1 = "Have you got anything without spam?"
272
l2 = "Why can't she have egg bacon spam and sausage?"
273
l3 = "THAT'S got spam in it!"
276
d.log_info(l1, 'EGGS')
278
d.log_info(l3, 'SPAM')
282
lines = fp.getvalue().splitlines()
284
expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
286
expected = ['EGGS: %s' % l1, 'SPAM: %s' % l3]
288
self.assertEquals(lines, expected)
290
def test_unhandled(self):
291
d = asyncore.dispatcher()
293
# capture output of dispatcher.log_info() (to stdout via print)
306
lines = fp.getvalue().splitlines()
307
expected = ['warning: unhandled exception',
308
'warning: unhandled read event',
309
'warning: unhandled write event',
310
'warning: unhandled connect event',
311
'warning: unhandled accept event']
312
self.assertEquals(lines, expected)
316
class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
320
def handle_connect(self):
323
class DispatcherWithSendTests(unittest.TestCase):
333
self.evt = threading.Event()
334
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
335
self.sock.settimeout(3)
336
self.port = test_support.bind_port(self.sock)
339
args = (self.evt, cap, self.sock)
340
threading.Thread(target=capture_server, args=args).start()
342
# wait a little longer for the server to initialize (it sometimes
343
# refuses connections on slow machines without this wait)
346
data = "Suppose there isn't a 16-ton weight?"
347
d = dispatcherwithsend_noread()
348
d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
349
d.connect((HOST, self.port))
351
# give time for socket to connect
359
while d.out_buffer and n > 0:
365
self.assertEqual(cap.getvalue(), data*2)
368
class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
371
if hasattr(asyncore, 'file_wrapper'):
372
class FileWrapperTest(unittest.TestCase):
374
self.d = "It's not dead, it's sleeping!"
375
file(TESTFN, 'w').write(self.d)
381
fd = os.open(TESTFN, os.O_RDONLY)
382
w = asyncore.file_wrapper(fd)
384
self.assertEqual(w.fd, fd)
385
self.assertEqual(w.fileno(), fd)
386
self.assertEqual(w.recv(13), "It's not dead")
387
self.assertEqual(w.read(6), ", it's")
389
self.assertRaises(OSError, w.read, 1)
393
d2 = "I want to buy some cheese."
394
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
395
w = asyncore.file_wrapper(fd)
400
self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
404
tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
405
DispatcherWithSendTests_UsePoll]
406
if hasattr(asyncore, 'file_wrapper'):
407
tests.append(FileWrapperTest)
411
if __name__ == "__main__":
10
from test import test_support
11
from test.test_support import TESTFN, run_unittest, unlink
12
from StringIO import StringIO
14
HOST = test_support.HOST
28
self.socket = dummysocket()
37
def handle_read_event(self):
38
raise asyncore.ExitNow()
40
handle_write_event = handle_read_event
41
handle_close = handle_read_event
42
handle_expt_event = handle_read_event
46
self.error_handled = False
48
def handle_read_event(self):
51
handle_write_event = handle_read_event
52
handle_close = handle_read_event
53
handle_expt_event = handle_read_event
55
def handle_error(self):
56
self.error_handled = True
58
# used when testing senders; just collects what it gets until newline is sent
59
def capture_server(evt, buf, serv):
62
conn, addr = serv.accept()
63
except socket.timeout:
68
r, w, e = select.select([conn], [], [])
71
# keep everything except for the newline terminator
72
buf.write(data.replace('\n', ''))
84
class HelperFunctionTests(unittest.TestCase):
85
def test_readwriteexc(self):
86
# Check exception handling behavior of read, write and _exception
88
# check that ExitNow exceptions in the object handler method
89
# bubbles all the way up through asyncore read/write/_exception calls
91
self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
92
self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
93
self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
95
# check that an exception other than ExitNow in the object handler
96
# method causes the handle_error method to get called
99
self.assertEqual(tr2.error_handled, True)
101
tr2 = crashingdummy()
103
self.assertEqual(tr2.error_handled, True)
105
tr2 = crashingdummy()
106
asyncore._exception(tr2)
107
self.assertEqual(tr2.error_handled, True)
109
# asyncore.readwrite uses constants in the select module that
110
# are not present in Windows systems (see this thread:
111
# http://mail.python.org/pipermail/python-list/2001-October/109973.html)
112
# These constants should be present as long as poll is available
114
if hasattr(select, 'poll'):
115
def test_readwrite(self):
116
# Check that correct methods are called by readwrite()
125
def handle_read_event(self):
128
def handle_write_event(self):
131
def handle_close(self):
134
def handle_expt_event(self):
137
def handle_error(self):
138
self.error_handled = True
140
for flag in (select.POLLIN, select.POLLPRI):
142
self.assertEqual(tobj.read, False)
143
asyncore.readwrite(tobj, flag)
144
self.assertEqual(tobj.read, True)
146
# check that ExitNow exceptions in the object handler method
147
# bubbles all the way up through asyncore readwrite call
149
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
151
# check that an exception other than ExitNow in the object handler
152
# method causes the handle_error method to get called
153
tr2 = crashingdummy()
154
asyncore.readwrite(tr2, flag)
155
self.assertEqual(tr2.error_handled, True)
158
self.assertEqual(tobj.write, False)
159
asyncore.readwrite(tobj, select.POLLOUT)
160
self.assertEqual(tobj.write, True)
162
# check that ExitNow exceptions in the object handler method
163
# bubbles all the way up through asyncore readwrite call
165
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1,
168
# check that an exception other than ExitNow in the object handler
169
# method causes the handle_error method to get called
170
tr2 = crashingdummy()
171
asyncore.readwrite(tr2, select.POLLOUT)
172
self.assertEqual(tr2.error_handled, True)
174
for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL):
176
self.assertEqual((tobj.expt, tobj.closed)[flag == select.POLLHUP], False)
177
asyncore.readwrite(tobj, flag)
178
self.assertEqual((tobj.expt, tobj.closed)[flag == select.POLLHUP], True)
180
# check that ExitNow exceptions in the object handler method
181
# bubbles all the way up through asyncore readwrite calls
183
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
185
# check that an exception other than ExitNow in the object handler
186
# method causes the handle_error method to get called
187
tr2 = crashingdummy()
188
asyncore.readwrite(tr2, flag)
189
self.assertEqual(tr2.error_handled, True)
191
def test_closeall(self):
192
self.closeall_check(False)
194
def test_closeall_default(self):
195
self.closeall_check(True)
197
def closeall_check(self, usedefault):
198
# Check that close_all() closes everything in a given map
205
self.assertEqual(c.socket.closed, False)
209
socketmap = asyncore.socket_map
211
asyncore.socket_map = testmap
214
testmap, asyncore.socket_map = asyncore.socket_map, socketmap
216
asyncore.close_all(testmap)
218
self.assertEqual(len(testmap), 0)
221
self.assertEqual(c.socket.closed, True)
223
def test_compact_traceback(self):
225
raise Exception("I don't like spam!")
227
real_t, real_v, real_tb = sys.exc_info()
228
r = asyncore.compact_traceback()
230
self.fail("Expected exception")
232
(f, function, line), t, v, info = r
233
self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
234
self.assertEqual(function, 'test_compact_traceback')
235
self.assertEqual(t, real_t)
236
self.assertEqual(v, real_v)
237
self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
240
class DispatcherTests(unittest.TestCase):
247
def test_basic(self):
248
d = asyncore.dispatcher()
249
self.assertEqual(d.readable(), True)
250
self.assertEqual(d.writable(), True)
253
d = asyncore.dispatcher()
254
self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
257
d = asyncore.dispatcher()
259
# capture output of dispatcher.log() (to stderr)
262
l1 = "Lovely spam! Wonderful spam!"
263
l2 = "I don't like spam!"
271
lines = fp.getvalue().splitlines()
272
self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2])
274
def test_log_info(self):
275
d = asyncore.dispatcher()
277
# capture output of dispatcher.log_info() (to stdout via print)
280
l1 = "Have you got anything without spam?"
281
l2 = "Why can't she have egg bacon spam and sausage?"
282
l3 = "THAT'S got spam in it!"
285
d.log_info(l1, 'EGGS')
287
d.log_info(l3, 'SPAM')
291
lines = fp.getvalue().splitlines()
293
expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
295
expected = ['EGGS: %s' % l1, 'SPAM: %s' % l3]
297
self.assertEquals(lines, expected)
299
def test_unhandled(self):
300
d = asyncore.dispatcher()
302
# capture output of dispatcher.log_info() (to stdout via print)
315
lines = fp.getvalue().splitlines()
316
expected = ['warning: unhandled exception',
317
'warning: unhandled read event',
318
'warning: unhandled write event',
319
'warning: unhandled connect event',
320
'warning: unhandled accept event']
321
self.assertEquals(lines, expected)
325
class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
329
def handle_connect(self):
332
class DispatcherWithSendTests(unittest.TestCase):
342
self.evt = threading.Event()
343
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
344
self.sock.settimeout(3)
345
self.port = test_support.bind_port(self.sock)
348
args = (self.evt, cap, self.sock)
349
threading.Thread(target=capture_server, args=args).start()
351
# wait a little longer for the server to initialize (it sometimes
352
# refuses connections on slow machines without this wait)
355
data = "Suppose there isn't a 16-ton weight?"
356
d = dispatcherwithsend_noread()
357
d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
358
d.connect((HOST, self.port))
360
# give time for socket to connect
368
while d.out_buffer and n > 0:
374
self.assertEqual(cap.getvalue(), data*2)
377
class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
380
if hasattr(asyncore, 'file_wrapper'):
381
class FileWrapperTest(unittest.TestCase):
383
self.d = "It's not dead, it's sleeping!"
384
file(TESTFN, 'w').write(self.d)
390
fd = os.open(TESTFN, os.O_RDONLY)
391
w = asyncore.file_wrapper(fd)
394
self.assertNotEqual(w.fd, fd)
395
self.assertNotEqual(w.fileno(), fd)
396
self.assertEqual(w.recv(13), "It's not dead")
397
self.assertEqual(w.read(6), ", it's")
399
self.assertRaises(OSError, w.read, 1)
403
d2 = "I want to buy some cheese."
404
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
405
w = asyncore.file_wrapper(fd)
411
self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
415
tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
416
DispatcherWithSendTests_UsePoll]
417
if hasattr(asyncore, 'file_wrapper'):
418
tests.append(FileWrapperTest)
422
if __name__ == "__main__":