Rev 4739: Collect and shutdown clients for SmartTCPServer_for_testing. in file:///home/vila/src/bzr/bugs/392127-thread-leak/

Vincent Ladeuil v.ladeuil+lp at free.fr
Thu Oct 8 15:34:02 BST 2009


At file:///home/vila/src/bzr/bugs/392127-thread-leak/

------------------------------------------------------------
revno: 4739
revision-id: v.ladeuil+lp at free.fr-20091008143402-b4uivhhkykn95u8v
parent: v.ladeuil+lp at free.fr-20091008090300-f1uy74ok3krwd7q0
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: 392127-thread-leak
timestamp: Thu 2009-10-08 16:34:02 +0200
message:
  Collect and shutdown clients for SmartTCPServer_for_testing.
  
  * bzrlib/tests/http_server.py:
  (TestingHTTPServerMixin.serve): Try to avoid hanging in the server
  thread by setting a timeout on the server socket: we already catch
  the socket.timeout exception, so we may as well use it.
  (TestingThreadingHTTPServer.process_request_thread): Get rid of
  the spurious pdb.set_trace().
  
  * bzrlib/tests/__init__.py:
  (ExtendedTestResult.stopTestRun): Display how many threads are
  left at the end.
  (TestCase._check_leaked_threads): Only report a test as leaking
  when the number of active threads increases.
  
  * bzrlib/smart/server.py:
  (SmartTCPServer.serve_conn): Return the created thread so that
  subclasses can make use of it.
  (SmartTCPServer_for_testing.serve_conn): Collect the client
  sockets/threads.
  (SmartTCPServer_for_testing.shutdown_client): shutdown a client
  socket.
  (SmartTCPServer_for_testing.tearDown): shutdown client
  sockets/threads.
-------------- next part --------------
=== modified file 'bzrlib/smart/server.py'
--- a/bzrlib/smart/server.py	2009-09-23 06:54:14 +0000
+++ b/bzrlib/smart/server.py	2009-10-08 14:34:02 +0000
@@ -172,6 +172,7 @@
             None, handler.serve, name=thread_name)
         connection_thread.setDaemon(True)
         connection_thread.start()
+        return connection_thread
 
     def start_background_thread(self, thread_name_suffix=''):
         self._started.clear()
@@ -246,6 +247,9 @@
         SmartTCPServer.__init__(self, None)
         self.client_path_extra = None
         self.thread_name_suffix = thread_name_suffix
+        # We collect the sockets/threads used by the clients so we can
+        # close/join them when shutting down
+        self.clients = []
 
     def get_backing_transport(self, backing_transport_server):
         """Get a backing transport from a server we are decorating."""
@@ -279,8 +283,41 @@
         self.root_client_path = self.client_path_extra = client_path_extra
         self.start_background_thread(self.thread_name_suffix)
 
+    def serve_conn(self, conn, thread_name_suffix):
+        conn_thread = super(SmartTCPServer_for_testing, self).serve_conn(
+            conn, thread_name_suffix)
+        self.clients.append((conn, conn_thread))
+        return conn_thread
+
+    def shutdown_client(self, client_socket):
+        """Properly shutdown a client socket.
+
+        Under some circumstances (as in bug #383920), we need to force the
+        shutdown as python delays it until gc occur otherwise and the client
+        may hang.
+
+        This should be called only when no other thread is trying to use the
+        socket.
+        """
+        try:
+            # The request process has been completed, the thread is about to
+            # die, let's shutdown the socket if we can.
+            client_socket.shutdown(socket.SHUT_RDWR)
+        except (socket.error, select.error), e:
+            if e[0] in (errno.EBADF, errno.ENOTCONN):
+                # Right, the socket is already down
+                pass
+            else:
+                raise
+
     def tearDown(self):
         self.stop_background_thread()
+        # Let's close all our pending clients too
+        for sock, thread in self.clients:
+            self.shutdown_client(sock)
+            thread.join()
+            del thread
+        self.clients = []
         self.chroot_server.tearDown()
 
     def get_url(self):

=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2009-10-01 14:44:43 +0000
+++ b/bzrlib/tests/__init__.py	2009-10-08 14:34:02 +0000
@@ -222,6 +222,9 @@
                 '%s is leaking threads among %d leaking tests.\n' % (
                 TestCase._first_thread_leaker_id,
                 TestCase._leaking_threads_tests))
+            # We don't report the main thread as an active one.
+            self.stream.write('%d threads were left active in the end.\n'
+                              % (TestCase._active_threads - 1))
 
     def _extractBenchmarkTime(self, testCase):
         """Add a benchmark time for the current test case."""
@@ -846,7 +849,14 @@
         active = threading.activeCount()
         leaked_threads = active - TestCase._active_threads
         TestCase._active_threads = active
-        if leaked_threads:
+        # If some tests make the number of threads *decrease*, we'll consider
+        # that they are just observing old threads dying, not aggressively
+        # killing random threads, so we don't report these tests as leaking.
+        # The risk is false negatives (a test sees 2 threads going away but
+        # leaks one), but that seems less likely than the current false
+        # positives (a test sees threads going away and does not leak).
+        if leaked_threads > 0:
+            print '%s is leaking, active is now %d' % (self.id(), active)
             TestCase._leaking_threads_tests += 1
             if TestCase._first_thread_leaker_id is None:
                 TestCase._first_thread_leaker_id = self.id()

=== modified file 'bzrlib/tests/http_server.py'
--- a/bzrlib/tests/http_server.py	2009-10-08 09:03:00 +0000
+++ b/bzrlib/tests/http_server.py	2009-10-08 14:34:02 +0000
@@ -332,12 +332,14 @@
     def serve(self):
         self.serving = True
         self.is_shut_down.clear()
+        self.socket.settimeout(1)
         while self.serving:
             try:
                 # Really a connection but the python framework is generic and
                 # call them requests
                 self.handle_request()
             except socket.timeout:
+                # So we can check if we're asked to stop
                 pass
             except (socket.error, select.error), e:
                if e[0] == errno.EBADF:
@@ -443,27 +445,26 @@
         # process. This is prophylactic as we should not leave the threads
         # lying around.
         self.daemon_threads = True
-        # We collect the sockets used by the clients to we can close them when
-        # shutting down
+        # We collect the sockets/threads used by the clients so we can
+        # close/join them when shutting down
         self.clients = []
 
     def process_request_thread(self, request, client_address):
         self.clients.append((request, threading.currentThread()))
-        try:
-            SocketServer.ThreadingTCPServer.process_request_thread(
-                self, request, client_address)
-        except Exception, e:
-            import pdb; pdb.set_trace()
-
+        SocketServer.ThreadingTCPServer.process_request_thread(
+            self, request, client_address)
+        # Shut down the socket as soon as possible; the thread will be joined
+        # later, if needed, during server shutdown.
         self.shutdown_client(request)
 
     def shutdown(self):
         TestingHTTPServerMixin.shutdown(self)
         # Let's close all our pending clients too
-        for c, t in self.clients:
-            self.shutdown_client(c)
-            t.join()
-            del t
+        for sock, thread in self.clients:
+            self.shutdown_client(sock)
+            thread.join()
+            del thread
+        self.clients = []
 
     def server_bind(self):
         SocketServer.ThreadingTCPServer.server_bind(self)



More information about the bazaar-commits mailing list