Rev 5266: Really test against a threading server and properly shutdown socket and threads. in file:///home/vila/src/bzr/experimental/leaking-tests/

Vincent Ladeuil v.ladeuil+lp at free.fr
Thu Jun 3 18:40:49 BST 2010


At file:///home/vila/src/bzr/experimental/leaking-tests/

------------------------------------------------------------
revno: 5266
revision-id: v.ladeuil+lp at free.fr-20100603174049-st3ip75ad46j3npv
parent: v.ladeuil+lp at free.fr-20100603161048-n0ir7wz6t7qtycg4
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: propagate-exceptions
timestamp: Thu 2010-06-03 19:40:49 +0200
message:
  Really test against a threading server and properly shutdown socket and threads. 
-------------- next part --------------
=== modified file 'bzrlib/tests/test_server.py'
--- a/bzrlib/tests/test_server.py	2010-06-03 16:10:48 +0000
+++ b/bzrlib/tests/test_server.py	2010-06-03 17:40:49 +0000
@@ -14,6 +14,7 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
+import errno
 import socket
 import SocketServer
 import select
@@ -260,28 +261,40 @@
         self.ready.clear()
         try:
             super(ThreadWithException, self).run()
-        except Exception, e:
+        except:
             self.exception = sys.exc_info()
         finally:
             # Make sure the calling thread is released
             self.ready.set()
 
 
-    def join(self, *args, **kwargs):
+    def join(self, timeout=5):
         """Overrides Thread.join to raise any exception caught.
 
 
         Calling join(timeout=0) will raise the caught exception or return None
         if the thread is still alive.
+
+        The default timeout is set to 5 and should expire only when a thread
+        serving a client connection is hung.
         """
-        # Note that we don't care about the timeout parameter here: either the
-        # thread has raised an exception and it should be raised (and join()
-        # should succeed whatever the timeout is) or it's still alive which
-        # means it didn't encounter an exception.
-        super(ThreadWithException, self).join(*args, **kwargs)
+        super(ThreadWithException, self).join(timeout)
         if self.exception is not None:
             exc_class, exc_value, exc_tb = self.exception
+            self.execption = None # The exception should be raised only once
             raise exc_class, exc_value, exc_tb
+        if timeout and self.isAlive():
+            # The timeout expired without joining the thread, so the thread
+            # is stuck, which is a failure as far as the test is concerned.
+            # We used to hang here.
+            raise AssertionError('thread %s hung' % (self.name,))
+
+    def pending_exception(self):
+        """Raise the caught exception.
+
+        This does nothing if no exception occurred.
+        """
+        self.join(timeout=0)
 
 
 class TestingTCPServerMixin:
@@ -336,8 +349,34 @@
         self.serving.clear()
         raise
 
-    def stop_clients(self):
-        pass
+    # The following methods are called by the main thread
+
+    def stop_client_connections(self):
+        while self.clients:
+            c = self.clients.pop()
+            self.shutdown_client(c)
+
+    def shutdown_client_socket(self, sock):
+        """Properly shutdown a client socket.
+
+        Under some circumstances (as in bug #383920), we need to force the
+        shutdown, as Python otherwise delays it until gc occurs and the
+        client may hang.
+
+        This should be called only when no other thread is trying to use the
+        socket.
+        """
+        try:
+            # The request process has been completed, the thread is about to
+            # die, let's shutdown the socket if we can.
+            sock.shutdown(socket.SHUT_RDWR)
+            sock.close()
+        except (socket.error, select.error), e:
+            if e[0] in (errno.EBADF, errno.ENOTCONN):
+                # Right, the socket is already down
+                pass
+            else:
+                raise
 
 
 class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
@@ -355,6 +394,11 @@
 
     # The following methods are called by the main thread
 
+    def shutdown_client(self, client):
+        sock, addr = client
+        self.shutdown_client_socket(sock)
+
+
 
 class TestingThreadingTCPServer(TestingTCPServerMixin,
                                 SocketServer.ThreadingTCPServer):
@@ -371,7 +415,8 @@
         self.clients.append((sock, addr, None))
         return sock, addr
 
-    def process_request_thread(self, stopped, request, client_address):
+    def process_request_thread(self, started, stopped, request, client_address):
+        started.set()
         SocketServer.ThreadingTCPServer.process_request_thread(
             self, request, client_address)
         self.close_request(request)
@@ -379,18 +424,32 @@
 
     def process_request(self, request, client_address):
         """Start a new thread to process the request."""
+        started = threading.Event()
         stopped = threading.Event()
-        t = test_server.ThreadWithException(
+        t = ThreadWithException(
             event=stopped,
             target = self.process_request_thread,
-            args = (stopped, request, client_address))
+            args = (started, stopped, request, client_address))
         t.name = '%s -> %s' % (client_address, self.server_address)
-        if 'threads' in tests.selftest_debug_flags:
-            print 'Thread for: %s started' % (threading.currentThread().name,)
         # Update the client description
         self.clients.pop()
         self.clients.append((request, client_address, t))
         t.start()
+        started.wait()
+        # If an exception occurred during the thread start, it will get raised.
+        t.pending_exception()
+
+    # The following methods are called by the main thread
+
+    def shutdown_client(self, client):
+        sock, addr, t = client
+        self.shutdown_client_socket(sock)
+        if t is not None:
+            # The thread is created only when the request is processed, after
+            # the connection is initiated, so it may be missing (for example
+            # during server shutdown). If an exception occurred in the thread,
+            # join() re-raises it.
+            t.join()
 
 
 class TestingTCPServerInAThread(object):
