1
# test asynchat -- requires threading
3
import _thread as thread # If this fails, we can't test this module
4
import asyncore, asynchat, socket, threading, time
import sys
import unittest
7
from test import support
10
SERVER_QUIT = b'QUIT\n'
12
class echo_server(threading.Thread):
    """Single-connection echo server that runs in its own thread.

    The server accepts one client, reads until SERVER_QUIT shows up in the
    received bytes (or the peer closes), strips SERVER_QUIT, and sends the
    remaining bytes back ``chunk_size`` bytes at a time.

    Whatever could not be sent back stays in ``self.buffer`` so that a test
    can inspect it after ``join()``.
    """

    # parameter to determine the number of bytes passed back to the
    # client on each send
    chunk_size = 1

    def __init__(self, event):
        """Bind a listening socket; *event* is set once listen() was called."""
        threading.Thread.__init__(self)
        self.event = event
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = support.bind_port(self.sock)
        # Defined up front so the attribute exists even if run() exits early.
        self.buffer = b""

    def run(self):
        """Serve exactly one client, then close both sockets."""
        self.sock.listen(1)
        self.event.set()
        conn, _client = self.sock.accept()
        try:
            self.buffer = b""
            # collect data until quit message is seen
            while SERVER_QUIT not in self.buffer:
                data = conn.recv(1)
                if not data:
                    # peer closed the connection before sending SERVER_QUIT
                    break
                self.buffer = self.buffer + data

            # remove the SERVER_QUIT message
            self.buffer = self.buffer.replace(SERVER_QUIT, b'')

            # re-send entire set of collected data
            try:
                # this may fail on some tests, such as test_close_when_done,
                # since the client closes the channel when it's done sending
                while self.buffer:
                    n = conn.send(self.buffer[:self.chunk_size])
                    # brief pause between chunks so the client receives the
                    # data in several reads rather than one
                    time.sleep(0.001)
                    self.buffer = self.buffer[n:]
            except socket.error:
                # best effort only: the unsent remainder stays in self.buffer
                pass
        finally:
            conn.close()
            self.sock.close()
52
class echo_client(asynchat.async_chat):
    """async_chat channel that records every terminator-delimited message.

    Incoming bytes accumulate in ``self.buffer``; each time the terminator is
    found the accumulated bytes are appended to ``self.contents`` and the
    buffer is reset.
    """

    def __init__(self, terminator, server_port):
        """Connect to the local test host on *server_port* using *terminator*."""
        asynchat.async_chat.__init__(self)
        self.contents = []
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((support.HOST, server_port))
        self.set_terminator(terminator)
        self.buffer = b""

    def handle_connect(self):
        """Nothing to do once the connection is established."""
        pass

    if sys.platform == 'darwin':
        # select.poll returns a select.POLLHUP at the end of the tests
        # on darwin, so just ignore it
        def handle_expt(self):
            pass

    def collect_incoming_data(self, data):
        """Accumulate *data* until the terminator is seen."""
        self.buffer += data

    def found_terminator(self):
        """Store the completed message and start a fresh buffer."""
        self.contents.append(self.buffer)
        self.buffer = b""
79
def start_echo_server():
    """Start an echo_server thread and return ``(server, event)``.

    The call blocks until the server thread has set the event, then clears
    the event again before handing it back to the caller.
    """
    event = threading.Event()
    s = echo_server(event)
    s.start()
    event.wait()
    event.clear()
    time.sleep(0.01) # Give server time to start accepting.
    return s, event
89
class TestAsynchat(unittest.TestCase):
    """Round-trip tests for asynchat.async_chat against a threaded echo server.

    Each test pushes bytes through an echo_client, runs asyncore.loop(), waits
    for the server thread to finish, and then checks what the client
    collected.
    """

    # Passed to asyncore.loop(use_poll=...) by every test in this class.
    usepoll = False

    def line_terminator_check(self, term, server_chunk):
        """Send two *term*-terminated lines and check they are split correctly.

        *server_chunk* is the number of bytes the server echoes per send.
        """
        event = threading.Event()
        s = echo_server(event)
        # chunk_size has to be set before the thread starts serving
        s.chunk_size = server_chunk
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01) # Give server time to start accepting.
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\n', chunk)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\r\n', chunk)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'qqq', chunk)

    def numeric_terminator_check(self, termlen):
        """Check that an integer terminator yields the first *termlen* bytes."""
        # Try reading a fixed number of bytes
        s, event = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that an integer terminator works (the type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With no terminator nothing is ever split off: all echoed data
        # stays in the client's buffer and contents remains empty.
        s, event = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        # data handed over through a simple_producer in 8-byte pieces
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        # raw bytes passed directly to push_with_producer
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        # close the channel as soon as the queued data has been sent
        c.close_when_done()
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertTrue(len(s.buffer) > 0)
217
class TestAsynchat_WithPoll(TestAsynchat):
    """Run the inherited TestAsynchat cases with asyncore.loop(use_poll=True)."""
    usepoll = True
220
class TestHelperFunctions(unittest.TestCase):
    """Tests for module-level helper functions in asynchat."""

    def test_find_prefix_at_end(self):
        # "\r" at the end of the haystack is a one-character prefix of "\r\n"
        self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
        # no prefix of "\r\n" ends the haystack
        self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
225
class TestFifo(unittest.TestCase):
    """Tests for asynchat.fifo: push, first, pop, len and is_empty."""

    def test_basic(self):
        # start from an empty fifo and queue two items
        f = asynchat.fifo()
        f.push(7)
        f.push(b'a')
        self.assertEqual(len(f), 2)
        self.assertEqual(f.first(), 7)
        self.assertEqual(f.pop(), (1, 7))
        self.assertEqual(len(f), 1)
        self.assertEqual(f.first(), b'a')
        self.assertEqual(f.is_empty(), False)
        self.assertEqual(f.pop(), (1, b'a'))
        self.assertEqual(len(f), 0)
        self.assertEqual(f.is_empty(), True)
        # popping an empty fifo reports failure instead of raising
        self.assertEqual(f.pop(), (0, None))

    def test_given_list(self):
        # a fifo seeded with a list pops its items in list order
        f = asynchat.fifo([b'x', 17, 3])
        self.assertEqual(len(f), 3)
        self.assertEqual(f.pop(), (1, b'x'))
        self.assertEqual(f.pop(), (1, 17))
        self.assertEqual(f.pop(), (1, 3))
        self.assertEqual(f.pop(), (0, None))
250
def test_main(verbose=None):
    """Run all test case classes of this module via support.run_unittest.

    *verbose* is accepted for callers that pass it but is not used here.
    """
    support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
                         TestHelperFunctions, TestFifo)
254
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main(verbose=True)