Rev 4743: Start adding a more precise synchronization between the various test threads. in file:///home/vila/src/bzr/bugs/392127-thread-leak/

Vincent Ladeuil v.ladeuil+lp at free.fr
Sat Oct 10 00:15:22 BST 2009


At file:///home/vila/src/bzr/bugs/392127-thread-leak/

------------------------------------------------------------
revno: 4743
revision-id: v.ladeuil+lp at free.fr-20091009231522-09cs7tqrzi2splv7
parent: v.ladeuil+lp at free.fr-20091009121020-9zlxqc2hz3esbd61
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: 392127-thread-leak
timestamp: Sat 2009-10-10 01:15:22 +0200
message:
  Start adding a more precise synchronization between the various test threads.
  
  * bzrlib/tests/https_server.py:
  (TestingHTTPSServerMixin): Ensure the server is still active
  before ssl-wrapping the socket.
  
  * bzrlib/tests/http_server.py:
  (TestingHTTPServerMixin): Drill down to address the thread
  synchronizations bugs. Introduce a threading.Event 'serving' used
  to properly handle the last connection.
  (TestingHTTPServerMixin.serve): Use the 'serving' event, delegate
  the execption handling to handle_request().
  (TestingHTTPServerMixin.join_thread): Catch the hung threads
  instead of hanging ourself.
  (TestingThreadingHTTPServer.process_request_thread): Use the
  received 'started' event to synchronize with the spawner.
  (TestingThreadingHTTPServer.process_request): Add the thread to
  the client data and wait for it to really start.
  (TestingThreadingHTTPServer.shutdown_client): Join the client
  thread after shutting down the socket.
  (HttpServer._http_start): Add a 'started' event set when the
  server is ready to accept connections.
  (HttpServer.setUp): Use a threading.Event 'started' to ensure the
  server is ready to accept connections.
  
  * bzrlib/tests/__init__.py:
  (CommandFailed): Not used anymore and I'm not sure it was ever
  used.
-------------- next part --------------
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2009-10-08 17:19:38 +0000
+++ b/bzrlib/tests/__init__.py	2009-10-09 23:15:22 +0000
@@ -713,11 +713,6 @@
     The feature should be used to construct the exception.
     """
 
-
-class CommandFailed(Exception):
-    pass
-
-
 class StringIOWrapper(object):
     """A wrapper around cStringIO which just adds an encoding attribute.
 

=== modified file 'bzrlib/tests/http_server.py'
--- a/bzrlib/tests/http_server.py	2009-10-09 12:10:20 +0000
+++ b/bzrlib/tests/http_server.py	2009-10-09 23:15:22 +0000
@@ -324,8 +324,44 @@
         # the tests cases.
         self.test_case_server = test_case_server
         self._home_dir = test_case_server._home_dir
-        self.serving = False
+        self.serving = None
         self.is_shut_down = threading.Event()
+        # We collect the sockets/threads used by the clients so we can
+        # close/join them when shutting down
+        self.clients = []
+
+    def get_request (self):
+        """Get the request and client address from the socket.
+        """
+        sock, addr = self._get_request()
+        self.clients.append([sock, addr])
+        return sock, addr
+
+    def verify_request(self, request, client_address):
+        """Verify the request.
+
+        Return True if we should proceed with this request, False if we should
+        not even touch a single byte in the socket !
+        """
+        return self.serving is not None and self.serving.isSet()
+
+    def handle_request(self):
+        request, client_address = self.get_request()
+        try:
+            if self.verify_request(request, client_address):
+                self.process_request(request, client_address)
+        except:
+            if self.serving is not None and self.serving.isSet():
+                self.handle_error(request, client_address)
+            else:
+                # Exceptions raised while we shut down are just noise, but feel
+                # free to put a breakpoint here if you suspect something
+                # else. Such an example is the SSL handshake: it's automatic
+                # once we start processing the request but the last connection
+                # will close immediately and will not be able to correctly
+                # reply.
+                pass
+            self.close_request(request)
 
     def server_bind(self):
         # The following has been fixed in 2.5 so we need to provide it for
@@ -334,13 +370,14 @@
             self.server_address = self.socket.getsockname()
 
     def serve(self):
-        self.serving = True
+        self.serving  = threading.Event()
+        self.serving.set()
         self.is_shut_down.clear()
         # Ensure that we will not stay blocked in listen()
         self.socket.settimeout(1)
         if 'threads' in tests.selftest_debug_flags:
             print 'Starting %r' % (self.server_address,)
-        while self.serving:
+        while self.serving.isSet():
             try:
                 # Really a connection but the python framework is generic and
                 # call them requests
@@ -348,18 +385,12 @@
             except socket.timeout:
                 # So we can check if we're asked to stop
                 pass
