Rev 5265: Spawn a thread for each connection from a client. in file:///home/vila/src/bzr/experimental/leaking-tests/
Vincent Ladeuil
v.ladeuil+lp at free.fr
Thu Jun 3 17:10:48 BST 2010
At file:///home/vila/src/bzr/experimental/leaking-tests/
------------------------------------------------------------
revno: 5265
revision-id: v.ladeuil+lp at free.fr-20100603161048-n0ir7wz6t7qtycg4
parent: v.ladeuil+lp at free.fr-20100602205753-gcsgu553qcxr41ct
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: propagate-exceptions
timestamp: Thu 2010-06-03 18:10:48 +0200
message:
Spawn a thread for each connection from a client.
* bzrlib/tests/test_test_server.py:
(TestTestingServerInAThread.test_client_talks_server_respond):
Check that the connection has been collected.
* bzrlib/tests/test_server.py:
(TestingTCPServer.get_request)
(TestingThreadingTCPServer.get_request): Collect the resources
used for a client connection.
(TestingThreadingTCPServer.process_request): Spawn a thread for
each connection.
-------------- next part --------------
=== modified file 'bzrlib/tests/test_server.py'
--- a/bzrlib/tests/test_server.py 2010-06-02 20:57:53 +0000
+++ b/bzrlib/tests/test_server.py 2010-06-03 16:10:48 +0000
@@ -296,6 +296,9 @@
self.started = threading.Event()
self.serving = threading.Event()
self.stopped = threading.Event()
+ # We collect the resources used by the clients so we can release them
+ # when shutting down
+ self.clients = []
def server_bind(self):
# We need to override the SocketServer bind, yet, we still want to use
@@ -323,27 +326,35 @@
"""Verify the request.
Return True if we should proceed with this request, False if we should
- not even touch a single byte in the socket ! This is used to stop the
- server with a dummy last connection.
+ not even touch a single byte in the socket ! This is useful when we
+ stop the server with a dummy last connection.
"""
return self.serving.isSet()
- def stop_clients(self):
- pass
-
-
-class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
-
- def __init__(self, server_address, request_handler_class):
- TestingTCPServerMixin.__init__(self, SocketServer.TCPServer)
- SocketServer.TCPServer.__init__(self, server_address,
- request_handler_class)
-
def handle_error(self, request, client_address):
# Stop serving and re-raise the last exception seen
self.serving.clear()
raise
+ def stop_clients(self):
+ pass
+
+
+class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
+
+ def __init__(self, server_address, request_handler_class):
+ TestingTCPServerMixin.__init__(self, SocketServer.TCPServer)
+ SocketServer.TCPServer.__init__(self, server_address,
+ request_handler_class)
+
+ def get_request(self):
+ """Get the request and client address from the socket."""
+ sock, addr = self.sibling_class.get_request(self)
+ self.clients.append((sock, addr))
+ return sock, addr
+
+ # The following methods are called by the main thread
+
class TestingThreadingTCPServer(TestingTCPServerMixin,
SocketServer.ThreadingTCPServer):
@@ -353,6 +364,34 @@
SocketServer.TCPServer.__init__(self, server_address,
request_handler_class)
+ def get_request (self):
+ """Get the request and client address from the socket."""
+ sock, addr = self.sibling_class.get_request(self)
+ # The thread is not created yet, it will be updated in process_request
+ self.clients.append((sock, addr, None))
+ return sock, addr
+
+ def process_request_thread(self, stopped, request, client_address):
+ SocketServer.ThreadingTCPServer.process_request_thread(
+ self, request, client_address)
+ self.close_request(request)
+ stopped.set()
+
+ def process_request(self, request, client_address):
+ """Start a new thread to process the request."""
+ stopped = threading.Event()
+ t = test_server.ThreadWithException(
+ event=stopped,
+ target = self.process_request_thread,
+ args = (stopped, request, client_address))
+ t.name = '%s -> %s' % (client_address, self.server_address)
+ if 'threads' in tests.selftest_debug_flags:
+ print 'Thread for: %s started' % (threading.currentThread().name,)
+ # Update the client description
+ self.clients.pop()
+ self.clients.append((request, client_address, t))
+ t.start()
+
class TestingTCPServerInAThread(object):
"""A server in a thread that re-raise thread exceptions."""
@@ -408,6 +447,8 @@
# server thread, it may happen that it's not blocked or even not
# started.
pass
+ # We don't have to wait for the server to shut down to start shutting
+ # down the clients, so let's start now.
self.server.stop_clients()
# Now we wait for the thread running self.server.serve() to finish
self.server.stopped.wait()
@@ -424,6 +465,7 @@
# Make sure we can be called twice safely
self.server = None
+
class TestingThreadingTCPServerInAThread(TestingTCPServerInAThread):
"""A socket server in a thread which spawn one thread for each connection"""
=== modified file 'bzrlib/tests/test_test_server.py'
--- a/bzrlib/tests/test_test_server.py 2010-06-02 20:57:53 +0000
+++ b/bzrlib/tests/test_test_server.py 2010-06-03 16:10:48 +0000
@@ -92,6 +92,13 @@
self.addCleanup(client.disconnect)
return client
+ def get_server_connection(self, server, conn_rank):
+ return server.server.clients[conn_rank]
+
+ def assertClientAddr(self, client, server, conn_rank):
+ conn = self.get_server_connection(server, conn_rank)
+ self.assertEquals(client.sock.getsockname(), conn[1])
+
def test_start_stop(self):
server = self.get_server()
client = self.get_client()
@@ -108,6 +115,7 @@
client.connect(server.server_address)
self.assertIs(None, client.write('ping\n'))
resp = client.read()
+ self.assertClientAddr(client, server, 0)
self.assertEquals('pong\n', resp)
def test_server_fails_to_start(self):
More information about the bazaar-commits
mailing list