@@ -406,13 +465,6 @@
         return self.server_class(self.server_address,
                                  self.request_handler_class)
 
-    def pending_exception(self):
-        """Re-raise the exception raised by the server in its own thread.
-
-        This does nothing if no exception occurred.
-        """
-        self._server_thread.join(timeout=0)
-
     def start_server(self):
         self.server = self.create_server()
         self._server_thread = ThreadWithException(
@@ -422,8 +474,10 @@
         self.server.started.wait()
         # Get the real address, especially the port
         self.server_address = self.server.server_address
-        # If an exception occured during the server start, it will get raised
-        self.pending_exception()
+        self._server_thread.name = self.server_address
+        # If an exception occurred during the server start, it will be raised;
+        # otherwise, the server is blocked on its accept() call.
+        self._server_thread.pending_exception()
         # From now on, we'll use a different event to ensure the server can set
         # its exception
         self._server_thread.set_event(self.server.stopped)
@@ -434,44 +488,40 @@
     def stop_server(self):
         if self.server is None:
             return
-        # The server has been started successfully, shut it down now
-        # As soon as we stop serving, no more connection are accepted except
-        # one to get out of the blocking listen.
-        self.server.serving.clear()
-        # The server is listening for a last connection, let's give it:
-        last_conn = None
-        try:
-            last_conn = osutils.connect_socket(self.server.server_address)
-        except socket.error, e:
-            # But ignore connection errors as the point is to unblock the
-            # server thread, it may happen that it's not blocked or even not
-            # started.
-            pass
-        # We don't have to wait for the server to shut down to start shutting
-        # down the clients, so let's start now.
-        self.server.stop_clients()
-        # Now we wait for the thread running self.server.serve() to finish
-        self.server.stopped.wait()
-        if last_conn is not None:
-            # Close the last connection without trying to use it. The server
-            # will not process a single byte on that socket to avoid
-            # complications (SSL starts with a handshake for example).
-            last_conn.close()
-        try:
+        try:
+            # The server has been started successfully, shut it down now.  As
+            # soon as we stop serving, no more connections are accepted except
+            # one to get out of the blocking listen.
+            self.server.serving.clear()
+            # The server is listening for a last connection, let's give it:
+            last_conn = None
+            try:
+                last_conn = osutils.connect_socket(self.server.server_address)
+            except socket.error, e:
+                # But ignore connection errors as the point is to unblock the
+                # server thread, it may happen that it's not blocked or even
+                # not started.
+                pass
+            # We start shutting down the clients while the server itself is
+            # shutting down.
+            self.server.stop_client_connections()
+            # Now we wait for the thread running self.server.serve() to finish
+            self.server.stopped.wait()
+            if last_conn is not None:
+                # Close the last connection without trying to use it. The
+                # server will not process a single byte on that socket to avoid
+                # complications (SSL starts with a handshake for example).
+                last_conn.close()
             # Check for any exception that could have occurred in the server
             # thread
             self._server_thread.join()
         finally:
-            # Make sure we can be called twice safely
+            # Make sure we can be called twice safely, note that this means
+            # that we will raise a single exception even if several occurred in
+            # the various threads involved.
             self.server = None
 
 
-class TestingThreadingTCPServerInAThread(TestingTCPServerInAThread):
-    """A socket server in a thread which spawn one thread for each connection"""
-
-    pass
-
-
 class SmartTCPServer_for_testing(server.SmartTCPServer):
     """Server suitable for use by transport tests.
 

=== modified file 'bzrlib/tests/test_test_server.py'
--- a/bzrlib/tests/test_test_server.py	2010-06-03 16:10:48 +0000
+++ b/bzrlib/tests/test_test_server.py	2010-06-03 17:40:49 +0000
@@ -72,17 +72,18 @@
         else:
             raise ValueError('[%s] not understood' % req)
 
-class TestTestingServerInAThread(tests.TestCase):
-
-    server_in_thread_class = test_server.TestingTCPServerInAThread
+
+class TestTCPServerInAThread(tests.TestCase):
+
+    server_class = test_server.TestingTCPServer
 
     def get_server(self, server_class=None, connection_handler_class=None):
-        if server_class is None:
-            server_class = test_server.TestingTCPServer
+        if server_class is not None:
+            self.server_class = server_class
         if connection_handler_class is None:
             connection_handler_class = TCPConnectionHandler
-        server =  self.server_in_thread_class(
-            ('localhost', 0), server_class, connection_handler_class)
+        server =  test_server.TestingTCPServerInAThread(
+            ('localhost', 0), self.server_class, connection_handler_class)
         server.start_server()
         self.addCleanup(server.stop_server)
         return server
@@ -158,7 +159,7 @@
         # Now the server has raise the exception in its own thread
         self.assertRaises(ServerFailure, server.stop_server)
 
-class TestTestingThreadingServerInAThread(TestTestingServerInAThread):
+class TestThreadingTCPServerInAThread(TestTCPServerInAThread):
 
-    server_in_thread_class = test_server.TestingThreadingTCPServerInAThread
+    server_class = test_server.TestingThreadingTCPServer
 



More information about the bazaar-commits mailing list