-            except (socket.error, select.error), e:
-               if e[0] == errno.EBADF:
-                   # Starting with python-2.6, handle_request may raise socket
-                   # or select exceptions when the server is shut down as we
-                   # do.
-                   pass
-               else:
-                   raise
         if 'threads' in tests.selftest_debug_flags:
             print 'Closing  %r' % (self.server_address,)
         # Let's close the listening socket
         self.server_close()
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Closed   %r' % (self.server_address,)
         self.is_shut_down.set()
 
     def connect_socket(self):
@@ -377,18 +408,33 @@
                     sock.close()
         raise socket.error, msg
 
+    def join_thread(self, thread, timeout=2):
+        thread.join(timeout)
+        if thread.isAlive():
+            # The timeout expired without joining the thread, the thread is
+            # therefore stucked and that's a failure as far as the test is
+            # concerned. We used to hang here.
+            raise AssertionError('thread %s hung' % (thread.name,))
+
     def shutdown(self):
         """Stops the serve() loop.
 
         Blocks until the loop has finished. This must be called while serve()
         is running in another thread, or it will deadlock.
         """
-        if not self.serving:
+        if self.serving is None:
+            # If the server wasn't properly started, there is nothing to
+            # shutdown.
             return
-        self.serving = False
+        # As soon as we stop serving, no more connection are accepted except
+        # one to get out of the blocking listen.
+        self.serving.clear()
         # The server is listening for a last connection, let's give it:
         try:
             fake_conn = self.connect_socket()
+            # But close it immediately without trying to use it. The server
+            # will not process a single byte on that socket to avoid
+            # complications (SSL starts with a handshake for example).
             fake_conn.close()
         except socket.error, e:
             # But ignore connection errors as the point is to unblock the
@@ -397,10 +443,19 @@
             # leading to self.setUp() *not* being called but self.tearDown()
             # still being called)
             pass
+        # We don't have to wait for the server to shut down to start shutting
+        # the clients, so let's start now.
+        for c in self.clients:
+            self.shutdown_client(c)
+        self.clients = []
+        # Now we wait for the thread running serve() to finish
         self.is_shut_down.wait()
 
+    def shutdown_client(self, client):
+        sock, addr = client[:2]
+        self.shutdown_client_socket(sock)
 
