Rev 5266: Really test against a threading server and properly shutdown socket and threads. in file:///home/vila/src/bzr/experimental/leaking-tests/
Vincent Ladeuil
v.ladeuil+lp at free.fr
Thu Jun 3 18:40:49 BST 2010
At file:///home/vila/src/bzr/experimental/leaking-tests/
------------------------------------------------------------
revno: 5266
revision-id: v.ladeuil+lp at free.fr-20100603174049-st3ip75ad46j3npv
parent: v.ladeuil+lp at free.fr-20100603161048-n0ir7wz6t7qtycg4
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: propagate-exceptions
timestamp: Thu 2010-06-03 19:40:49 +0200
message:
Really test against a threading server and properly shutdown socket and threads.
-------------- next part --------------
=== modified file 'bzrlib/tests/test_server.py'
--- a/bzrlib/tests/test_server.py 2010-06-03 16:10:48 +0000
+++ b/bzrlib/tests/test_server.py 2010-06-03 17:40:49 +0000
@@ -14,6 +14,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+import errno
import socket
import SocketServer
import select
@@ -260,28 +261,40 @@
self.ready.clear()
try:
super(ThreadWithException, self).run()
- except Exception, e:
+ except:
self.exception = sys.exc_info()
finally:
# Make sure the calling thread is released
self.ready.set()
- def join(self, *args, **kwargs):
+ def join(self, timeout=5):
"""Overrides Thread.join to raise any exception caught.
Calling join(timeout=0) will raise the caught exception or return None
if the thread is still alive.
+
+ The default timeout is set to 5 and should expire only when a thread
+ serving a client connection is hung.
"""
- # Note that we don't care about the timeout parameter here: either the
- # thread has raised an exception and it should be raised (and join()
- # should succeed whatever the timeout is) or it's still alive which
- # means it didn't encounter an exception.
- super(ThreadWithException, self).join(*args, **kwargs)
+ super(ThreadWithException, self).join(timeout)
if self.exception is not None:
exc_class, exc_value, exc_tb = self.exception
+ self.exception = None # The exception should be raised only once
raise exc_class, exc_value, exc_tb
+ if timeout and self.isAlive():
+ # The timeout expired without joining the thread, the thread is
+ # therefore stuck and that's a failure as far as the test is
+ # concerned. We used to hang here.
+ raise AssertionError('thread %s hung' % (self.name,))
+
+ def pending_exception(self):
+ """Raise the caught exception.
+
+ This does nothing if no exception occurred.
+ """
+ self.join(timeout=0)
class TestingTCPServerMixin:
@@ -336,8 +349,34 @@
self.serving.clear()
raise
- def stop_clients(self):
- pass
+ # The following methods are called by the main thread
+
+ def stop_client_connections(self):
+ while self.clients:
+ c = self.clients.pop()
+ self.shutdown_client(c)
+
+ def shutdown_client_socket(self, sock):
+ """Properly shutdown a client socket.
+
+ Under some circumstances (as in bug #383920), we need to force the
+ shutdown as python delays it until gc occurs otherwise and the client
+ may hang.
+
+ This should be called only when no other thread is trying to use the
+ socket.
+ """
+ try:
+ # The request process has been completed, the thread is about to
+ # die, let's shutdown the socket if we can.
+ sock.shutdown(socket.SHUT_RDWR)
+ sock.close()
+ except (socket.error, select.error), e:
+ if e[0] in (errno.EBADF, errno.ENOTCONN):
+ # Right, the socket is already down
+ pass
+ else:
+ raise
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
@@ -355,6 +394,11 @@
# The following methods are called by the main thread
+ def shutdown_client(self, client):
+ sock, addr = client
+ self.shutdown_client_socket(sock)
+
+
class TestingThreadingTCPServer(TestingTCPServerMixin,
SocketServer.ThreadingTCPServer):
@@ -371,7 +415,8 @@
self.clients.append((sock, addr, None))
return sock, addr
- def process_request_thread(self, stopped, request, client_address):
+ def process_request_thread(self, started, stopped, request, client_address):
+ started.set()
SocketServer.ThreadingTCPServer.process_request_thread(
self, request, client_address)
self.close_request(request)
@@ -379,18 +424,32 @@
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
+ started = threading.Event()
stopped = threading.Event()
- t = test_server.ThreadWithException(
+ t = ThreadWithException(
event=stopped,
target = self.process_request_thread,
- args = (stopped, request, client_address))
+ args = (started, stopped, request, client_address))
t.name = '%s -> %s' % (client_address, self.server_address)
- if 'threads' in tests.selftest_debug_flags:
- print 'Thread for: %s started' % (threading.currentThread().name,)
# Update the client description
self.clients.pop()
self.clients.append((request, client_address, t))
t.start()
+ started.wait()
+ # If an exception occurred during the thread start, it will get raised.
+ t.pending_exception()
+
+ # The following methods are called by the main thread
+
+ def shutdown_client(self, client):
+ sock, addr, t = client
+ self.shutdown_client_socket(sock)
+ if t is not None:
+ # The thread has been created only if the request is processed but
+ # after the connection is initialized. This could happen during server
+ # shutdown. If an exception occurred in the thread it will be
+ # re-raised
+ t.join()
class TestingTCPServerInAThread(object):
@@ -406,13 +465,6 @@
return self.server_class(self.server_address,
self.request_handler_class)
- def pending_exception(self):
- """Re-raise the exception raised by the server in its own thread.
-
- This does nothing if no exception occurred.
- """
- self._server_thread.join(timeout=0)
-
def start_server(self):
self.server = self.create_server()
self._server_thread = ThreadWithException(
@@ -422,8 +474,10 @@
self.server.started.wait()
# Get the real address, especially the port
self.server_address = self.server.server_address
- # If an exception occured during the server start, it will get raised
- self.pending_exception()
+ self._server_thread.name = self.server_address
+ # If an exception occurred during the server start, it will get raised,
+ # otherwise, the server is blocked on its accept() call.
+ self._server_thread.pending_exception()
# From now on, we'll use a different event to ensure the server can set
# its exception
self._server_thread.set_event(self.server.stopped)
@@ -434,44 +488,40 @@
def stop_server(self):
if self.server is None:
return
- # The server has been started successfully, shut it down now
- # As soon as we stop serving, no more connection are accepted except
- # one to get out of the blocking listen.
- self.server.serving.clear()
- # The server is listening for a last connection, let's give it:
- last_conn = None
- try:
- last_conn = osutils.connect_socket(self.server.server_address)
- except socket.error, e:
- # But ignore connection errors as the point is to unblock the
- # server thread, it may happen that it's not blocked or even not
- # started.
- pass
- # We don't have to wait for the server to shut down to start shutting
- # down the clients, so let's start now.
- self.server.stop_clients()
- # Now we wait for the thread running self.server.serve() to finish
- self.server.stopped.wait()
- if last_conn is not None:
- # Close the last connection without trying to use it. The server
- # will not process a single byte on that socket to avoid
- # complications (SSL starts with a handshake for example).
- last_conn.close()
- try:
+ try:
+ # The server has been started successfully, shut it down now. As
+ # soon as we stop serving, no more connections are accepted except
+ # one to get out of the blocking listen.
+ self.server.serving.clear()
+ # The server is listening for a last connection, let's give it:
+ last_conn = None
+ try:
+ last_conn = osutils.connect_socket(self.server.server_address)
+ except socket.error, e:
+ # But ignore connection errors as the point is to unblock the
+ # server thread, it may happen that it's not blocked or even
+ # not started.
+ pass
+ # We start shutting down the client while the server itself is
+ # shutting down.
+ self.server.stop_client_connections()
+ # Now we wait for the thread running self.server.serve() to finish
+ self.server.stopped.wait()
+ if last_conn is not None:
+ # Close the last connection without trying to use it. The
+ # server will not process a single byte on that socket to avoid
+ # complications (SSL starts with a handshake for example).
+ last_conn.close()
# Check for any exception that could have occurred in the server
# thread
self._server_thread.join()
finally:
- # Make sure we can be called twice safely
+ # Make sure we can be called twice safely, note that this means
+ # that we will raise a single exception even if several occurred in
+ # the various threads involved.
self.server = None
-class TestingThreadingTCPServerInAThread(TestingTCPServerInAThread):
- """A socket server in a thread which spawn one thread for each connection"""
-
- pass
-
-
class SmartTCPServer_for_testing(server.SmartTCPServer):
"""Server suitable for use by transport tests.
=== modified file 'bzrlib/tests/test_test_server.py'
--- a/bzrlib/tests/test_test_server.py 2010-06-03 16:10:48 +0000
+++ b/bzrlib/tests/test_test_server.py 2010-06-03 17:40:49 +0000
@@ -72,17 +72,18 @@
else:
raise ValueError('[%s] not understood' % req)
-class TestTestingServerInAThread(tests.TestCase):
-
- server_in_thread_class = test_server.TestingTCPServerInAThread
+
+class TestTCPServerInAThread(tests.TestCase):
+
+ server_class = test_server.TestingTCPServer
def get_server(self, server_class=None, connection_handler_class=None):
- if server_class is None:
- server_class = test_server.TestingTCPServer
+ if server_class is not None:
+ self.server_class = server_class
if connection_handler_class is None:
connection_handler_class = TCPConnectionHandler
- server = self.server_in_thread_class(
- ('localhost', 0), server_class, connection_handler_class)
+ server = test_server.TestingTCPServerInAThread(
+ ('localhost', 0), self.server_class, connection_handler_class)
server.start_server()
self.addCleanup(server.stop_server)
return server
@@ -158,7 +159,7 @@
# Now the server has raise the exception in its own thread
self.assertRaises(ServerFailure, server.stop_server)
-class TestTestingThreadingServerInAThread(TestTestingServerInAThread):
+class TestThreadingTCPServerInAThread(TestTCPServerInAThread):
- server_in_thread_class = test_server.TestingThreadingTCPServerInAThread
+ server_class = test_server.TestingThreadingTCPServer
More information about the bazaar-commits
mailing list