~ubuntu-branches/ubuntu/maverick/python3.1/maverick

« back to all changes in this revision

Viewing changes to Lib/test/test_asynchat.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-03-23 00:01:27 UTC
  • Revision ID: james.westby@ubuntu.com-20090323000127-5fstfxju4ufrhthq
Tags: upstream-3.1~a1+20090322
ImportĀ upstreamĀ versionĀ 3.1~a1+20090322

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# test asynchat -- requires threading
 
2
 
 
3
import _thread as thread # If this fails, we can't test this module
 
4
import asyncore, asynchat, socket, threading, time
 
5
import unittest
 
6
import sys
 
7
from test import support
 
8
 
 
9
HOST = support.HOST
 
10
SERVER_QUIT = b'QUIT\n'
 
11
 
 
12
class echo_server(threading.Thread):
 
13
    # parameter to determine the number of bytes passed back to the
 
14
    # client each send
 
15
    chunk_size = 1
 
16
 
 
17
    def __init__(self, event):
 
18
        threading.Thread.__init__(self)
 
19
        self.event = event
 
20
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
21
        self.port = support.bind_port(self.sock)
 
22
 
 
23
    def run(self):
 
24
        self.sock.listen(1)
 
25
        self.event.set()
 
26
        conn, client = self.sock.accept()
 
27
        self.buffer = b""
 
28
        # collect data until quit message is seen
 
29
        while SERVER_QUIT not in self.buffer:
 
30
            data = conn.recv(1)
 
31
            if not data:
 
32
                break
 
33
            self.buffer = self.buffer + data
 
34
 
 
35
        # remove the SERVER_QUIT message
 
36
        self.buffer = self.buffer.replace(SERVER_QUIT, b'')
 
37
 
 
38
        # re-send entire set of collected data
 
39
        try:
 
40
            # this may fail on some tests, such as test_close_when_done, since
 
41
            # the client closes the channel when it's done sending
 
42
            while self.buffer:
 
43
                n = conn.send(self.buffer[:self.chunk_size])
 
44
                time.sleep(0.001)
 
45
                self.buffer = self.buffer[n:]
 
46
        except:
 
47
            pass
 
48
 
 
49
        conn.close()
 
50
        self.sock.close()
 
51
 
 
52
class echo_client(asynchat.async_chat):
 
53
 
 
54
    def __init__(self, terminator, server_port):
 
55
        asynchat.async_chat.__init__(self)
 
56
        self.contents = []
 
57
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
 
58
        self.connect((HOST, server_port))
 
59
        self.set_terminator(terminator)
 
60
        self.buffer = b""
 
61
 
 
62
    def handle_connect(self):
 
63
        pass
 
64
 
 
65
    if sys.platform == 'darwin':
 
66
        # select.poll returns a select.POLLHUP at the end of the tests
 
67
        # on darwin, so just ignore it
 
68
        def handle_expt(self):
 
69
            pass
 
70
 
 
71
    def collect_incoming_data(self, data):
 
72
        self.buffer += data
 
73
 
 
74
    def found_terminator(self):
 
75
        self.contents.append(self.buffer)
 
76
        self.buffer = b""
 
77
 
 
78
 
 
79
def start_echo_server():
 
80
    event = threading.Event()
 
81
    s = echo_server(event)
 
82
    s.start()
 
83
    event.wait()
 
84
    event.clear()
 
85
    time.sleep(0.01) # Give server time to start accepting.
 
86
    return s, event
 
87
 
 
88
 
 
89
class TestAsynchat(unittest.TestCase):
 
90
    usepoll = False
 
91
 
 
92
    def setUp (self):
 
93
        pass
 
94
 
 
95
    def tearDown (self):
 
96
        pass
 
97
 
 
98
    def line_terminator_check(self, term, server_chunk):
 
99
        event = threading.Event()
 
100
        s = echo_server(event)
 
101
        s.chunk_size = server_chunk
 
102
        s.start()
 
103
        event.wait()
 
104
        event.clear()
 
105
        time.sleep(0.01) # Give server time to start accepting.
 
106
        c = echo_client(term, s.port)
 
107
        c.push(b"hello ")
 
108
        c.push(b"world" + term)
 
109
        c.push(b"I'm not dead yet!" + term)
 
110
        c.push(SERVER_QUIT)
 
111
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
 
112
        s.join()
 
113
 
 
114
        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
 
115
 
 
116
    # the line terminator tests below check receiving variously-sized
 
117
    # chunks back from the server in order to exercise all branches of
 
118
    # async_chat.handle_read
 
119
 
 
120
    def test_line_terminator1(self):
 
121
        # test one-character terminator
 
122
        for l in (1,2,3):
 
123
            self.line_terminator_check(b'\n', l)
 
124
 
 
125
    def test_line_terminator2(self):
 
126
        # test two-character terminator
 
127
        for l in (1,2,3):
 
128
            self.line_terminator_check(b'\r\n', l)
 
129
 
 
130
    def test_line_terminator3(self):
 
131
        # test three-character terminator
 
132
        for l in (1,2,3):
 