-    def shutdown_client(self, client_socket):
+    def shutdown_client_socket(self, sock):
         """Properly shutdown a client socket.
 
         Under some circumstances (as in bug #383920), we need to force the
@@ -413,7 +468,7 @@
         try:
             # The request process has been completed, the thread is about to
             # die, let's shutdown the socket if we can.
-            client_socket.shutdown(socket.SHUT_RDWR)
+            sock.shutdown(socket.SHUT_RDWR)
         except (socket.error, select.error), e:
             if e[0] in (errno.EBADF, errno.ENOTCONN):
                 # Right, the socket is already down
@@ -430,6 +485,9 @@
         SocketServer.TCPServer.__init__(self, server_address,
                                         request_handler_class)
 
+    def _get_request (self):
+        return SocketServer.TCPServer.get_request(self)
+
     def server_bind(self):
         SocketServer.TCPServer.server_bind(self)
         TestingHTTPServerMixin.server_bind(self)
@@ -454,26 +512,41 @@
         # process. This is prophylactic as we should not leave the threads
         # lying around.
         self.daemon_threads = True
-        # We collect the sockets/threads used by the clients so we can
-        # close/join them when shutting down
-        self.clients = []
-
-    def process_request_thread(self, request, client_address):
-        self.clients.append((request, threading.currentThread()))
+
+    def _get_request (self):
+        return SocketServer.ThreadingTCPServer.get_request(self)
+
+    def process_request_thread(self, started, request, client_address):
+        started.set()
         SocketServer.ThreadingTCPServer.process_request_thread(
             self, request, client_address)
         # Shutdown the socket as soon as possible, the thread will be joined
         # later if needed during server shutdown thread.
-        self.shutdown_client(request)
-
-    def shutdown(self):
-        TestingHTTPServerMixin.shutdown(self)
-        # Let's close all our pending clients too
-        for sock, thread in self.clients:
-            self.shutdown_client(sock)
-            thread.join()
-            del thread
-        self.clients = []
+        self.shutdown_client_socket(request)
+
+    def process_request(self, request, client_address):
+        """Start a new thread to process the request."""
+        client = self.clients.pop()
+        started = threading.Event()
+        t = threading.Thread(target = self.process_request_thread,
+                             args = (started, request, client_address))
+        t.name = '%s -> %s' % (client_address, self.server_address)
+        client.append(t)
+        self.clients.append(client)
+        if self.daemon_threads:
+            t.setDaemon (1)
+        t.start()
+        started.wait()
+
+    def shutdown_client(self, client):
+        TestingHTTPServerMixin.shutdown_client(self, client)
+        if len(client) == 3:
+            # The thread has been created only if the request is processed but
+            # after the connection is inited. This could happne when the server
+            # is shut down.
+            sock, addr, thread = client
+            if thread.isAlive():
+                self.join_thread(thread)
 
     def server_bind(self):
         SocketServer.ThreadingTCPServer.server_bind(self)
@@ -546,7 +619,7 @@
             host, self.port = self._httpd.server_address
         return self._httpd
 
-    def _http_start(self):
+    def _http_start(self, started):
         """Server thread main entry point. """
         server = None
         try:
@@ -562,7 +635,7 @@
         finally:
             # Release the lock or the main thread will block and the whole
             # process will hang.
-            self._http_starting.release()
+            started.set()
 
         # From now on, exceptions are taken care of by the
         # SocketServer.BaseServer or the request handler.
@@ -604,40 +677,31 @@
         self._http_base_url = None
 
         # Create the server thread
-        self._http_starting = threading.Lock()
-        self._http_starting.acquire()
-        self._http_thread = threading.Thread(target=self._http_start)
+        started = threading.Event()
+        self._http_thread = threading.Thread(target=self._http_start,
+                                             args = (started,))
         self._http_thread.setDaemon(True)
         self._http_exception = None
         self._http_thread.start()
-        if 'threads' in tests.selftest_debug_flags:
-            print 'Thread started: %s' % (self._http_thread.ident,)
-
         # Wait for the server thread to start (i.e release the lock)
-        self._http_starting.acquire()
+        started.wait()
+        self._http_thread.name = self._http_base_url
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Thread started: %s' % (self._http_thread.name,)
+
 
         if self._http_exception is not None:
             # Something went wrong during server start
             exc_class, exc_value, exc_tb = self._http_exception
             raise exc_class, exc_value, exc_tb
-        self._http_starting.release()
         self.logs = []
 
     def tearDown(self):
         """See bzrlib.transport.Server.tearDown."""
         self._httpd.shutdown()
-        self._http_thread.join(5.0)
-        if self._http_thread.is_alive():
-            # The timeout expired without joining the thread, the server thread
-            # is therefore stucked and that's a failure as far as the test is
-            # concerned, we used to hang here, but that wasn't very productive.
-            raise AssertionError('http server at %r hanged'
-                                 % (self._httpd.server_address,))
-
+        self._httpd.join_thread(self._http_thread)
         if 'threads' in tests.selftest_debug_flags:
-            print 'Thread  joined: %s' % (self._http_thread.ident,)
-        del self._http_thread
-        self._http_thread = None
+            print 'Thread  joined: %s' % (self._http_thread.name,)
 
     def get_url(self):
         """See bzrlib.transport.Server.get_url."""

=== modified file 'bzrlib/tests/https_server.py'
--- a/bzrlib/tests/https_server.py	2009-03-23 14:59:43 +0000
+++ b/bzrlib/tests/https_server.py	2009-10-09 23:15:22 +0000
@@ -30,18 +30,24 @@
         self.key_file = key_file
         self.cert_file = cert_file
 
-    def get_request (self):
-        """Get the request and client address from the socket.
-
-        This is called in response to a connection issued to the server, we
-        wrap the socket with SSL.
+    def _get_ssl_request (self, sock, addr):
+        """Wrap the socket with SSL"""
+        ssl_sock = ssl.wrap_socket(sock, server_side=True,
+                                   keyfile=self.key_file,
+                                   certfile=self.cert_file,
+                                   do_handshake_on_connect=False)
+        return ssl_sock, addr
+
+    def verify_request(self, request, client_address):
+        """Verify the request.
+
+        Return True if we should proceed with this request, False if we should
+        not even touch a single byte in the socket !
         """
-        sock, addr = self.socket.accept()
-        sslconn = ssl.wrap_socket(sock, server_side=True,
-                                  keyfile=self.key_file,
-                                  certfile=self.cert_file)
-        return sslconn, addr
-
+        serving = self.serving is not None and self.serving.isSet()
+        if serving:
+            request.do_handshake()
+        return serving
 
 class TestingHTTPSServer(TestingHTTPSServerMixin,
                          http_server.TestingHTTPServer):
@@ -52,6 +58,10 @@
         http_server.TestingHTTPServer.__init__(
             self, server_address, request_handler_class, test_case_server)
 
+    def _get_request (self):
+        sock, addr = http_server.TestingHTTPServer._get_request(self)
+        return self._get_ssl_request(sock, addr)
+
 
 class TestingThreadingHTTPSServer(TestingHTTPSServerMixin,
                                   http_server.TestingThreadingHTTPServer):
@@ -62,6 +72,10 @@
         http_server.TestingThreadingHTTPServer.__init__(
             self, server_address, request_handler_class, test_case_server)
 
+    def _get_request (self):
+        sock, addr = http_server.TestingThreadingHTTPServer._get_request(self)
+        return self._get_ssl_request(sock, addr)
+
 
 class HTTPSServer(http_server.HttpServer):
 
@@ -73,7 +87,7 @@
                          }
 
     # Provides usable defaults since an https server requires both a
-    # private key and certificate to work.
+    # private key and a certificate to work.
     def __init__(self, request_handler=http_server.TestingHTTPRequestHandler,
                  protocol_version=None,
                  key_file=ssl_certs.build_path('server_without_pass.key'),



More information about the bazaar-commits mailing list