Rev 5274: Merge propagate-exceptions into http-leaks in file:///home/vila/src/bzr/experimental/leaking-tests/

Vincent Ladeuil v.ladeuil+lp at free.fr
Mon Jun 7 18:07:06 BST 2010


At file:///home/vila/src/bzr/experimental/leaking-tests/

------------------------------------------------------------
revno: 5274 [merge]
revision-id: v.ladeuil+lp at free.fr-20100607170705-xyb0qz96r51dmwr5
parent: v.ladeuil+lp at free.fr-20100605170611-oyml3ruiamddmv0w
parent: v.ladeuil+lp at free.fr-20100607165530-jnix5v6ic2x7c973
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: http-leaks
timestamp: Mon 2010-06-07 19:07:05 +0200
message:
  Merge propagate-exceptions into http-leaks
modified:
  bzrlib/tests/test_server.py    test_server.py-20100209163834-im1ozfuenfmqaa2m-1
  bzrlib/tests/test_test_server.py test_test_server.py-20100601152414-r8rln0ok7514pcoz-1
-------------- next part --------------
=== modified file 'bzrlib/tests/test_server.py'
--- a/bzrlib/tests/test_server.py	2010-06-05 16:26:37 +0000
+++ b/bzrlib/tests/test_server.py	2010-06-07 17:07:05 +0000
@@ -252,10 +252,29 @@
         super(ThreadWithException, self).__init__(*args, **kwargs)
         self.set_event(event)
         self.exception = None
+        self.ignored_exceptions = None # see set_ignored_exceptions
 
     def set_event(self, event):
         self.ready = event
 
+    def set_ignored_exceptions(self, ignored):
+        """Declare which exceptions will be ignored.
+
+        :param ignored: Can be either:
+           - None: all exceptions will be raised,
+           - an exception class: the instances of this class will be ignored,
+           - a tuple of exception classes: the instances of any class of the
+             list will be ignored,
+           - a callable: that will be passed the exception object
+             and should return True if the exception should be ignored
+        """
+        if ignored is None:
+            self.ignored_exceptions = None
+        elif isinstance(ignored, (Exception, tuple)):
+            self.ignored_exceptions = lambda e: isinstance(e, ignored)
+        else:
+            self.ignored_exceptions = ignored
+
     def run(self):
         """Overrides Thread.run to capture any exception."""
         self.ready.clear()
@@ -282,8 +301,11 @@
         super(ThreadWithException, self).join(timeout)
         if self.exception is not None:
             exc_class, exc_value, exc_tb = self.exception
-            self.execption = None # The exception should be raised only once
-            raise exc_class, exc_value, exc_tb
+            self.exception = None # The exception should be raised only once
+            if (self.ignored_exceptions is None
+                or not self.ignored_exceptions(exc_value)):
+                # Raise non ignored exceptions
+                raise exc_class, exc_value, exc_tb
         if timeout and self.isAlive():
             # The timeout expired without joining the thread, the thread is
             # therefore stucked and that's a failure as far as the test is
@@ -314,6 +336,7 @@
         # We collect the resources used by the clients so we can release them
         # when shutting down
         self.clients = []
+        self.ignored_exceptions = None
 
     def server_bind(self):
         # We need to override the SocketServer bind, yet, we still want to use
@@ -329,13 +352,29 @@
         self.stopped.clear()
         # We are listening and ready to accept connections
         self.started.set()
