223
242
raise NotImplementedError
226
class SmartTCPServer_for_testing(server.SmartTCPServer):
245
class ThreadWithException(threading.Thread):
246
"""A catching exception thread.
248
If an exception occurs during the thread execution, it's caught and
249
re-raised when the thread is joined().
252
def __init__(self, *args, **kwargs):
253
# There are cases where the calling thread must wait, yet, if an
254
# exception occurs, the event should be set so the caller is not
255
# blocked. The main example is a calling thread that want to wait for
256
# the called thread to be in a given state before continuing.
258
event = kwargs.pop('event')
260
# If the caller didn't pass a specific event, create our own
261
event = threading.Event()
262
super(ThreadWithException, self).__init__(*args, **kwargs)
263
self.set_ready_event(event)
264
self.exception = None
265
self.ignored_exceptions = None # see set_ignored_exceptions
267
# compatibility thunk for python-2.4 and python-2.5...
268
if sys.version_info < (2, 6):
269
name = property(threading.Thread.getName, threading.Thread.setName)
271
def set_ready_event(self, event):
272
"""Set the ``ready`` event used to synchronize exception catching.
274
When the thread uses an event to synchronize itself with another thread
275
(setting it when the other thread can wake up from a ``wait`` call),
276
the event must be set after catching an exception or the other thread
279
Some threads require multiple events and should set the relevant one
284
def set_ignored_exceptions(self, ignored):
285
"""Declare which exceptions will be ignored.
287
:param ignored: Can be either:
288
- None: all exceptions will be raised,
289
- an exception class: the instances of this class will be ignored,
290
- a tuple of exception classes: the instances of any class of the
291
list will be ignored,
292
- a callable: that will be passed the exception object
293
and should return True if the exception should be ignored
296
self.ignored_exceptions = None
297
elif isinstance(ignored, (Exception, tuple)):
298
self.ignored_exceptions = lambda e: isinstance(e, ignored)
300
self.ignored_exceptions = ignored
303
"""Overrides Thread.run to capture any exception."""
307
super(ThreadWithException, self).run()
309
self.exception = sys.exc_info()
311
# Make sure the calling thread is released
315
def join(self, timeout=5):
316
"""Overrides Thread.join to raise any exception caught.
319
Calling join(timeout=0) will raise the caught exception or return None
320
if the thread is still alive.
322
The default timeout is set to 5 and should expire only when a thread
323
serving a client connection is hung.
325
super(ThreadWithException, self).join(timeout)
326
if self.exception is not None:
327
exc_class, exc_value, exc_tb = self.exception
328
self.exception = None # The exception should be raised only once
329
if (self.ignored_exceptions is None
330
or not self.ignored_exceptions(exc_value)):
331
# Raise non ignored exceptions
332
raise exc_class, exc_value, exc_tb
333
if timeout and self.isAlive():
334
# The timeout expired without joining the thread, the thread is
335
# therefore stucked and that's a failure as far as the test is
336
# concerned. We used to hang here.
338
# FIXME: we need to kill the thread, but as far as the test is
339
# concerned, raising an assertion is too strong. On most of the
340
# platforms, this doesn't occur, so just mentioning the problem is
341
# enough for now -- vila 2010824
342
sys.stderr.write('thread %s hung\n' % (self.name,))
343
#raise AssertionError('thread %s hung' % (self.name,))
345
def pending_exception(self):
346
"""Raise the caught exception.
348
This does nothing if no exception occurred.
353
class TestingTCPServerMixin:
354
"""Mixin to support running SocketServer.TCPServer in a thread.
356
Tests are connecting from the main thread, the server has to be run in a
361
self.started = threading.Event()
363
self.stopped = threading.Event()
364
# We collect the resources used by the clients so we can release them
367
self.ignored_exceptions = None
369
def server_bind(self):
370
self.socket.bind(self.server_address)
371
self.server_address = self.socket.getsockname()
376
# We are listening and ready to accept connections
380
# Really a connection but the python framework is generic and
382
self.handle_request()
383
# Let's close the listening socket
388
def handle_request(self):
389
"""Handle one request.
391
The python version swallows some socket exceptions and we don't use
392
timeout, so we override it to better control the server behavior.
394
request, client_address = self.get_request()
395
if self.verify_request(request, client_address):
397
self.process_request(request, client_address)
399
self.handle_error(request, client_address)
400
self.close_request(request)
402
def get_request(self):
403
return self.socket.accept()
405
def verify_request(self, request, client_address):
406
"""Verify the request.
408
Return True if we should proceed with this request, False if we should
409
not even touch a single byte in the socket ! This is useful when we
410
stop the server with a dummy last connection.
414
def handle_error(self, request, client_address):
415
# Stop serving and re-raise the last exception seen
417
# The following can be used for debugging purposes, it will display the
418
# exception and the traceback just when it occurs instead of waiting
419
# for the thread to be joined.
421
# SocketServer.BaseServer.handle_error(self, request, client_address)
424
def ignored_exceptions_during_shutdown(self, e):
425
if sys.platform == 'win32':
426
accepted_errnos = [errno.EBADF,
434
accepted_errnos = [errno.EBADF,
439
if isinstance(e, socket.error) and e[0] in accepted_errnos:
443
# The following methods are called by the main thread
445
def stop_client_connections(self):
447
c = self.clients.pop()
448
self.shutdown_client(c)
450
def shutdown_socket(self, sock):
451
"""Properly shutdown a socket.
453
This should be called only when no other thread is trying to use the
457
sock.shutdown(socket.SHUT_RDWR)
460
if self.ignored_exceptions(e):
465
# The following methods are called by the main thread
467
def set_ignored_exceptions(self, thread, ignored_exceptions):
468
self.ignored_exceptions = ignored_exceptions
469
thread.set_ignored_exceptions(self.ignored_exceptions)
471
def _pending_exception(self, thread):
472
"""Raise server uncaught exception.
474
Daughter classes can override this if they use daughter threads.
476
thread.pending_exception()
479
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
481
def __init__(self, server_address, request_handler_class):
482
TestingTCPServerMixin.__init__(self)
483
SocketServer.TCPServer.__init__(self, server_address,
484
request_handler_class)
486
def get_request(self):
487
"""Get the request and client address from the socket."""
488
sock, addr = TestingTCPServerMixin.get_request(self)
489
self.clients.append((sock, addr))
492
# The following methods are called by the main thread
494
def shutdown_client(self, client):
496
self.shutdown_socket(sock)
499
class TestingThreadingTCPServer(TestingTCPServerMixin,
500
SocketServer.ThreadingTCPServer):
502
def __init__(self, server_address, request_handler_class):
503
TestingTCPServerMixin.__init__(self)
504
SocketServer.ThreadingTCPServer.__init__(self, server_address,
505
request_handler_class)
507
def get_request (self):
508
"""Get the request and client address from the socket."""
509
sock, addr = TestingTCPServerMixin.get_request(self)
510
# The thread is not create yet, it will be updated in process_request
511
self.clients.append((sock, addr, None))
514
def process_request_thread(self, started, stopped, request, client_address):
516
SocketServer.ThreadingTCPServer.process_request_thread(
517
self, request, client_address)
518
self.close_request(request)
521
def process_request(self, request, client_address):
522
"""Start a new thread to process the request."""
523
started = threading.Event()
524
stopped = threading.Event()
525
t = ThreadWithException(
527
name='%s -> %s' % (client_address, self.server_address),
528
target = self.process_request_thread,
529
args = (started, stopped, request, client_address))
530
# Update the client description
532
self.clients.append((request, client_address, t))
533
# Propagate the exception handler since we must use the same one as
534
# TestingTCPServer for connections running in their own threads.
535
t.set_ignored_exceptions(self.ignored_exceptions)
539
sys.stderr.write('Client thread %s started\n' % (t.name,))
540
# If an exception occured during the thread start, it will get raised.
541
t.pending_exception()
543
# The following methods are called by the main thread
545
def shutdown_client(self, client):
546
sock, addr, connection_thread = client
547
self.shutdown_socket(sock)
548
if connection_thread is not None:
549
# The thread has been created only if the request is processed but
550
# after the connection is inited. This could happen during server
551
# shutdown. If an exception occurred in the thread it will be
554
sys.stderr.write('Client thread %s will be joined\n'
555
% (connection_thread.name,))
556
connection_thread.join()
558
def set_ignored_exceptions(self, thread, ignored_exceptions):
559
TestingTCPServerMixin.set_ignored_exceptions(self, thread,
561
for sock, addr, connection_thread in self.clients:
562
if connection_thread is not None:
563
connection_thread.set_ignored_exceptions(
564
self.ignored_exceptions)
566
def _pending_exception(self, thread):
567
for sock, addr, connection_thread in self.clients:
568
if connection_thread is not None:
569
connection_thread.pending_exception()
570
TestingTCPServerMixin._pending_exception(self, thread)
573
class TestingTCPServerInAThread(transport.Server):
574
"""A server in a thread that re-raise thread exceptions."""
576
def __init__(self, server_address, server_class, request_handler_class):
577
self.server_class = server_class
578
self.request_handler_class = request_handler_class
579
self.host, self.port = server_address
581
self._server_thread = None
584
return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
586
def create_server(self):
587
return self.server_class((self.host, self.port),
588
self.request_handler_class)
590
def start_server(self):
591
self.server = self.create_server()
592
self._server_thread = ThreadWithException(
593
event=self.server.started,
594
target=self.run_server)
595
self._server_thread.start()
596
# Wait for the server thread to start (i.e release the lock)
597
self.server.started.wait()
598
# Get the real address, especially the port
599
self.host, self.port = self.server.server_address
600
self._server_thread.name = self.server.server_address
602
sys.stderr.write('Server thread %s started\n'
603
% (self._server_thread.name,))
604
# If an exception occured during the server start, it will get raised,
605
# otherwise, the server is blocked on its accept() call.
606
self._server_thread.pending_exception()
607
# From now on, we'll use a different event to ensure the server can set
609
self._server_thread.set_ready_event(self.server.stopped)
611
def run_server(self):
614
def stop_server(self):
615
if self.server is None:
618
# The server has been started successfully, shut it down now. As
619
# soon as we stop serving, no more connection are accepted except
620
# one to get out of the blocking listen.
621
self.set_ignored_exceptions(
622
self.server.ignored_exceptions_during_shutdown)
623
self.server.serving = False
625
sys.stderr.write('Server thread %s will be joined\n'
626
% (self._server_thread.name,))
627
# The server is listening for a last connection, let's give it:
630
last_conn = osutils.connect_socket((self.host, self.port))
631
except socket.error, e:
632
# But ignore connection errors as the point is to unblock the
633
# server thread, it may happen that it's not blocked or even
636
# We start shutting down the clients while the server itself is
638
self.server.stop_client_connections()
639
# Now we wait for the thread running self.server.serve() to finish
640
self.server.stopped.wait()
641
if last_conn is not None:
642
# Close the last connection without trying to use it. The
643
# server will not process a single byte on that socket to avoid
644
# complications (SSL starts with a handshake for example).
646
# Check for any exception that could have occurred in the server
649
self._server_thread.join()
651
if self.server.ignored_exceptions(e):
656
# Make sure we can be called twice safely, note that this means
657
# that we will raise a single exception even if several occurred in
658
# the various threads involved.
661
def set_ignored_exceptions(self, ignored_exceptions):
662
"""Install an exception handler for the server."""
663
self.server.set_ignored_exceptions(self._server_thread,
666
def pending_exception(self):
667
"""Raise uncaught exception in the server."""
668
self.server._pending_exception(self._server_thread)
671
class TestingSmartConnectionHandler(SocketServer.BaseRequestHandler,
672
medium.SmartServerSocketStreamMedium):
674
def __init__(self, request, client_address, server):
675
medium.SmartServerSocketStreamMedium.__init__(
676
self, request, server.backing_transport,
677
server.root_client_path)
678
request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
679
SocketServer.BaseRequestHandler.__init__(self, request, client_address,
683
while not self.finished:
684
server_protocol = self._build_protocol()
685
self._serve_one_request(server_protocol)
688
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
690
def __init__(self, server_address, request_handler_class,
691
backing_transport, root_client_path):
692
TestingThreadingTCPServer.__init__(self, server_address,
693
request_handler_class)
694
server.SmartTCPServer.__init__(self, backing_transport,
697
# FIXME: No test are exercising the hooks for the test server
699
self.run_server_started_hooks()
701
TestingThreadingTCPServer.serve(self)
703
self.run_server_stopped_hooks()
706
"""Return the url of the server"""
707
return "bzr://%s:%d/" % self.server_address
710
class SmartTCPServer_for_testing(TestingTCPServerInAThread):
227
711
"""Server suitable for use by transport tests.
229
713
This server is backed by the process's cwd.
232
715
def __init__(self, thread_name_suffix=''):
233
super(SmartTCPServer_for_testing, self).__init__(None)
234
716
self.client_path_extra = None
235
717
self.thread_name_suffix = thread_name_suffix
237
def get_backing_transport(self, backing_transport_server):
238
"""Get a backing transport from a server we are decorating."""
239
return transport.get_transport(backing_transport_server.get_url())
718
self.host = '127.0.0.1'
720
super(SmartTCPServer_for_testing, self).__init__(
721
(self.host, self.port),
723
TestingSmartConnectionHandler)
725
def create_server(self):
726
return self.server_class((self.host, self.port),
727
self.request_handler_class,
728
self.backing_transport,
729
self.root_client_path)
241
732
def start_server(self, backing_transport_server=None,
242
client_path_extra='/extra/'):
733
client_path_extra='/extra/'):
243
734
"""Set up server for testing.
245
736
:param backing_transport_server: backing server to use. If not