133
            self.line_terminator_check(b'qqq', l)
 
134
 
 
135
    def numeric_terminator_check(self, termlen):
 
136
        # Try reading a fixed number of bytes
 
137
        s, event = start_echo_server()
 
138
        c = echo_client(termlen, s.port)
 
139
        data = b"hello world, I'm not dead yet!\n"
 
140
        c.push(data)
 
141
        c.push(SERVER_QUIT)
 
142
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
 
143
        s.join()
 
144
 
 
145
        self.assertEqual(c.contents, [data[:termlen]])
 
146
 
 
147
    def test_numeric_terminator1(self):
 
148
        # check that ints & longs both work (since type is
 
149
        # explicitly checked in async_chat.handle_read)
 
150
        self.numeric_terminator_check(1)
 
151
 
 
152
    def test_numeric_terminator2(self):
 
153
        self.numeric_terminator_check(6)
 
154
 
 
155
    def test_none_terminator(self):
 
156
        # Try reading a fixed number of bytes
 
157
        s, event = start_echo_server()
 
158
        c = echo_client(None, s.port)
 
159
        data = b"hello world, I'm not dead yet!\n"
 
160
        c.push(data)
 
161
        c.push(SERVER_QUIT)
 
162
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
 
163
        s.join()
 
164
 
 
165
        self.assertEqual(c.contents, [])
 
166
        self.assertEqual(c.buffer, data)
 
167
 
 
168
    def test_simple_producer(self):
 
169
        s, event = start_echo_server()
 
170
        c = echo_client(b'\n', s.port)
 
171
        data = b"hello world\nI'm not dead yet!\n"
 
172
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
 
173
        c.push_with_producer(p)
 
174
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
 
175
        s.join()
 
176
 
 
177
        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
 
178
 
 
179
    def test_string_producer(self):
 
180
        s, event = start_echo_server()
 
181
        c = echo_client(b'\n', s.port)
 
182
        data = b"hello world\nI'm not dead yet!\n"
 
183
        c.push_with_producer(data+SERVER_QUIT)
 
184
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
 
185
        s.join()
 
186
 
 
187
        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
 
188
 
 
189
    def test_empty_line(self):
 
190
        # checks that empty lines are handled correctly
 
191
        s, event = start_echo_server()
 
192
        c = echo_client(b'\n', s.port)
 
193
        c.push(b"hello world\n\nI'm not dead yet!\n")
 
194
        c.push(SERVER_QUIT)
 
195
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
 
196
        s.join()
 
197
 
 
198
        self.assertEqual(c.contents,
 
199
                         [b"hello world", b"", b"I'm not dead yet!"])
 
200
 
 
201
    def test_close_when_done(self):
 
202
        s, event = start_echo_server()
 
203
        c = echo_client(b'\n', s.port)
 
204
        c.push(b"hello world\nI'm not dead yet!\n")
 
205
        c.push(SERVER_QUIT)
 
206
        c.close_when_done()
 
207
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
 
208
        s.join()
 
209
 
 
210
        self.assertEqual(c.contents, [])
 
211
        # the server might have been able to send a byte or two back, but this
 
212
        # at least checks that it received something and didn't just fail
 
213
        # (which could still result in the client not having received anything)
 
214
        self.assertTrue(len(s.buffer) > 0)
 
215
 
 
216
 
 
217
class TestAsynchat_WithPoll(TestAsynchat):
 
218
    usepoll = True
 
219
 
 
220
class TestHelperFunctions(unittest.TestCase):
 
221
    def test_find_prefix_at_end(self):
 
222
        self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
 
223
        self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
 
224
 
 
225
class TestFifo(unittest.TestCase):
 
226
    def test_basic(self):
 
227
        f = asynchat.fifo()
 
228
        f.push(7)
 
229
        f.push(b'a')
 
230
        self.assertEqual(len(f), 2)
 
231
        self.assertEqual(f.first(), 7)
 
232
        self.assertEqual(f.pop(), (1, 7))
 
233
        self.assertEqual(len(f), 1)
 
234
        self.assertEqual(f.first(), b'a')
 
235
        self.assertEqual(f.is_empty(), False)
 
236
        self.assertEqual(f.pop(), (1, b'a'))
 
237
        self.assertEqual(len(f), 0)
 
238
        self.assertEqual(f.is_empty(), True)
 
239
        self.assertEqual(f.pop(), (0, None))
 
240
 
 
241
    def test_given_list(self):
 
242
        f = asynchat.fifo([b'x', 17, 3])
 
243
        self.assertEqual(len(f), 3)
 
244
        self.assertEqual(f.pop(), (1, b'x'))
 
245
        self.assertEqual(f.pop(), (1, 17))
 
246
        self.assertEqual(f.pop(), (1, 3))
 
247
        self.assertEqual(f.pop(), (0, None))
 
248
 
 
249
 
 
250
def test_main(verbose=None):
 
251
    support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
 
252
                              TestHelperFunctions, TestFifo)
 
253
 
 
254
if __name__ == "__main__":
 
255
    test_main(verbose=True)