~ubuntu-branches/ubuntu/maverick/python3.1/maverick

« back to all changes in this revision

Viewing changes to Lib/test/test_socketserver.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-03-23 00:01:27 UTC
  • Revision ID: james.westby@ubuntu.com-20090323000127-5fstfxju4ufrhthq
Tags: upstream-3.1~a1+20090322
Import upstream version 3.1~a1+20090322

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
Test suite for socketserver.
 
3
"""
 
4
 
 
5
import contextlib
 
6
import errno
 
7
import imp
 
8
import os
 
9
import select
 
10
import signal
 
11
import socket
 
12
import tempfile
 
13
import threading
 
14
import time
 
15
import unittest
 
16
import socketserver
 
17
 
 
18
import test.support
 
19
from test.support import reap_children, verbose, TestSkipped
 
20
from test.support import TESTFN as TEST_FILE
 
21
 
 
22
# Every test here opens real sockets, so bail out early unless the
# "network" resource has been enabled for this test run.
test.support.requires("network")

# Payload each client sends; the handler echoes one line back.
TEST_STR = b"hello world\n"
# Host that the AF_INET servers bind to.
HOST = test.support.HOST

# Platform capability flags used to decide which tests get defined.
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
HAVE_FORKING = os.name != "os2" and hasattr(os, "fork")
 
29
 
 
30
def signal_alarm(n):
    """Schedule (or cancel, for n == 0) a SIGALRM via signal.alarm.

    Does nothing when the signal module has no ``alarm`` attribute
    (i.e. on Windows).
    """
    alarm = getattr(signal, 'alarm', None)
    if alarm is not None:
        alarm(n)
 
34
 
 
35
def receive(sock, n, timeout=20):
    """Return up to *n* bytes read from *sock*.

    Waits with select() for at most *timeout* seconds for the socket to
    become readable and raises RuntimeError if it does not.
    """
    readable, _, _ = select.select([sock], [], [], timeout)
    if sock not in readable:
        raise RuntimeError("timed out on %r" % (sock,))
    return sock.recv(n)
 
41
 
 
42
# socketserver ships no forking flavours of the Unix-domain servers, so
# build them here by mixing ForkingMixIn into the plain Unix servers.
if HAVE_UNIX_SOCKETS:
    class ForkingUnixStreamServer(socketserver.ForkingMixIn,
                                  socketserver.UnixStreamServer):
        """UnixStreamServer combined with ForkingMixIn."""

    class ForkingUnixDatagramServer(socketserver.ForkingMixIn,
                                    socketserver.UnixDatagramServer):
        """UnixDatagramServer combined with ForkingMixIn."""
 
50
 
 
51
 
 
52
@contextlib.contextmanager
def simple_subprocess(testcase):
    """Run the ``with`` body while an unrelated child process exists.

    A child is forked that exits immediately with status 72.  After the
    body finishes, the child is waited for and *testcase* asserts that
    it was still ours to collect and that it exited with that status.

    testcase -- object providing ``assertEqual`` (a unittest.TestCase).
    """
    pid = os.fork()
    if pid == 0:
        # Don't throw an exception; it would be caught by the test harness.
        os._exit(72)
    try:
        yield None
    except BaseException:
        # The body failed: still collect our child so it is not left
        # behind as a zombie, but let the original error propagate.
        try:
            os.waitpid(pid, 0)
        except OSError:
            # Child already collected by someone else; nothing to do.
            pass
        raise
    pid2, status = os.waitpid(pid, 0)
    testcase.assertEqual(pid2, pid)
    # waitpid() reports the exit code in the high byte of the status.
    testcase.assertEqual(72 << 8, status)
 
62
 
 
63
 
 
64
class SocketServerTest(unittest.TestCase):
    """Test all socket servers."""

    def setUp(self):
        """Arm the deadlock watchdog and reset per-test bookkeeping."""
        signal_alarm(20)  # Kill deadlocks after 20 seconds.
        self.port_seed = 0
        # Unix socket paths handed out by pickaddr(); removed in tearDown.
        self.test_files = []

    def tearDown(self):
        """Disarm the watchdog, reap children and delete socket files."""
        signal_alarm(0)  # Didn't deadlock.
        reap_children()

        for fn in self.test_files:
            try:
                os.remove(fn)
            except OSError:
                # Best effort: the socket file may never have been created.
                pass
        self.test_files[:] = []

    def pickaddr(self, proto):
        """Return an address for a server of address family *proto*.

        For AF_INET this is ``(HOST, 0)`` so the OS picks the port; for
        anything else it is a fresh temporary file name, which is also
        recorded in ``self.test_files`` for cleanup.
        """
        if proto == socket.AF_INET:
            return (HOST, 0)

        # XXX: We need a way to tell AF_UNIX to pick its own name
        # like AF_INET provides port==0.
        tmpdir = None
        if os.name == 'os2':
            tmpdir = '\\socket'
        # mktemp() is used on purpose: bind() needs a path that does not
        # exist yet, so the file must not be created here.
        fn = tempfile.mktemp(prefix='unix_socket.', dir=tmpdir)
        if os.name == 'os2':
            # AF_UNIX socket names on OS/2 require a specific prefix
            # which can't include a drive letter and must also use
            # backslashes as directory separators
            if fn[1] == ':':
                fn = fn[2:]
            if fn[0] in (os.sep, os.altsep):
                fn = fn[1:]
            if os.sep == '/':
                fn = fn.replace(os.sep, os.altsep)
            else:
                fn = fn.replace(os.altsep, os.sep)
        self.test_files.append(fn)
        return fn

    def make_server(self, addr, svrcls, hdlrbase):
        """Create and return a one-line echo server bound to *addr*.

        svrcls   -- server class to subclass.
        hdlrbase -- request handler base class to subclass.
        """
        class MyServer(svrcls):
            def handle_error(self, request, client_address):
                self.close_request(request)
                self.server_close()
                # Re-raise the exception currently being handled instead
                # of just reporting it.
                raise

        class MyHandler(hdlrbase):
            def handle(self):
                # Echo a single line back to the client.
                line = self.rfile.readline()
                self.wfile.write(line)

        if verbose:
            print("creating server")
        server = MyServer(addr, MyHandler)
        self.assertEqual(server.server_address, server.socket.getsockname())
        return server

    def run_server(self, svrcls, hdlrbase, testfunc):
        """Serve in a background thread and run *testfunc* three times.

        testfunc is called as ``testfunc(address_family, address)``.
        """
        server = self.make_server(self.pickaddr(svrcls.address_family),
                                  svrcls, hdlrbase)
        # We had the OS pick a port, so pull the real address out of
        # the server.
        addr = server.server_address
        if verbose:
            print("ADDR =", addr)
            print("CLASS =", svrcls)

        t = threading.Thread(
            name='%s serving' % svrcls,
            target=server.serve_forever,
            # Short poll interval to make the test finish quickly.
            # Time between requests is short enough that we won't wake
            # up spuriously too many times.
            kwargs={'poll_interval': 0.01})
        t.daemon = True  # In case this function raises.
        t.start()
        if verbose:
            print("server running")
        for i in range(3):
            if verbose:
                print("test client", i)
            testfunc(svrcls.address_family, addr)
        if verbose:
            print("waiting for server")
        server.shutdown()
        t.join()
        # Release the listening socket now that the serving loop is done.
        server.server_close()
        if verbose:
            print("done")

    def _read_reply(self, s):
        """Read from socket *s* until a newline arrives or the peer closes."""
        buf = data = receive(s, 100)
        while data and b'\n' not in buf:
            data = receive(s, 100)
            buf += data
        return buf

    def stream_examine(self, proto, addr):
        """Connect a stream client to *addr* and check the echoed line."""
        s = socket.socket(proto, socket.SOCK_STREAM)
        try:
            s.connect(addr)
            s.sendall(TEST_STR)
            self.assertEqual(self._read_reply(s), TEST_STR)
        finally:
            # Close the client socket even if the check or a timeout fails.
            s.close()

    def dgram_examine(self, proto, addr):
        """Send a datagram to *addr* and check the echoed line."""
        s = socket.socket(proto, socket.SOCK_DGRAM)
        try:
            s.sendto(TEST_STR, addr)
            self.assertEqual(self._read_reply(s), TEST_STR)
        finally:
            # Close the client socket even if the check or a timeout fails.
            s.close()

    def test_TCPServer(self):
        self.run_server(socketserver.TCPServer,
                        socketserver.StreamRequestHandler,
                        self.stream_examine)

    def test_ThreadingTCPServer(self):
        self.run_server(socketserver.ThreadingTCPServer,
                        socketserver.StreamRequestHandler,
                        self.stream_examine)

    if HAVE_FORKING:
        def test_ForkingTCPServer(self):
            with simple_subprocess(self):
                self.run_server(socketserver.ForkingTCPServer,
                                socketserver.StreamRequestHandler,
                                self.stream_examine)

    if HAVE_UNIX_SOCKETS:
        def test_UnixStreamServer(self):
            self.run_server(socketserver.UnixStreamServer,
                            socketserver.StreamRequestHandler,
                            self.stream_examine)

        def test_ThreadingUnixStreamServer(self):
            self.run_server(socketserver.ThreadingUnixStreamServer,
                            socketserver.StreamRequestHandler,
                            self.stream_examine)

        if HAVE_FORKING:
            def test_ForkingUnixStreamServer(self):
                with simple_subprocess(self):
                    self.run_server(ForkingUnixStreamServer,
                                    socketserver.StreamRequestHandler,
                                    self.stream_examine)

    def test_UDPServer(self):
        self.run_server(socketserver.UDPServer,
                        socketserver.DatagramRequestHandler,
                        self.dgram_examine)

    def test_ThreadingUDPServer(self):
        self.run_server(socketserver.ThreadingUDPServer,
                        socketserver.DatagramRequestHandler,
                        self.dgram_examine)

    if HAVE_FORKING:
        def test_ForkingUDPServer(self):
            with simple_subprocess(self):
                self.run_server(socketserver.ForkingUDPServer,
                                socketserver.DatagramRequestHandler,
                                self.dgram_examine)

    # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
    # client address so this cannot work:

    # if HAVE_UNIX_SOCKETS:
    #     def test_UnixDatagramServer(self):
    #         self.run_server(socketserver.UnixDatagramServer,
    #                         socketserver.DatagramRequestHandler,
    #                         self.dgram_examine)
    #
    #     def test_ThreadingUnixDatagramServer(self):
    #         self.run_server(socketserver.ThreadingUnixDatagramServer,
    #                         socketserver.DatagramRequestHandler,
    #                         self.dgram_examine)
    #
    #     if HAVE_FORKING:
    #         def test_ForkingUnixDatagramServer(self):
    #             self.run_server(socketserver.ForkingUnixDatagramServer,
    #                             socketserver.DatagramRequestHandler,
    #                             self.dgram_examine)
 
245
 
 
246
 
 
247
def test_main():
    """Run SocketServerTest, unless the import lock is currently held."""
    import_lock_busy = imp.lock_held()
    if import_lock_busy:
        # If the import lock is held, the threads will hang
        raise TestSkipped("can't run when import lock is held")

    test.support.run_unittest(SocketServerTest)
 
253
 
 
254
if __name__ == "__main__":
    test_main()
    # Shutdown shouldn't take more than 3 seconds.
    signal_alarm(3)