Rev 5265: Spawn a thread for each connection from a client. in file:///home/vila/src/bzr/experimental/leaking-tests/

Vincent Ladeuil v.ladeuil+lp at free.fr
Thu Jun 3 17:10:48 BST 2010


At file:///home/vila/src/bzr/experimental/leaking-tests/

------------------------------------------------------------
revno: 5265
revision-id: v.ladeuil+lp at free.fr-20100603161048-n0ir7wz6t7qtycg4
parent: v.ladeuil+lp at free.fr-20100602205753-gcsgu553qcxr41ct
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: propagate-exceptions
timestamp: Thu 2010-06-03 18:10:48 +0200
message:
  Spawn a thread for each connection from a client.
  
  * bzrlib/tests/test_test_server.py:
  (TestTestingServerInAThread.test_client_talks_server_respond):
  Check that the connection has been collected.
  
  * bzrlib/tests/test_server.py:
  (TestingTCPServer.get_request)
  (TestingThreadingTCPServer.get_request): Collect the resources
  used for a client connection.
  (TestingThreadingTCPServer.process_request): Spawn a thread for
  each connection.
-------------- next part --------------
=== modified file 'bzrlib/tests/test_server.py'
--- a/bzrlib/tests/test_server.py	2010-06-02 20:57:53 +0000
+++ b/bzrlib/tests/test_server.py	2010-06-03 16:10:48 +0000
@@ -296,6 +296,9 @@
         self.started = threading.Event()
         self.serving = threading.Event()
         self.stopped = threading.Event()
+        # We collect the resources used by the clients so we can release them
+        # when shutting down
+        self.clients = []
 
     def server_bind(self):
         # We need to override the SocketServer bind, yet, we still want to use
@@ -323,27 +326,35 @@
         """Verify the request.
 
         Return True if we should proceed with this request, False if we should
-        not even touch a single byte in the socket ! This is used to stop the
-        server with a dummy last connection.
+        not even touch a single byte in the socket ! This is useful when we
+        stop the server with a dummy last connection.
         """
         return self.serving.isSet()
 
-    def stop_clients(self):
-        pass
-
-
-class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
-
-    def __init__(self, server_address, request_handler_class):
-        TestingTCPServerMixin.__init__(self, SocketServer.TCPServer)
-        SocketServer.TCPServer.__init__(self, server_address,
-                                        request_handler_class)
-
     def handle_error(self, request, client_address):
         # Stop serving and re-raise the last exception seen
         self.serving.clear()
         raise
 
+    def stop_clients(self):
+        pass
+
+
+class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
+
+    def __init__(self, server_address, request_handler_class):
+        TestingTCPServerMixin.__init__(self, SocketServer.TCPServer)
+        SocketServer.TCPServer.__init__(self, server_address,
+                                        request_handler_class)
+
+    def get_request(self):
+        """Get the request and client address from the socket."""
+        sock, addr = self.sibling_class.get_request(self)
+        self.clients.append((sock, addr))
+        return sock, addr
+
+    # The following methods are called by the main thread
+
 
 class TestingThreadingTCPServer(TestingTCPServerMixin,
                                 SocketServer.ThreadingTCPServer):
@@ -353,6 +364,34 @@
         SocketServer.TCPServer.__init__(self, server_address,
                                         request_handler_class)
 
+    def get_request (self):
+        """Get the request and client address from the socket."""
+        sock, addr = self.sibling_class.get_request(self)
+        # The thread is not created yet, it will be updated in process_request
+        self.clients.append((sock, addr, None))
+        return sock, addr
+
+    def process_request_thread(self, stopped, request, client_address):
+        SocketServer.ThreadingTCPServer.process_request_thread(
+            self, request, client_address)
+        self.close_request(request)
+        stopped.set()
+
+    def process_request(self, request, client_address):
+        """Start a new thread to process the request."""
+        stopped = threading.Event()
+        t = test_server.ThreadWithException(
+            event=stopped,
+            target = self.process_request_thread,
+            args = (stopped, request, client_address))
+        t.name = '%s -> %s' % (client_address, self.server_address)
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Thread for: %s started' % (threading.currentThread().name,)
+        # Update the client description
+        self.clients.pop()
+        self.clients.append((request, client_address, t))
+        t.start()
+
 
 class TestingTCPServerInAThread(object):
     """A server in a thread that re-raise thread exceptions."""
@@ -408,6 +447,8 @@
             # server thread, it may happen that it's not blocked or even not
             # started.
             pass
+        # We don't have to wait for the server to shut down to start shutting
+        # down the clients, so let's start now.
         self.server.stop_clients()
         # Now we wait for the thread running self.server.serve() to finish
         self.server.stopped.wait()
@@ -424,6 +465,7 @@
             # Make sure we can be called twice safely
             self.server = None
 
+
 class TestingThreadingTCPServerInAThread(TestingTCPServerInAThread):
     """A socket server in a thread which spawn one thread for each connection"""
 

=== modified file 'bzrlib/tests/test_test_server.py'
--- a/bzrlib/tests/test_test_server.py	2010-06-02 20:57:53 +0000
+++ b/bzrlib/tests/test_test_server.py	2010-06-03 16:10:48 +0000
@@ -92,6 +92,13 @@
         self.addCleanup(client.disconnect)
         return client
 
+    def get_server_connection(self, server, conn_rank):
+        return server.server.clients[conn_rank]
+
+    def assertClientAddr(self, client, server, conn_rank):
+        conn = self.get_server_connection(server, conn_rank)
+        self.assertEquals(client.sock.getsockname(), conn[1])
+
     def test_start_stop(self):
         server = self.get_server()
         client = self.get_client()
@@ -108,6 +115,7 @@
         client.connect(server.server_address)
         self.assertIs(None, client.write('ping\n'))
         resp = client.read()
+        self.assertClientAddr(client, server, 0)
         self.assertEquals('pong\n', resp)
 
     def test_server_fails_to_start(self):



More information about the bazaar-commits mailing list