Rev 6187: Teach the SmartTCPServer to wait gracefully for the client threads to exit. in http://bazaar.launchpad.net/~jameinel/bzr/2.5-soft-hangup-795025

John Arbash Meinel john at arbash-meinel.com
Fri Sep 23 16:00:31 UTC 2011


At http://bazaar.launchpad.net/~jameinel/bzr/2.5-soft-hangup-795025

------------------------------------------------------------
revno: 6187
revision-id: john at arbash-meinel.com-20110923160023-lr49ty2wn70nqu91
parent: john at arbash-meinel.com-20110923152547-osz802eyejhnl4v3
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.5-soft-hangup-795025
timestamp: Fri 2011-09-23 18:00:23 +0200
message:
  Teach the SmartTCPServer to wait gracefully for the client threads to exit.
-------------- next part --------------
=== modified file 'bzrlib/smart/server.py'
--- a/bzrlib/smart/server.py	2011-09-23 15:25:47 +0000
+++ b/bzrlib/smart/server.py	2011-09-23 16:00:23 +0000
@@ -20,6 +20,7 @@
 import os.path
 import socket
 import sys
+import time
 import threading
 
 from bzrlib.hooks import Hooks
@@ -58,6 +59,10 @@
     # so the test suite can set it faster. (It thread.interrupt_main() will not
     # fire a KeyboardInterrupt during socket.accept)
     _ACCEPT_TIMEOUT = 1.0
+    _SHUTDOWN_POLL_TIMEOUT = 1.0
+    _LOG_WAITING_TIMEOUT = 10.0
+
+    _timer = time.time
 
     def __init__(self, backing_transport, root_client_path='/',
                  client_timeout=None):
@@ -76,6 +81,9 @@
         self.root_client_path = root_client_path
         self._client_timeout = client_timeout
         self._active_connections = []
+        # This is set to indicate we want to wait for clients to finish before
+        # we disconnect.
+        self._gracefully_stopping = False
 
     def start_server(self, host, port):
         """Create the server listening socket.
@@ -108,8 +116,14 @@
         self.port = self._sockname[1]
         self._server_socket.listen(1)
         self._server_socket.settimeout(self._ACCEPT_TIMEOUT)
+        # Once we start accept()ing connections, we set _started.
         self._started = threading.Event()
+        # Once we stop accept()ing connections (and are closing the socket) we
+        # set _stopped
         self._stopped = threading.Event()
+        # Once we have finished waiting for all clients, etc., we set
+        # _fully_stopped.
+        self._fully_stopped = threading.Event()
 
     def _backing_urls(self):
         # There are three interesting urls:
@@ -157,6 +171,22 @@
         #      have a good way (yet) to poll the spawned clients and
         trace.note('Requested to stop gracefully')
         self._should_terminate = True
+        self._gracefully_stopping = True
+
+    def _wait_for_clients_to_disconnect(self):
+        self._poll_active_connections()
+        if not self._active_connections:
+            return
+        trace.note('Waiting for %d client(s) to finish'
+                   % (len(self._active_connections),))
+        t_next_log = self._timer() + self._LOG_WAITING_TIMEOUT
+        while self._active_connections:
+            now = self._timer()
+            if now >= t_next_log:
+                trace.note('Still waiting for %d client(s) to finish'
+                           % (len(self._active_connections),))
+                t_next_log = now + self._LOG_WAITING_TIMEOUT
+            self._poll_active_connections(self._SHUTDOWN_POLL_TIMEOUT)
 
     def serve(self, thread_name_suffix=''):
         # Note: There is a temptation to do
@@ -208,8 +238,10 @@
             except self._socket_error:
                 # ignore errors on close
                 pass
-            self._poll_active_connections()
             self.run_server_stopped_hooks()
+        if self._gracefully_stopping:
+            self._wait_for_clients_to_disconnect()
+        self._fully_stopped.set()
 
     def get_url(self):
         """Return the url of the server"""

=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py	2011-09-23 15:25:47 +0000
+++ b/bzrlib/tests/test_smart_transport.py	2011-09-23 16:00:23 +0000
@@ -1211,6 +1211,8 @@
         _, _, server_side_thread1 = server._active_connections[0]
         client_sock1.close()
         server_side_thread1.join()
+        # By waiting until the first connection is fully done, the server
+        # should notice after another connection that the first has finished.
         client_sock2 = self.connect_to_server(server)
         self.say_hello(client_sock2)
         self.assertEqual(1, len(server._active_connections))
@@ -1219,6 +1221,29 @@
         server_side_thread2.join()
         self.shutdown_server_cleanly(server, server_thread)
 
+    def test_graceful_shutdown_waits_for_clients_to_stop(self):
+        server, server_thread = self.make_server()
+        client_sock = self.connect_to_server(server)
+        self.say_hello(client_sock)
+        _, _, server_side_thread = server._active_connections[0]
+        # Ask the server to stop gracefully, and wait for it.
+        server._stop_gracefully()
+        self.connect_to_server_and_hangup(server)
+        server._stopped.wait()
+        # It should not be accepting another connection.
+        self.assertRaises(socket.error, self.connect_to_server, server)
+        # It should also not be fully stopped
+        server._fully_stopped.wait(0.01)
+        self.assertFalse(server._fully_stopped.isSet())
+        client_sock.close()
+        server_side_thread.join()
+        server_thread.join()
+        self.assertTrue(server._fully_stopped.isSet())
+        log = self.get_log()
+        self.assertEqual('    INFO  Requested to stop gracefully\n'
+                         '    INFO  Waiting for 1 client(s) to finish\n',
+                         log)
+
 
 class SmartTCPTests(tests.TestCase):
     """Tests for connection/end to end behaviour using the TCP server.



More information about the bazaar-commits mailing list