-        while self.serving.isSet():
-            # Really a connection but the python framework is generic and
-            # call them requests
-            self.handle_request()
-        # Let's close the listening socket
-        self.server_close()
-        self.stopped.set()
+        try:
+            while self.serving.isSet():
+                # Really a connection but the python framework is generic and
+                # call them requests
+                self.handle_request()
+            # Let's close the listening socket
+            self.server_close()
+        finally:
+            self.stopped.set()
+
+    def handle_request(self):
+        """Handle one request.
+
+        The python version swallows some socket exceptions and we don't use
+        timeout, so we override it to better control the server behavior.
+        """
+        request, client_address = self.get_request()
+        if self.verify_request(request, client_address):
+            try:
+                self.process_request(request, client_address)
+            except:
+                self.handle_error(request, client_address)
+                self.close_request(request)
 
     def handle_request(self):
         """Handle one request.
@@ -366,6 +405,16 @@
 #        self.sibling_class.handle_error(self, request, client_address)
         raise
 
+    def ignored_exceptions_during_shutdown(self, e):
+        if sys.platform == 'win32':
+            accepted_errnos = [errno.EBADF, errno.WSAEBADF, errno.WSAENOTCONN,
+                               errno.WSAECONNRESET, errno.WSAESHUTDOWN]
+        else:
+            accepted_errnos = [errno.EBADF, errno.ENOTCONN, errno.ECONNRESET]
+        if isinstance(e, socket.error) and e[0] in accepted_errnos:
+            return True
+        return False
+
     # The following methods are called by the main thread
 
     def stop_client_connections(self):
@@ -373,28 +422,34 @@
             c = self.clients.pop()
             self.shutdown_client(c)
 
-    def shutdown_client_socket(self, sock):
-        """Properly shutdown a client socket.
-
-        Under some circumstances (as in bug #383920), we need to force the
-        shutdown as python delays it until gc occur otherwise and the client
-        may hang.
+    def shutdown_socket(self, sock):
+        """Properly shutdown a socket.
 
         This should be called only when no other thread is trying to use the
         socket.
         """
         try:
-            # The request process has been completed, the thread is about to
-            # die, let's shutdown the socket if we can.
             sock.shutdown(socket.SHUT_RDWR)
             sock.close()
-        except (socket.error, select.error), e:
-            if e[0] in (errno.EBADF, errno.ENOTCONN):
-                # Right, the socket is already down
+        except Exception, e:
+            if self.ignored_exceptions(e):
                 pass
             else:
                 raise
 
+    # The following methods are called by the main thread
+
+    def set_ignored_exceptions(self, thread, ignored_exceptions):
+        self.ignored_exceptions = ignored_exceptions
+        thread.set_ignored_exceptions(self.ignored_exceptions)
+
+    def _pending_exception(self, thread):
+        """Raise server uncaught exception.
+
+        Daughter classes can override this if they use daughter threads.
+        """
+        thread.pending_exception()
+
 
 class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
 
@@ -413,8 +468,7 @@
 
     def shutdown_client(self, client):
         sock, addr = client
-        self.shutdown_client_socket(sock)
-
+        self.shutdown_socket(sock)
 
 
 class TestingThreadingTCPServer(TestingTCPServerMixin,
@@ -447,10 +501,13 @@
             event=stopped,
             target = self.process_request_thread,
             args = (started, stopped, request, client_address))
-        t.name = '%s -> %s' % (client_address, self.server_address)
         # Update the client description
         self.clients.pop()
         self.clients.append((request, client_address, t))
+        # Propagate the exception handler since we must the same one for
+        # connections running in their own threads than TestingTCPServer.
+        t.set_ignored_exceptions(self.ignored_exceptions)
+        t.name = '%s -> %s' % (client_address, self.server_address)
         t.start()
         started.wait()
         # If an exception occured during the thread start, it will get raised.
@@ -459,14 +516,28 @@
     # The following methods are called by the main thread
 
     def shutdown_client(self, client):
-        sock, addr, t = client
-        self.shutdown_client_socket(sock)
-        if t is not None:
+        sock, addr, connection_thread = client
+        self.shutdown_socket(sock)
+        if connection_thread is not None:
             # The thread has been created only if the request is processed but
             # after the connection is inited. This could happen during server
             # shutdown. If an exception occurred in the thread it will be
             # re-raised
-            t.join()
+            connection_thread.join()
+
+    def set_ignored_exceptions(self, thread, ignored_exceptions):
+        TestingTCPServerMixin.set_ignored_exceptions(self, thread,
+                                                     ignored_exceptions)
+        for sock, addr, connection_thread in self.clients:
+            if connection_thread is not None:
+                connection_thread.set_ignored_exceptions(
+                    self.ignored_exceptions)
+
+    def _pending_exception(self, thread):
+        for sock, addr, connection_thread in self.clients:
+            if connection_thread is not None:
+                connection_thread.pending_exception()
+        TestingTCPServerMixin._pending_exception(self, thread)
 
 
 class TestingTCPServerInAThread(transport.Server):
@@ -477,6 +548,7 @@
         self.request_handler_class = request_handler_class
         self.host, self.port = server_address
         self.server = None
+        self._server_thread = None
 
     def __repr__(self):
         return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
@@ -512,6 +584,8 @@
             # The server has been started successfully, shut it down now.  As
             # soon as we stop serving, no more connection are accepted except
             # one to get out of the blocking listen.
+            self.set_ignored_exceptions(
+                self.server.ignored_exceptions_during_shutdown)
             self.server.serving.clear()
             # The server is listening for a last connection, let's give it:
             last_conn = None
@@ -534,13 +608,28 @@
                 last_conn.close()
             # Check for any exception that could have occurred in the server
             # thread
-            self._server_thread.join()
+            try:
+                self._server_thread.join()
+            except Exception, e:
+                if self.server.ignored_exceptions(e):
+                    pass
+                else:
+                    raise
         finally:
             # Make sure we can be called twice safely, note that this means
             # that we will raise a single exception even if several occurred in
             # the various threads involved.
             self.server = None
 
+    def set_ignored_exceptions(self, ignored_exceptions):
+        """Install an exception handler for the server."""
+        self.server.set_ignored_exceptions(self._server_thread,
+                                           ignored_exceptions)
+
+    def pending_exception(self):
+        """Raise uncaught exception in the server."""
+        self.server._pending_exception(self._server_thread)
+
 
 class SmartTCPServer_for_testing(server.SmartTCPServer):
     """Server suitable for use by transport tests.

=== modified file 'bzrlib/tests/test_test_server.py'
--- a/bzrlib/tests/test_test_server.py	2010-06-05 10:25:45 +0000
+++ b/bzrlib/tests/test_test_server.py	2010-06-07 17:07:05 +0000
@@ -17,6 +17,7 @@
 import errno
 import socket
 import SocketServer
+import threading
 
 from bzrlib import (
     osutils,
@@ -117,7 +118,6 @@
     def test_start_stop(self):
         server = self.get_server()
         client = self.get_client()
-        client.connect((server.host, server.port))
         server.stop_server()
         # since the server doesn't accept connections anymore attempting to
         # connect should fail
@@ -147,14 +147,14 @@
         self.assertRaises(CantStart,
                           self.get_server, server_class=CantStartServer)
 
-    def test_server_fails_while_serving_or_stoping(self):
-        class ServerFailure(Exception):
+    def test_server_fails_while_serving_or_stopping(self):
+        class CantConnect(Exception):
             pass
 
         class FailingConnectionHandler(TCPConnectionHandler):
 
             def handle(self):
-                raise ServerFailure()
+                raise CantConnect()
 
         server = self.get_server(
             connection_handler_class=FailingConnectionHandler)
@@ -171,7 +171,53 @@
             client.read()
         except socket.error:
             pass
-        # Now the server has raise the exception in its own thread
-        self.assertRaises(ServerFailure, server.stop_server)
-
-
+        # Now the server has raised the exception in its own thread
+        self.assertRaises(CantConnect, server.stop_server)
+
+    def test_server_crash_while_responding(self):
+        sync = threading.Event()
+        sync.clear()
+        class FailToRespond(Exception):
+            pass
+
+        class FailingDuringResponseHandler(TCPConnectionHandler):
+
+            def handle_connection(self):
+                req = self.rfile.readline()
+                threading.currentThread().set_event(sync)
+                raise FailToRespond()
+
+        server = self.get_server(
+            connection_handler_class=FailingDuringResponseHandler)
+        client = self.get_client()
+        client.connect((server.host, server.port))
+        client.write('ping\n')
+        sync.wait()
+        self.assertRaises(FailToRespond, server.pending_exception)
+
+    def test_exception_swallowed_while_serving(self):
+        sync = threading.Event()
+        sync.clear()
+        class CantServe(Exception):
+            pass
+
+        class FailingWhileServingConnectionHandler(TCPConnectionHandler):
+
+            def handle(self):
+                # We want to sync with the thread that is serving the
+                # connection.
+                threading.currentThread().set_event(sync)
+                raise CantServe()
+
+        server = self.get_server(
+            connection_handler_class=FailingWhileServingConnectionHandler)
+        # Install the exception swallower
+        server.set_ignored_exceptions(CantServe)
+        client = self.get_client()
+        # Connect to the server so the exception is raised there
+        client.connect((server.host, server.port))
+        # Wait for the exception to propagate.
+        sync.wait()
+        # The connection wasn't served properly but the exception should have
+        # been swallowed.
+        server.pending_exception()



More information about the bazaar-commits mailing list