Rev 2384: Overhaul the SmartTCPServer connect-thread logic to synchronise on startup and shutdown and notify the server if it is in accept. in file:///home/robertc/source/baz/hpss-hooks/

Robert Collins robertc at robertcollins.net
Thu Apr 5 01:39:06 BST 2007


At file:///home/robertc/source/baz/hpss-hooks/

------------------------------------------------------------
revno: 2384
revision-id: robertc at robertcollins.net-20070405003903-u1ys8t2lo5gs6b35
parent: robertc at robertcollins.net-20070404085028-abgpe4zylhsdoaqd
committer: Robert Collins <robertc at robertcollins.net>
branch nick: hpss-hooks
timestamp: Thu 2007-04-05 10:39:03 +1000
message:
  Overhaul the SmartTCPServer connect-thread logic to synchronise on startup and shutdown and notify the server if it is in accept.
modified:
  bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
  bzrlib/transport/smart.py      ssh.py-20060608202016-c25gvf1ob7ypbus6-1
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py	2007-04-04 05:19:38 +0000
+++ b/bzrlib/tests/test_smart_transport.py	2007-04-05 00:39:03 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2006 Canonical Ltd
+# Copyright (C) 2006, 2007 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -822,30 +822,20 @@
 
 class TestServerSocketUsage(SmartTCPTests):
 
-    def test_server_closes_listening_sock_on_shutdown(self):
+    def test_server_setup_teardown(self):
+        """It should be safe to teardown the server with no requests."""
+        self.setUpServer()
+        server = self.server
+        transport = smart.SmartTCPTransport(self.server.get_url())
+        self.tearDownServer()
+        self.assertRaises(errors.ConnectionError, transport.has, '.')
+
+    def test_server_closes_listening_sock_on_shutdown_after_request(self):
         """The server should close its listening socket when it's stopped."""
         self.setUpServer()
-        # clean up the server and initial transport (which wont have connected):
-        # force a connection, which uses the listening socket to synchronise
-        # with the server thread, so that when we shut it down it has already
-        # executed the 'self._should_terminate = False' line in the server
-        # method.
         server = self.server
         self.transport.has('.')
         self.tearDownServer()
-        # make a new connection to break out the inner loop in the server.
-        transport = smart.SmartTCPTransport(server.get_url())
-        # force the connection
-        transport.has('.')
-        # and close it.
-        transport.disconnect()
-        # this del probably is not needed, but I wanted to be clear about what
-        # we are testing: having objects hanging around is not part of the test.
-        del transport
-        while server._server_thread.isAlive():
-            # this is fugly: we should have an event for the server we can
-            # wait for.
-            import time; time.sleep(0.001)
         # if the listening socket has closed, we should get a BADFD error
         # when connecting, rather than a hang.
         transport = smart.SmartTCPTransport(server.get_url())
@@ -963,24 +953,15 @@
             self.capture_server_call)
         self.setUpServer()
         result = [(self.backing_transport.base, self.transport.base)]
-        # check the stopping message isn't emitted up front, this also
-        # has the effect of synchronising with the server, so that
-        # when we shut it down it has already executed the 
-        # 'self._should_terminate = False' line in the server method.
+        # check the stopping message isn't emitted up front.
+        self.assertEqual([], self.hook_calls)
+        # nor after a single message
         self.transport.has('.')
         self.assertEqual([], self.hook_calls)
         # clean up the server
         server = self.server
         self.tearDownServer()
-        # make a new connection to break out the inner loop in the server.
-        transport = smart.SmartTCPTransport(result[0][1])
-        transport.has('.')
-        transport.disconnect()
-        del transport
-        while server._server_thread.isAlive():
-            # this is fugly: we should have an event for the server we can
-            # wait for.
-            import time; time.sleep(0.001)
+        # now it should have fired.
         self.assertEqual(result, self.hook_calls)
 
 # TODO: test that when the server suffers an exception that it calls the

=== modified file 'bzrlib/transport/smart.py'
--- a/bzrlib/transport/smart.py	2007-04-04 08:50:28 +0000
+++ b/bzrlib/transport/smart.py	2007-04-05 00:39:03 +0000
@@ -195,6 +195,7 @@
 #
 
 from cStringIO import StringIO
