40
40
raise RuntimeError, "timed out on %r" % (sock,)
42
42
if HAVE_UNIX_SOCKETS:
43
class ForkingUnixStreamServer(socketserver.ForkingMixIn,
44
socketserver.UnixStreamServer):
43
class ForkingUnixStreamServer(SocketServer.ForkingMixIn,
44
SocketServer.UnixStreamServer):
47
class ForkingUnixDatagramServer(socketserver.ForkingMixIn,
48
socketserver.UnixDatagramServer):
47
class ForkingUnixDatagramServer(SocketServer.ForkingMixIn,
48
SocketServer.UnixDatagramServer):
174
174
def test_TCPServer(self):
175
self.run_server(socketserver.TCPServer,
176
socketserver.StreamRequestHandler,
175
self.run_server(SocketServer.TCPServer,
176
SocketServer.StreamRequestHandler,
177
177
self.stream_examine)
179
179
def test_ThreadingTCPServer(self):
180
self.run_server(socketserver.ThreadingTCPServer,
181
socketserver.StreamRequestHandler,
180
self.run_server(SocketServer.ThreadingTCPServer,
181
SocketServer.StreamRequestHandler,
182
182
self.stream_examine)
185
185
def test_ForkingTCPServer(self):
186
186
with simple_subprocess(self):
187
self.run_server(socketserver.ForkingTCPServer,
188
socketserver.StreamRequestHandler,
187
self.run_server(SocketServer.ForkingTCPServer,
188
SocketServer.StreamRequestHandler,
189
189
self.stream_examine)
191
191
if HAVE_UNIX_SOCKETS:
192
192
def test_UnixStreamServer(self):
193
self.run_server(socketserver.UnixStreamServer,
194
socketserver.StreamRequestHandler,
193
self.run_server(SocketServer.UnixStreamServer,
194
SocketServer.StreamRequestHandler,
195
195
self.stream_examine)
197
197
def test_ThreadingUnixStreamServer(self):
198
self.run_server(socketserver.ThreadingUnixStreamServer,
199
socketserver.StreamRequestHandler,
198
self.run_server(SocketServer.ThreadingUnixStreamServer,
199
SocketServer.StreamRequestHandler,
200
200
self.stream_examine)
203
203
def test_ForkingUnixStreamServer(self):
204
204
with simple_subprocess(self):
205
205
self.run_server(ForkingUnixStreamServer,
206
socketserver.StreamRequestHandler,
206
SocketServer.StreamRequestHandler,
207
207
self.stream_examine)
209
209
def test_UDPServer(self):
210
self.run_server(socketserver.UDPServer,
211
socketserver.DatagramRequestHandler,
210
self.run_server(SocketServer.UDPServer,
211
SocketServer.DatagramRequestHandler,
212
212
self.dgram_examine)
214
214
def test_ThreadingUDPServer(self):
215
self.run_server(socketserver.ThreadingUDPServer,
216
socketserver.DatagramRequestHandler,
215
self.run_server(SocketServer.ThreadingUDPServer,
216
SocketServer.DatagramRequestHandler,
217
217
self.dgram_examine)
220
220
def test_ForkingUDPServer(self):
221
221
with simple_subprocess(self):
222
self.run_server(socketserver.ForkingUDPServer,
223
socketserver.DatagramRequestHandler,
222
self.run_server(SocketServer.ForkingUDPServer,
223
SocketServer.DatagramRequestHandler,
224
224
self.dgram_examine)
226
226
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
229
229
# if HAVE_UNIX_SOCKETS:
230
230
# def test_UnixDatagramServer(self):
231
# self.run_server(socketserver.UnixDatagramServer,
232
# socketserver.DatagramRequestHandler,
231
# self.run_server(SocketServer.UnixDatagramServer,
232
# SocketServer.DatagramRequestHandler,
233
233
# self.dgram_examine)
235
235
# def test_ThreadingUnixDatagramServer(self):
236
# self.run_server(socketserver.ThreadingUnixDatagramServer,
237
# socketserver.DatagramRequestHandler,
236
# self.run_server(SocketServer.ThreadingUnixDatagramServer,
237
# SocketServer.DatagramRequestHandler,
238
238
# self.dgram_examine)
240
240
# if HAVE_FORKING:
241
241
# def test_ForkingUnixDatagramServer(self):
242
# self.run_server(socketserver.ForkingUnixDatagramServer,
243
# socketserver.DatagramRequestHandler,
242
# self.run_server(SocketServer.ForkingUnixDatagramServer,
243
# SocketServer.DatagramRequestHandler,
244
244
# self.dgram_examine)