Rev 6186: Refactor the code a bit. Now we can ensure that the active loop also reaps dead threads. (in http://bazaar.launchpad.net/~jameinel/bzr/2.5-soft-hangup-795025)

John Arbash Meinel john at arbash-meinel.com
Fri Sep 23 15:25:56 UTC 2011


At http://bazaar.launchpad.net/~jameinel/bzr/2.5-soft-hangup-795025

------------------------------------------------------------
revno: 6186
revision-id: john at arbash-meinel.com-20110923152547-osz802eyejhnl4v3
parent: john at arbash-meinel.com-20110923142749-jd2ketuz1u4vihfx
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.5-soft-hangup-795025
timestamp: Fri 2011-09-23 17:25:47 +0200
message:
  Refactor the code a bit. Now we can ensure that the active loop
  also reaps dead threads.
-------------- next part --------------
=== modified file 'bzrlib/smart/server.py'
--- a/bzrlib/smart/server.py	2011-09-23 14:27:49 +0000
+++ b/bzrlib/smart/server.py	2011-09-23 15:25:47 +0000
@@ -192,7 +192,7 @@
                             break
                         self.serve_conn(conn, thread_name_suffix)
                     # Cleanout any threads that have finished processing.
-                    # self._poll_active_connections()
+                    self._poll_active_connections()
             except KeyboardInterrupt:
                 # dont log when CTRL-C'd.
                 raise

=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py	2011-09-23 14:27:49 +0000
+++ b/bzrlib/tests/test_smart_transport.py	2011-09-23 15:25:47 +0000
@@ -1075,6 +1075,64 @@
 
 class TestSmartTCPServer(tests.TestCase):
 
+    def make_server(self):
+        """Create a SmartTCPServer that we can exercise.
+
+        Note: we don't use SmartTCPServer_for_testing because the testing
+        version overrides lots of functionality like 'serve', and we want to
+        test the raw service.
+
+        This will start the server in another thread, and wait for it to
+        indicate it has finished starting up.
+
+        :return: (server, server_thread)
+        """
+        t = _mod_transport.get_transport_from_url('memory:///')
+        server = _mod_server.SmartTCPServer(t, client_timeout=4.0)
+        server._ACCEPT_TIMEOUT = 0.1
+        # We don't use 'localhost' because that might be an IPv6 address.
+        server.start_server('127.0.0.1', 0)
+        server_thread = threading.Thread(target=server.serve,
+                                         args=(self.id(),))
+        server_thread.start()
+        # Ensure this gets called at some point
+        self.addCleanup(server._stop_gracefully)
+        server._started.wait()
+        return server, server_thread
+
+    def ensure_client_disconnected(self, client_sock):
+        """Ensure that a socket is closed, discarding all errors."""
+        try:
+            client_sock.close()
+        except Exception:
+            pass
+
+    def connect_to_server(self, server):
+        """Create a client socket that can talk to the server."""
+        client_sock = socket.socket()
+        server_info = server._server_socket.getsockname()
+        client_sock.connect(server_info)
+        self.addCleanup(self.ensure_client_disconnected, client_sock)
+        return client_sock
+
+    def connect_to_server_and_hangup(self, server):
+        """Connect to the server, and then hang up.
+        That way it doesn't sit waiting for 'accept()' to timeout.
+        """
+        client_sock = self.connect_to_server(server)
+        client_sock.close()
+
+    def say_hello(self, client_sock):
+        """Send the 'hello' smart RPC, and expect the response."""
+        client_sock.send('hello\n')
+        self.assertEqual('ok\x012\n', client_sock.recv(5))
+
+    def shutdown_server_cleanly(self, server, server_thread):
+        server._stop_gracefully()
+        self.connect_to_server_and_hangup(server)
+        server._stopped.wait()
+        server_thread.join()
+
     def test_get_error_unexpected(self):
         """Error reported by server with no specific representation"""
         self.overrideEnv('BZR_NO_SMART_VFS', None)
@@ -1120,46 +1178,47 @@
         self.assertEqual(0, len(server._active_connections))
 
     def test_serve_closes_out_finished_connections(self):
-        t = _mod_transport.get_transport_from_url('memory:///')
-        server = _mod_server.SmartTCPServer(t, client_timeout=1.0)
-        server._ACCEPT_TIMEOUT = 0.1
-        # We don't use 'localhost' because that might be an IPv6 address.
-        server.start_server('127.0.0.1', 0)
-        server_thread = threading.Thread(target=server.serve,
-                                         args=(self.id(),))
-        server_thread.start()
-        client_sock = None
-        try:
-            # Wait for the server to start. Then connect to the port
-            client_sock = socket.socket()
-            server._started.wait()
-            client_sock.connect(('127.0.0.1', server.port))
-            # We send and receive on the connection, so that we know the
-            # server-side has seen the connect, and started handling the
-            # results.
-            client_sock.send('hello\n')
-            self.assertEqual('ok\x012\n', client_sock.recv(5))
-            self.assertEqual(1, len(server._active_connections))
-            # Grab a handle to the thread that is processing our request
-            server_side_thread = server._active_connections[0][2]
-        finally:
-            # Close the connection, ask the server to stop, and wait for the
-            # server to stop, as well as the thread that was servicing the
-            # client request.
-            if client_sock is not None:
-                client_sock.close()
-            # Wait for the server-side request thread to notice we are closed.
-            server_side_thread.join()
-            # Stop the server, it should notice the connection has finished.
-            server._stop_gracefully()
-            # Do a throw-away connection so that the server stops immediately
-            s = socket.socket()
-            s.connect(('127.0.0.1', server.port))
-            s.close()
-            server._stopped.wait()
-            server_thread.join()
+        server, server_thread = self.make_server()
+        # The server is started, connect to it.
+        client_sock = self.connect_to_server(server)
+        # We send and receive on the connection, so that we know the
+        # server-side has seen the connect, and started handling the
+        # results.
+        self.say_hello(client_sock)
+        self.assertEqual(1, len(server._active_connections))
+        # Grab a handle to the thread that is processing our request
+        _, _, server_side_thread = server._active_connections[0]
+        # Close the connection, ask the server to stop, and wait for the
+        # server to stop, as well as the thread that was servicing the
+        # client request.
+        client_sock.close()
+        # Wait for the server-side request thread to notice we are closed.
+        server_side_thread.join()
+        # Stop the server, it should notice the connection has finished.
+        self.shutdown_server_cleanly(server, server_thread)
+        # The server should have noticed that all clients are gone before
+        # exiting.
         self.assertEqual(0, len(server._active_connections))
 
+    def test_serve_reaps_finished_connections(self):
+        server, server_thread = self.make_server()
+        client_sock1 = self.connect_to_server(server)
+        # We send and receive on the connection, so that we know the
+        # server-side has seen the connect, and started handling the
+        # results.
+        self.say_hello(client_sock1)
+        self.assertEqual(1, len(server._active_connections))
+        _, _, server_side_thread1 = server._active_connections[0]
+        client_sock1.close()
+        server_side_thread1.join()
+        client_sock2 = self.connect_to_server(server)
+        self.say_hello(client_sock2)
+        self.assertEqual(1, len(server._active_connections))
+        _, _, server_side_thread2 = server._active_connections[0]
+        client_sock2.close()
+        server_side_thread2.join()
+        self.shutdown_server_cleanly(server, server_thread)
+
 
 class SmartTCPTests(tests.TestCase):
     """Tests for connection/end to end behaviour using the TCP server.



More information about the bazaar-commits mailing list