Rev 6186: Refactor the code a bit. Now we can ensure that the active loop in http://bazaar.launchpad.net/~jameinel/bzr/2.5-soft-hangup-795025
John Arbash Meinel
john at arbash-meinel.com
Fri Sep 23 15:25:56 UTC 2011
At http://bazaar.launchpad.net/~jameinel/bzr/2.5-soft-hangup-795025
------------------------------------------------------------
revno: 6186
revision-id: john at arbash-meinel.com-20110923152547-osz802eyejhnl4v3
parent: john at arbash-meinel.com-20110923142749-jd2ketuz1u4vihfx
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.5-soft-hangup-795025
timestamp: Fri 2011-09-23 17:25:47 +0200
message:
Refactor the code a bit. Now we can ensure that the active loop
also reaps dead threads.
-------------- next part --------------
=== modified file 'bzrlib/smart/server.py'
--- a/bzrlib/smart/server.py 2011-09-23 14:27:49 +0000
+++ b/bzrlib/smart/server.py 2011-09-23 15:25:47 +0000
@@ -192,7 +192,7 @@
break
self.serve_conn(conn, thread_name_suffix)
# Cleanout any threads that have finished processing.
- # self._poll_active_connections()
+ self._poll_active_connections()
except KeyboardInterrupt:
# dont log when CTRL-C'd.
raise
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py 2011-09-23 14:27:49 +0000
+++ b/bzrlib/tests/test_smart_transport.py 2011-09-23 15:25:47 +0000
@@ -1075,6 +1075,64 @@
class TestSmartTCPServer(tests.TestCase):
+ def make_server(self):
+ """Create a SmartTCPServer that we can exercise.
+
+ Note: we don't use SmartTCPServer_for_testing because the testing
+ version overrides lots of functionality like 'serve', and we want to
+ test the raw service.
+
+ This will start the server in another thread, and wait for it to
+ indicate it has finished starting up.
+
+ :return: (server, server_thread)
+ """
+ t = _mod_transport.get_transport_from_url('memory:///')
+ server = _mod_server.SmartTCPServer(t, client_timeout=4.0)
+ server._ACCEPT_TIMEOUT = 0.1
+ # We don't use 'localhost' because that might be an IPv6 address.
+ server.start_server('127.0.0.1', 0)
+ server_thread = threading.Thread(target=server.serve,
+ args=(self.id(),))
+ server_thread.start()
+ # Ensure this gets called at some point
+ self.addCleanup(server._stop_gracefully)
+ server._started.wait()
+ return server, server_thread
+
+ def ensure_client_disconnected(self, client_sock):
+ """Ensure that a socket is closed, discarding all errors."""
+ try:
+ client_sock.close()
+ except Exception:
+ pass
+
+ def connect_to_server(self, server):
+ """Create a client socket that can talk to the server."""
+ client_sock = socket.socket()
+ server_info = server._server_socket.getsockname()
+ client_sock.connect(server_info)
+ self.addCleanup(self.ensure_client_disconnected, client_sock)
+ return client_sock
+
+ def connect_to_server_and_hangup(self, server):
+ """Connect to the server, and then hang up.
+ That way it doesn't sit waiting for 'accept()' to timeout.
+ """
+ client_sock = self.connect_to_server(server)
+ client_sock.close()
+
+ def say_hello(self, client_sock):
+ """Send the 'hello' smart RPC, and expect the response."""
+ client_sock.send('hello\n')
+ self.assertEqual('ok\x012\n', client_sock.recv(5))
+
+ def shutdown_server_cleanly(self, server, server_thread):
+ server._stop_gracefully()
+ self.connect_to_server_and_hangup(server)
+ server._stopped.wait()
+ server_thread.join()
+
def test_get_error_unexpected(self):
"""Error reported by server with no specific representation"""
self.overrideEnv('BZR_NO_SMART_VFS', None)
@@ -1120,46 +1178,47 @@
self.assertEqual(0, len(server._active_connections))
def test_serve_closes_out_finished_connections(self):
- t = _mod_transport.get_transport_from_url('memory:///')
- server = _mod_server.SmartTCPServer(t, client_timeout=1.0)
- server._ACCEPT_TIMEOUT = 0.1
- # We don't use 'localhost' because that might be an IPv6 address.
- server.start_server('127.0.0.1', 0)
- server_thread = threading.Thread(target=server.serve,
- args=(self.id(),))
- server_thread.start()
- client_sock = None
- try:
- # Wait for the server to start. Then connect to the port
- client_sock = socket.socket()
- server._started.wait()
- client_sock.connect(('127.0.0.1', server.port))
- # We send and receive on the connection, so that we know the
- # server-side has seen the connect, and started handling the
- # results.
- client_sock.send('hello\n')
- self.assertEqual('ok\x012\n', client_sock.recv(5))
- self.assertEqual(1, len(server._active_connections))
- # Grab a handle to the thread that is processing our request
- server_side_thread = server._active_connections[0][2]
- finally:
- # Close the connection, ask the server to stop, and wait for the
- # server to stop, as well as the thread that was servicing the
- # client request.
- if client_sock is not None:
- client_sock.close()
- # Wait for the server-side request thread to notice we are closed.
- server_side_thread.join()
- # Stop the server, it should notice the connection has finished.
- server._stop_gracefully()
- # Do a throw-away connection so that the server stops immediately
- s = socket.socket()
- s.connect(('127.0.0.1', server.port))
- s.close()
- server._stopped.wait()
- server_thread.join()
+ server, server_thread = self.make_server()
+ # The server is started, connect to it.
+ client_sock = self.connect_to_server(server)
+ # We send and receive on the connection, so that we know the
+ # server-side has seen the connect, and started handling the
+ # results.
+ self.say_hello(client_sock)
+ self.assertEqual(1, len(server._active_connections))
+ # Grab a handle to the thread that is processing our request
+ _, _, server_side_thread = server._active_connections[0]
+ # Close the connection, ask the server to stop, and wait for the
+ # server to stop, as well as the thread that was servicing the
+ # client request.
+ client_sock.close()
+ # Wait for the server-side request thread to notice we are closed.
+ server_side_thread.join()
+ # Stop the server, it should notice the connection has finished.
+ self.shutdown_server_cleanly(server, server_thread)
+ # The server should have noticed that all clients are gone before
+ # exiting.
self.assertEqual(0, len(server._active_connections))
+ def test_serve_reaps_finished_connections(self):
+ server, server_thread = self.make_server()
+ client_sock1 = self.connect_to_server(server)
+ # We send and receive on the connection, so that we know the
+ # server-side has seen the connect, and started handling the
+ # results.
+ self.say_hello(client_sock1)
+ self.assertEqual(1, len(server._active_connections))
+ _, _, server_side_thread1 = server._active_connections[0]
+ client_sock1.close()
+ server_side_thread1.join()
+ client_sock2 = self.connect_to_server(server)
+ self.say_hello(client_sock2)
+ self.assertEqual(1, len(server._active_connections))
+ _, _, server_side_thread2 = server._active_connections[0]
+ client_sock2.close()
+ server_side_thread2.join()
+ self.shutdown_server_cleanly(server, server_thread)
+
class SmartTCPTests(tests.TestCase):
"""Tests for connection/end to end behaviour using the TCP server.
More information about the bazaar-commits
mailing list