+import errno
 import os
 import socket
 import sys
@@ -835,6 +836,8 @@
         :param host: Name of the interface to listen on.
         :param port: TCP port to listen on, or 0 to allocate a transient port.
         """
+        from socket import error as socket_error
+        self._socket_error = socket_error
         self._server_socket = socket.socket()
         self._server_socket.bind((host, port))
         self._sockname = self._server_socket.getsockname()
@@ -842,27 +845,34 @@
         self._server_socket.listen(1)
         self._server_socket.settimeout(1)
         self.backing_transport = backing_transport
+        self._started = threading.Event()
+        self._stopped = threading.Event()
 
     def serve(self):
         # let connections timeout so that we get a chance to terminate
         # Keep a reference to the exceptions we want to catch because the socket
         # module's globals get set to None during interpreter shutdown.
         from socket import timeout as socket_timeout
-        from socket import error as socket_error
         self._should_terminate = False
         for hook in SmartTCPServer.hooks['server_started']:
             hook(self.backing_transport.base, self.get_url())
+        self._started.set()
         try:
             try:
                 while not self._should_terminate:
                     try:
-                        self.accept_and_serve()
+                        conn, client_addr = self._server_socket.accept()
                     except socket_timeout:
                         # just check if we're asked to stop
                         pass
-                    except socket_error, e:
-                        trace.warning("client disconnected: %s", e)
-                        pass
+                    except self._socket_error, e:
+                        # if the socket is closed by stop_background_thread
+                        # we might get an EBADF here; any other socket errors
+                        # should get logged.
+                        if e.args[0] != errno.EBADF:
+                            trace.warning("listening socket error: %s", e)
+                    else:
+                        self.serve_conn(conn)
             except KeyboardInterrupt:
                 # dont log when CTRL-C'd.
                 raise
@@ -871,9 +881,11 @@
                 trace.log_exception_quietly()
                 raise
         finally:
+            self._stopped.set()
             try:
+                # ensure the server socket is closed.
                 self._server_socket.close()
-            except socket_error:
+            except self._socket_error:
                 # ignore errors on close
                 pass
             for hook in SmartTCPServer.hooks['server_stopped']:
@@ -883,8 +895,7 @@
         """Return the url of the server"""
         return "bzr://%s:%d/" % self._sockname
 
-    def accept_and_serve(self):
-        conn, client_addr = self._server_socket.accept()
+    def serve_conn(self, conn):
         # For WIN32, where the timeout value from the listening socket
         # propogates to the newly accepted socket.
         conn.setblocking(True)
@@ -895,20 +906,38 @@
         connection_thread.start()
 
     def start_background_thread(self):
+        self._started.clear()
         self._server_thread = threading.Thread(None,
                 self.serve,
                 name='server-' + self.get_url())
         self._server_thread.setDaemon(True)
         self._server_thread.start()
+        self._started.wait()
 
     def stop_background_thread(self):
+        self._stopped.clear()
+        # tell the main loop to quit on the next iteration.
         self._should_terminate = True
-        # At one point we would wait to join the threads here, but it looks
-        # like they don't actually exit.  So now we just leave them running
-        # and expect to terminate the process. -- mbp 20070215
-        # self._server_socket.close()
-        ## sys.stderr.write("waiting for server thread to finish...")
-        ## self._server_thread.join()
+        # close the socket - gives error to connections from here on in,
+        # rather than a connection reset error to connections made during
+        # the period between setting _should_terminate = True and 
+        # the current request completing/aborting. It may also break out the
+        # main loop if it was currently in accept() (on some platforms).
+        try:
+            self._server_socket.close()
+        except self._socket_error:
+            # ignore errors on close
+            pass
+        if not self._stopped.isSet():
+            # server has not stopped (though it may be stopping)
+            # it's likely in accept(), so give it a connection
+            temp_socket = socket.socket()
+            temp_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+            if not temp_socket.connect_ex(self._sockname):
+                # and close it immediately: we don't send any requests.
+                temp_socket.close()
+        self._stopped.wait()
+        self._server_thread.join()
 
 
 class SmartServerHooks(Hooks):



More information about the bazaar-commits mailing list