Rev 5248: Merge first-try into propagate-exceptions in file:///home/vila/src/bzr/experimental/leaking-tests/

Vincent Ladeuil v.ladeuil+lp at free.fr
Wed May 26 11:38:18 BST 2010


At file:///home/vila/src/bzr/experimental/leaking-tests/

------------------------------------------------------------
revno: 5248 [merge]
revision-id: v.ladeuil+lp at free.fr-20100526103818-1x38taslbkp097z9
parent: pqm at pqm.ubuntu.com-20100521084917-u9sae95g7s0nu2bf
parent: v.ladeuil+lp at free.fr-20100525124713-f002xif3ecmu5d3q
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: propagate-exceptions
timestamp: Wed 2010-05-26 12:38:18 +0200
message:
  Merge first-try into propagate-exceptions
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/builtins.py             builtins.py-20050830033751-fc01482b9ca23183
  bzrlib/smart/server.py         server.py-20061110062051-chzu10y32vx8gvur-1
  bzrlib/tests/EncodingAdapter.py EncodingAdapter.py-20060113032051-4d7e1d8c1e38b4a1
  bzrlib/tests/__init__.py       selftest.py-20050531073622-8d0e3c8845c97a64
  bzrlib/tests/ftp_server/medusa_based.py ftpserver.py-20071019102346-61jbvdkrr70igauv-1
  bzrlib/tests/ftp_server/pyftpdlib_based.py pyftpdlib_based.py-20090227151014-882k9q34m1gwnhvi-1
  bzrlib/tests/http_server.py    httpserver.py-20061012142527-m1yxdj1xazsf8d7s-1
  bzrlib/tests/https_server.py   https_server.py-20071121173708-aj8zczi0ziwbwz21-1
  bzrlib/tests/stub_sftp.py      stub_sftp.py-20051027032739-0e7ef4f7bab0e174
  bzrlib/tests/test_http.py      testhttp.py-20051018020158-b2eef6e867c514d9
  bzrlib/tests/test_server.py    test_server.py-20100209163834-im1ozfuenfmqaa2m-1
-------------- next part --------------
=== modified file 'NEWS'
--- a/NEWS	2010-05-21 07:17:13 +0000
+++ b/NEWS	2010-05-24 01:05:14 +0000
@@ -111,6 +111,10 @@
   estimate of the number of records to be fetched vs actually fetched.
   (Parth Malwankar, #374740, #538868)
 
+* Most of the leaked threads during selftest are now fixed, allowing the
+  full test suite to pass on gentoo.
+  (Vincent Ladeuil, #392127)
+
 * Reduce peak memory by one copy of compressed text.
   (John Arbash Meinel, #566940)
 
@@ -240,6 +244,9 @@
 * ``bzr selftest --parallel=subprocess`` now works correctly on win32.
    (Gordon Tyler, #551332)
 
+* HTTP test servers will leak less threads (and sockets) and will not hang on
+  AIX anymore. (Vincent Ladeuil, #405745)
+
 * Workaround ``Crypto.Random`` check leading to spurious test
   failures on Lucid, FreeBSD and gentoo.  
   (Vincent Ladeuil, #528436)

=== modified file 'bzrlib/builtins.py'
--- a/bzrlib/builtins.py	2010-05-19 02:58:05 +0000
+++ b/bzrlib/builtins.py	2010-05-24 01:05:14 +0000
@@ -3567,15 +3567,17 @@
             randomize=None, exclude=None, strict=False,
             load_list=None, debugflag=None, starting_with=None, subunit=False,
             parallel=None, lsprof_tests=False):
-        from bzrlib.tests import selftest
-        import bzrlib.benchmarks as benchmarks
-        from bzrlib.benchmarks import tree_creator
+        from bzrlib import (
+            benchmarks,
+            tests,
+            )
 
         # Make deprecation warnings visible, unless -Werror is set
         symbol_versioning.activate_deprecation_warnings(override=False)
 
         if cache_dir is not None:
-            tree_creator.TreeCreator.CACHE_ROOT = osutils.abspath(cache_dir)
+            benchmarks.tree_creator.TreeCreator.CACHE_ROOT = osutils.abspath(
+                cache_dir)
         if testspecs_list is not None:
             pattern = '|'.join(testspecs_list)
         else:
@@ -3627,7 +3629,7 @@
                           "starting_with": starting_with
                           }
         selftest_kwargs.update(self.additional_selftest_args)
-        result = selftest(**selftest_kwargs)
+        result = tests.selftest(**selftest_kwargs)
         return int(not result)
 
 

=== modified file 'bzrlib/smart/server.py'
--- a/bzrlib/smart/server.py	2010-03-05 07:27:58 +0000
+++ b/bzrlib/smart/server.py	2010-05-24 01:05:14 +0000
@@ -18,6 +18,7 @@
 
 import errno
 import os.path
+import select
 import socket
 import sys
 import threading
@@ -174,6 +175,7 @@
             None, handler.serve, name=thread_name)
         connection_thread.setDaemon(True)
         connection_thread.start()
+        return connection_thread
 
     def start_background_thread(self, thread_name_suffix=''):
         self._started.clear()

=== modified file 'bzrlib/tests/EncodingAdapter.py'
--- a/bzrlib/tests/EncodingAdapter.py	2009-03-23 14:59:43 +0000
+++ b/bzrlib/tests/EncodingAdapter.py	2010-05-24 01:05:14 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2006 Canonical Ltd
+# Copyright (C) 2006, 2009, 2010 Canonical Ltd
 # -*- coding: utf-8 -*-
 #
 # This program is free software; you can redistribute it and/or modify
@@ -19,9 +19,6 @@
 
 from copy import deepcopy
 
-from bzrlib.tests import TestSuite
-
-
 # prefix for micro (1/1000000)
 _mu = u'\xb5'
 

=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2010-05-20 18:23:17 +0000
+++ b/bzrlib/tests/__init__.py	2010-05-25 07:30:27 +0000
@@ -114,11 +114,6 @@
     TestUtil,
     treeshape,
     )
-from bzrlib.tests.http_server import HttpServer
-from bzrlib.tests.TestUtil import (
-                          TestSuite,
-                          TestLoader,
-                          )
 from bzrlib.ui import NullProgressView
 from bzrlib.ui.text import TextUIFactory
 import bzrlib.version_info_formats.format_custom
@@ -846,6 +841,8 @@
         # going away but leak one) but it seems less likely than the actual
         # false positives (the test see threads going away and does not leak).
         if leaked_threads > 0:
+            if 'threads' in selftest_debug_flags:
+                print '%s is leaking, active is now %d' % (self.id(), active)
             TestCase._leaking_threads_tests += 1
             if TestCase._first_thread_leaker_id is None:
                 TestCase._first_thread_leaker_id = self.id()
@@ -2725,9 +2722,10 @@
     """
 
     def setUp(self):
+        from bzrlib.tests import http_server
         super(ChrootedTestCase, self).setUp()
         if not self.vfs_transport_factory == memory.MemoryServer:
-            self.transport_readonly_server = HttpServer
+            self.transport_readonly_server = http_server.HttpServer
 
 
 def condition_id_re(pattern):
@@ -3055,7 +3053,7 @@
     return suite
 
 
-class TestDecorator(TestSuite):
+class TestDecorator(TestUtil.TestSuite):
     """A decorator for TestCase/TestSuite objects.
     
     Usually, subclasses should override __iter__(used when flattening test
@@ -3064,7 +3062,7 @@
     """
 
     def __init__(self, suite):
-        TestSuite.__init__(self)
+        TestUtil.TestSuite.__init__(self)
         self.addTest(suite)
 
     def countTestCases(self):
@@ -3237,7 +3235,7 @@
 
     test_blocks = partition_tests(suite, concurrency)
     for process_tests in test_blocks:
-        process_suite = TestSuite()
+        process_suite = TestUtil.TestSuite()
         process_suite.addTests(process_tests)
         c2pread, c2pwrite = os.pipe()
         pid = os.fork()
@@ -3393,6 +3391,8 @@
 #                           rather than failing tests. And no longer raise
 #                           LockContention when fctnl locks are not being used
 #                           with proper exclusion rules.
+#   -Ethreads               Will display thread ident at creation/join time to
+#                           help track thread leaks
 selftest_debug_flags = set()
 
 
@@ -3981,7 +3981,7 @@
     ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
     ...     [('one', dict(param=1)),
     ...      ('two', dict(param=2))],
-    ...     TestSuite())
+    ...     TestUtil.TestSuite())
     >>> tests = list(iter_suite_tests(r))
     >>> len(tests)
     2

=== modified file 'bzrlib/tests/ftp_server/medusa_based.py'
--- a/bzrlib/tests/ftp_server/medusa_based.py	2010-02-23 07:43:11 +0000
+++ b/bzrlib/tests/ftp_server/medusa_based.py	2010-05-24 01:05:14 +0000
@@ -254,6 +254,8 @@
         self._async_thread = threading.Thread(
                 target=FTPTestServer._asyncore_loop_ignore_EBADF,
                 kwargs={'timeout':0.1, 'count':10000})
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Thread started: %s' % (self._async_thread.ident,)
         self._async_thread.setDaemon(True)
         self._async_thread.start()
 
@@ -261,6 +263,8 @@
         self._ftp_server.close()
         asyncore.close_all()
         self._async_thread.join()
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Thread  joined: %s' % (self._async_thread.ident,)
 
     @staticmethod
     def _asyncore_loop_ignore_EBADF(*args, **kwargs):

=== modified file 'bzrlib/tests/ftp_server/pyftpdlib_based.py'
--- a/bzrlib/tests/ftp_server/pyftpdlib_based.py	2010-02-23 07:43:11 +0000
+++ b/bzrlib/tests/ftp_server/pyftpdlib_based.py	2010-05-24 01:05:14 +0000
@@ -28,6 +28,7 @@
 
 from bzrlib import (
     osutils,
+    tests,
     trace,
     )
 from bzrlib.tests import test_server
@@ -183,6 +184,8 @@
         self._ftpd_starting.acquire() # So it can be released by the server
         self._ftpd_thread = threading.Thread(target=self._run_server,)
         self._ftpd_thread.start()
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Thread started: %s' % (self._ftpd_thread.ident,)
         # Wait for the server thread to start (i.e release the lock)
         self._ftpd_starting.acquire()
         self._ftpd_starting.release()
@@ -195,6 +198,8 @@
         self._ftp_server.close()
         self._ftpd_running = False
         self._ftpd_thread.join()
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Thread  joined: %s' % (self._ftpd_thread.ident,)
 
     def _run_server(self):
         """Run the server until stop_server is called, shut it down properly then.

=== modified file 'bzrlib/tests/http_server.py'
--- a/bzrlib/tests/http_server.py	2010-02-23 07:43:11 +0000
+++ b/bzrlib/tests/http_server.py	2010-05-25 12:47:13 +0000
@@ -30,7 +30,10 @@
 import urllib
 import urlparse
 
-from bzrlib import transport
+from bzrlib import (
+    tests,
+    transport,
+    )
 from bzrlib.tests import test_server
 from bzrlib.transport import local
 
@@ -78,7 +81,7 @@
         connection early to avoid polluting the test results.
         """
         try:
-            SimpleHTTPServer.SimpleHTTPRequestHandler.handle_one_request(self)
+            self._handle_one_request()
         except socket.error, e:
             # Any socket error should close the connection, but some errors are
             # due to the client closing early and we don't want to pollute test
@@ -89,6 +92,9 @@
                                      errno.ECONNABORTED, errno.EBADF)):
                 raise
 
+    def _handle_one_request(self):
+        SimpleHTTPServer.SimpleHTTPRequestHandler.handle_one_request(self)
+
     _range_regexp = re.compile(r'^(?P<start>\d+)-(?P<end>\d+)$')
     _tail_regexp = re.compile(r'^-(?P<tail>\d+)$')
 
@@ -318,44 +324,162 @@
         # the tests cases.
         self.test_case_server = test_case_server
         self._home_dir = test_case_server._home_dir
-
-    def stop_server(self):
-         """Called to clean-up the server.
-
-         Since the server may be (surely is, even) in a blocking listen, we
-         shutdown its socket before closing it.
-         """
-         # Note that is this executed as part of the implicit tear down in the
-         # main thread while the server runs in its own thread. The clean way
-         # to tear down the server is to instruct him to stop accepting
-         # connections and wait for the current connection(s) to end
-         # naturally. To end the connection naturally, the http transports
-         # should close their socket when they do not need to talk to the
-         # server anymore. This happens naturally during the garbage collection
-         # phase of the test transport objetcs (the server clients), so we
-         # don't have to worry about them.  So, for the server, we must tear
-         # down here, from the main thread, when the test have ended.  Note
-         # that since the server is in a blocking operation and since python
-         # use select internally, shutting down the socket is reliable and
-         # relatively clean.
-         try:
-             self.socket.shutdown(socket.SHUT_RDWR)
-         except socket.error, e:
-             # WSAENOTCONN (10057) 'Socket is not connected' is harmless on
-             # windows (occurs before the first connection attempt
-             # vila--20071230)
-
-             # 'Socket is not connected' can also occur on OSX, with a
-             # "regular" ENOTCONN (when something went wrong during test case
-             # setup leading to self.setUp() *not* being called but
-             # self.stop_server() still being called -- vila20081106
-             if not len(e.args) or e.args[0] not in (errno.ENOTCONN, 10057):
-                 raise
-         # Let the server properly close the socket
-         self.server_close()
-
-
-class TestingHTTPServer(SocketServer.TCPServer, TestingHTTPServerMixin):
+        self.serving = None
+        self.is_shut_down = threading.Event()
+        # We collect the sockets/threads used by the clients so we can
+        # close/join them when shutting down
+        self.clients = []
+
+    def get_request (self):
+        """Get the request and client address from the socket.
+        """
+        sock, addr = self._get_request()
+        self.clients.append([sock, addr])
+        return sock, addr
+
+    def verify_request(self, request, client_address):
+        """Verify the request.
+
+        Return True if we should proceed with this request, False if we should
+        not even touch a single byte in the socket !
+        """
+        return self.serving is not None and self.serving.isSet()
+
+    def handle_request(self):
+        request, client_address = self.get_request()
+        try:
+            if self.verify_request(request, client_address):
+                self.process_request(request, client_address)
+        except:
+            if self.serving is not None and self.serving.isSet():
+                self.handle_error(request, client_address)
+            else:
+                # Exceptions raised while we shut down are just noise, but feel
+                # free to put a breakpoint here if you suspect something
+                # else. Such an example is the SSL handshake: it's automatic
+                # once we start processing the request but the last connection
+                # will close immediately and will not be able to correctly
+                # reply.
+                pass
+            self.close_request(request)
+
+    def server_bind(self):
+        # The following has been fixed in 2.5 so we need to provide it for
+        # older python versions.
+        if sys.version < (2, 5):
+            self.server_address = self.socket.getsockname()
+
+    def serve(self, started):
+        self.serving  = threading.Event()
+        self.serving.set()
+        self.is_shut_down.clear()
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Starting %r' % (self.server_address,)
+        # We are listening and ready to accept connections
+        started.set()
+        while self.serving.isSet():
+            if 'threads' in tests.selftest_debug_flags:
+                print 'Accepting on %r' % (self.server_address,)
+            # Really a connection but the python framework is generic and
+            # call them requests
+            self.handle_request()
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Closing  %r' % (self.server_address,)
+        # Let's close the listening socket
+        self.server_close()
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Closed   %r' % (self.server_address,)
+        self.is_shut_down.set()
+
+    def connect_socket(self):
+        err = socket.error('getaddrinfo returns an empty list')
+        for res in socket.getaddrinfo(*self.server_address):
+            af, socktype, proto, canonname, sa = res
+            sock = None
+            try:
+                sock = socket.socket(af, socktype, proto)
+                sock.connect(sa)
+                return sock
+
+            except socket.error, err:
+                # 'err' is now the most recent error
+                if sock is not None:
+                    sock.close()
+        raise err
+
+    def join_thread(self, thread, timeout=2):
+        thread.join(timeout)
+        if thread.isAlive():
+            # The timeout expired without joining the thread, the thread is
+            # therefore stucked and that's a failure as far as the test is
+            # concerned. We used to hang here.
+            raise AssertionError('thread %s hung' % (thread.name,))
+
+    def shutdown(self):
+        """Stops the serve() loop.
+
+        Blocks until the loop has finished. This must be called while serve()
+        is running in another thread, or it will deadlock.
+        """
+        if self.serving is None:
+            # If the server wasn't properly started, there is nothing to
+            # shutdown.
+            return
+        # As soon as we stop serving, no more connection are accepted except
+        # one to get out of the blocking listen.
+        self.serving.clear()
+        # The server is listening for a last connection, let's give it:
+        last_conn = None
+        try:
+            last_conn = self.connect_socket()
+        except socket.error, e:
+            # But ignore connection errors as the point is to unblock the
+            # server thread, it may happen that it's not blocked or even not
+            # started (when something went wrong during test case setup
+            # leading to self.setUp() *not* being called but self.tearDown()
+            # still being called)
+            pass
+        # We don't have to wait for the server to shut down to start shutting
+        # down the clients, so let's start now.
+        for c in self.clients:
+            self.shutdown_client(c)
+        self.clients = []
+        # Now we wait for the thread running serve() to finish
+        self.is_shut_down.wait()
+        if last_conn is not None:
+            # Close the last connection without trying to use it. The server
+            # will not process a single byte on that socket to avoid
+            # complications (SSL starts with a handshake for example).
+            last_conn.close()
+
+    def shutdown_client(self, client):
+        sock, addr = client[:2]
+        self.shutdown_client_socket(sock)
+
+    def shutdown_client_socket(self, sock):
+        """Properly shutdown a client socket.
+
+        Under some circumstances (as in bug #383920), we need to force the
+        shutdown as python delays it until gc occur otherwise and the client
+        may hang.
+
+        This should be called only when no other thread is trying to use the
+        socket.
+        """
+        try:
+            # The request process has been completed, the thread is about to
+            # die, let's shutdown the socket if we can.
+            sock.shutdown(socket.SHUT_RDWR)
+        except (socket.error, select.error), e:
+            print 'exception in shutdown_client_socket: %r' % (e,)
+            if e[0] in (errno.EBADF, errno.ENOTCONN):
+                # Right, the socket is already down
+                pass
+            else:
+                raise
+
+
+class TestingHTTPServer(TestingHTTPServerMixin, SocketServer.TCPServer):
 
     def __init__(self, server_address, request_handler_class,
                  test_case_server):
@@ -363,9 +487,17 @@
         SocketServer.TCPServer.__init__(self, server_address,
                                         request_handler_class)
 
-
-class TestingThreadingHTTPServer(SocketServer.ThreadingTCPServer,
-                                 TestingHTTPServerMixin):
+    def _get_request (self):
+        return SocketServer.TCPServer.get_request(self)
+
+    def server_bind(self):
+        SocketServer.TCPServer.server_bind(self)
+        TestingHTTPServerMixin.server_bind(self)
+
+
+class TestingThreadingHTTPServer(TestingHTTPServerMixin,
+                                 SocketServer.ThreadingTCPServer,
+                                 ):
     """A threading HTTP test server for HTTP 1.1.
 
     Since tests can initiate several concurrent connections to the same http
@@ -383,22 +515,47 @@
         # lying around.
         self.daemon_threads = True
 
-    def process_request_thread(self, request, client_address):
+    def _get_request (self):
+        return SocketServer.ThreadingTCPServer.get_request(self)
+
+    def process_request_thread(self, started, request, client_address):
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Processing: %s' % (threading.currentThread().name,)
+        started.set()
         SocketServer.ThreadingTCPServer.process_request_thread(
             self, request, client_address)
-        # Under some circumstances (as in bug #383920), we need to force the
-        # shutdown as python delays it until gc occur otherwise and the client
-        # may hang.
-        try:
-            # The request process has been completed, the thread is about to
-            # die, let's shutdown the socket if we can.
-            request.shutdown(socket.SHUT_RDWR)
-        except (socket.error, select.error), e:
-            if e[0] in (errno.EBADF, errno.ENOTCONN):
-                # Right, the socket is already down
-                pass
-            else:
-                raise
+        # Shutdown the socket as soon as possible, the thread will be joined
+        # later if needed during server shutdown thread.
+        self.shutdown_client_socket(request)
+
+    def process_request(self, request, client_address):
+        """Start a new thread to process the request."""
+        client = self.clients.pop()
+        started = threading.Event()
+        t = threading.Thread(target = self.process_request_thread,
+                             args = (started, request, client_address))
+        t.name = '%s -> %s' % (client_address, self.server_address)
+        client.append(t)
+        self.clients.append(client)
+        if self.daemon_threads:
+            t.setDaemon (1)
+        t.start()
+        started.wait()
+
+    def shutdown_client(self, client):
+        TestingHTTPServerMixin.shutdown_client(self, client)
+        if len(client) == 3:
+            # The thread has been created only if the request is processed but
+            # after the connection is inited. This could happne when the server
+            # is shut down.
+            sock, addr, thread = client
+            if 'threads' in tests.selftest_debug_flags:
+                print 'Try    joining: %s' % (thread.name,)
+            self.join_thread(thread)
+
+    def server_bind(self):
+        SocketServer.ThreadingTCPServer.server_bind(self)
+        TestingHTTPServerMixin.server_bind(self)
 
 
 class HttpServer(transport.Server):
@@ -463,48 +620,30 @@
                 raise httplib.UnknownProtocol(proto_vers)
             else:
                 self._httpd = self.create_httpd(serv_cls, rhandler)
-            self.host, self.port = self._httpd.socket.getsockname()
+            # Ensure we get the right port and an updated host if needed
+            self.host, self.port = self._httpd.server_address
         return self._httpd
 
-    def _http_start(self):
+    def _http_start(self, started):
         """Server thread main entry point. """
-        self._http_running = False
+        server = None
         try:
-            try:
-                httpd = self._get_httpd()
-                self._http_base_url = '%s://%s:%s/' % (self._url_protocol,
-                                                       self.host, self.port)
-                self._http_running = True
-            except:
-                # Whatever goes wrong, we save the exception for the main
-                # thread. Note that since we are running in a thread, no signal
-                # can be received, so we don't care about KeyboardInterrupt.
-                self._http_exception = sys.exc_info()
-        finally:
-            # Release the lock or the main thread will block and the whole
-            # process will hang.
-            self._http_starting.release()
+            server = self._get_httpd()
+            self._http_base_url = '%s://%s:%s/' % (self._url_protocol,
+                                                   self.host, self.port)
+        except:
+            # Whatever goes wrong, we save the exception for the main
+            # thread. Note that since we are running in a thread, no signal
+            # can be received, so we don't care about KeyboardInterrupt.
+            self._http_exception = sys.exc_info()
 
-        # From now on, exceptions are taken care of by the
-        # SocketServer.BaseServer or the request handler.
-        while self._http_running:
-            try:
-                # Really an HTTP connection but the python framework is generic
-                # and call them requests
-                httpd.handle_request()
-            except socket.timeout:
-                pass
-            except (socket.error, select.error), e:
-                if (e[0] == errno.EBADF
-                    or (sys.platform == 'win32' and e[0] == 10038)):
-                    # Starting with python-2.6, handle_request may raise socket
-                    # or select exceptions when the server is shut down (as we
-                    # do).
-                    # 10038 = WSAENOTSOCK
-                    # http://msdn.microsoft.com/en-us/library/ms740668%28VS.85%29.aspx
-                    pass
-                else:
-                    raise
+        if server is not None:
+            # From now on, exceptions are taken care of by the
+            # SocketServer.BaseServer or the request handler.
+            server.serve(started)
+        if not started.isSet():
+            # Hmm, something went wrong, but we can release the caller anyway
+            started.set()
 
     def _get_remote_url(self, path):
         path_parts = path.split(os.path.sep)
@@ -535,36 +674,40 @@
                 or isinstance(backing_transport_server,
                               test_server.LocalURLServer)):
             raise AssertionError(
-                "HTTPServer currently assumes local transport, got %s" % \
+                "HTTPServer currently assumes local transport, got %s" %
                 backing_transport_server)
         self._home_dir = os.getcwdu()
         self._local_path_parts = self._home_dir.split(os.path.sep)
         self._http_base_url = None
 
         # Create the server thread
-        self._http_starting = threading.Lock()
-        self._http_starting.acquire()
-        self._http_thread = threading.Thread(target=self._http_start)
+        started = threading.Event()
+        self._http_thread = threading.Thread(target=self._http_start,
+                                             args = (started,))
         self._http_thread.setDaemon(True)
         self._http_exception = None
         self._http_thread.start()
-
         # Wait for the server thread to start (i.e release the lock)
-        self._http_starting.acquire()
+        started.wait()
+        self._http_thread.name = self._http_base_url
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Thread started: %s' % (self._http_thread.name,)
+
 
         if self._http_exception is not None:
             # Something went wrong during server start
             exc_class, exc_value, exc_tb = self._http_exception
             raise exc_class, exc_value, exc_tb
-        self._http_starting.release()
         self.logs = []
 
     def stop_server(self):
-        self._httpd.stop_server()
-        self._http_running = False
-        # We don't need to 'self._http_thread.join()' here since the thread is
-        # a daemonic one and will be garbage collected anyway. Joining just
-        # slows us down for no added benefit.
+        """See bzrlib.transport.Server.tearDown."""
+        self._httpd.shutdown()
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Try    joining: %s' % (self._http_thread.name,)
+        self._httpd.join_thread(self._http_thread)
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Thread  joined: %s' % (self._http_thread.name,)
 
     def get_url(self):
         """See bzrlib.transport.Server.get_url."""

=== modified file 'bzrlib/tests/https_server.py'
--- a/bzrlib/tests/https_server.py	2009-03-23 14:59:43 +0000
+++ b/bzrlib/tests/https_server.py	2010-05-24 01:05:14 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2007 Canonical Ltd
+# Copyright (C) 2007-2010 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -30,18 +30,24 @@
         self.key_file = key_file
         self.cert_file = cert_file
 
-    def get_request (self):
-        """Get the request and client address from the socket.
-
-        This is called in response to a connection issued to the server, we
-        wrap the socket with SSL.
+    def _get_ssl_request (self, sock, addr):
+        """Wrap the socket with SSL"""
+        ssl_sock = ssl.wrap_socket(sock, server_side=True,
+                                   keyfile=self.key_file,
+                                   certfile=self.cert_file,
+                                   do_handshake_on_connect=False)
+        return ssl_sock, addr
+
+    def verify_request(self, request, client_address):
+        """Verify the request.
+
+        Return True if we should proceed with this request, False if we should
+        not even touch a single byte in the socket !
         """
-        sock, addr = self.socket.accept()
-        sslconn = ssl.wrap_socket(sock, server_side=True,
-                                  keyfile=self.key_file,
-                                  certfile=self.cert_file)
-        return sslconn, addr
-
+        serving = self.serving is not None and self.serving.isSet()
+        if serving:
+            request.do_handshake()
+        return serving
 
 class TestingHTTPSServer(TestingHTTPSServerMixin,
                          http_server.TestingHTTPServer):
@@ -52,6 +58,10 @@
         http_server.TestingHTTPServer.__init__(
             self, server_address, request_handler_class, test_case_server)
 
+    def _get_request (self):
+        sock, addr = http_server.TestingHTTPServer._get_request(self)
+        return self._get_ssl_request(sock, addr)
+
 
 class TestingThreadingHTTPSServer(TestingHTTPSServerMixin,
                                   http_server.TestingThreadingHTTPServer):
@@ -62,6 +72,10 @@
         http_server.TestingThreadingHTTPServer.__init__(
             self, server_address, request_handler_class, test_case_server)
 
+    def _get_request (self):
+        sock, addr = http_server.TestingThreadingHTTPServer._get_request(self)
+        return self._get_ssl_request(sock, addr)
+
 
 class HTTPSServer(http_server.HttpServer):
 
@@ -73,7 +87,7 @@
                          }
 
     # Provides usable defaults since an https server requires both a
-    # private key and certificate to work.
+    # private key and a certificate to work.
     def __init__(self, request_handler=http_server.TestingHTTPRequestHandler,
                  protocol_version=None,
                  key_file=ssl_certs.build_path('server_without_pass.key'),

=== modified file 'bzrlib/tests/stub_sftp.py'
--- a/bzrlib/tests/stub_sftp.py	2010-05-14 09:34:16 +0000
+++ b/bzrlib/tests/stub_sftp.py	2010-05-25 12:47:13 +0000
@@ -527,7 +527,7 @@
                 raise
         except Exception, e:
             # This typically seems to happen during interpreter shutdown, so
-            # most of the useful ways to report this error are won't work.
+            # most of the useful ways to report this error won't work.
             # Writing the exception type, and then the text of the exception,
             # seems to be the best we can do.
             import sys

=== modified file 'bzrlib/tests/test_http.py'
--- a/bzrlib/tests/test_http.py	2010-03-18 23:11:15 +0000
+++ b/bzrlib/tests/test_http.py	2010-05-24 01:05:14 +0000
@@ -226,35 +226,62 @@
         self._thread = threading.Thread(target=self._accept_read_and_reply)
         self._thread.setDaemon(True)
         self._thread.start()
-        self._ready.wait(5)
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Thread started: %s' % (self._thread.ident,)
+        self._ready.wait()
 
     def _accept_read_and_reply(self):
         self._sock.listen(1)
+        self._sock.settimeout(5)
         self._ready.set()
-        self._sock.settimeout(5)
         try:
             conn, address = self._sock.accept()
             # On win32, the accepted connection will be non-blocking to start
             # with because we're using settimeout.
             conn.setblocking(True)
-            while not self.received_bytes.endswith(self._expect_body_tail):
-                self.received_bytes += conn.recv(4096)
-            conn.sendall('HTTP/1.1 200 OK\r\n')
+            if self._expect_body_tail is not None:
+                while not self.received_bytes.endswith(self._expect_body_tail):
+                    self.received_bytes += conn.recv(4096)
+                conn.sendall('HTTP/1.1 200 OK\r\n')
         except socket.timeout:
             # Make sure the client isn't stuck waiting for us to e.g. accept.
+            pass
+        try:
             self._sock.close()
         except socket.error:
             # The client may have already closed the socket.
             pass
 
+    def connect_socket(self):
+        err = socket.error('getaddrinfo returns an empty list')
+        for res in socket.getaddrinfo(self.host, self.port):
+            af, socktype, proto, canonname, sa = res
+            sock = None
+            try:
+                sock = socket.socket(af, socktype, proto)
+                sock.connect(sa)
+                return sock
+
+            except socket.error, err:
+                # err is now the most recent error
+                if sock is not None:
+                    sock.close()
+        raise err
+
     def stop_server(self):
         try:
-            self._sock.close()
+            # Issue a fake connection to wake up the server and allow it to
+            # finish quickly
+            fake_conn = self.connect_socket()
+            fake_conn.close()
         except socket.error:
             # We might have already closed it.  We don't care.
             pass
         self.host = None
         self.port = None
+        self._thread.join()
+        if 'threads' in tests.selftest_debug_flags:
+            print 'Thread  joined: %s' % (self._thread.ident,)
 
 
 class TestAuthHeader(tests.TestCase):
@@ -323,10 +350,11 @@
         server = http_server.HttpServer()
         server.start_server()
         try:
-            self.assertTrue(server._http_running)
+            self.assertTrue(server._httpd is not None)
+            self.assertTrue(server._httpd.serving is not None)
+            self.assertTrue(server._httpd.serving.isSet())
         finally:
             server.stop_server()
-        self.assertFalse(server._http_running)
 
     def test_create_http_server_one_zero(self):
         class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
@@ -600,7 +628,7 @@
 class WallRequestHandler(http_server.TestingHTTPRequestHandler):
     """Whatever request comes in, close the connection"""
 
-    def handle_one_request(self):
+    def _handle_one_request(self):
         """Handle a single HTTP request, by abruptly closing the connection"""
         self.close_connection = 1
 
@@ -1896,7 +1924,7 @@
     line.
     """
 
-    def handle_one_request(self):
+    def _handle_one_request(self):
         tcs = self.server.test_case_server
         requestline = self.rfile.readline()
         headers = self.MessageClass(self.rfile, 0)
@@ -2086,7 +2114,7 @@
 '''
         t = self.get_transport()
         # We must send a single line of body bytes, see
-        # PredefinedRequestHandler.handle_one_request
+        # PredefinedRequestHandler._handle_one_request
         code, f = t._post('abc def end-of-body\n')
         self.assertEqual('lalala whatever as long as itsssss\n', f.read())
         self.assertActivitiesMatch()

=== modified file 'bzrlib/tests/test_server.py'
--- a/bzrlib/tests/test_server.py	2010-02-11 09:21:45 +0000
+++ b/bzrlib/tests/test_server.py	2010-05-24 01:05:14 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2005, 2006, 2007, 2008, 2010 Canonical Ltd
+# Copyright (C) 2010 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -14,6 +14,10 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
+import socket
+import select
+
+
 from bzrlib import (
     transport,
     urlutils,
@@ -233,6 +237,9 @@
         super(SmartTCPServer_for_testing, self).__init__(None)
         self.client_path_extra = None
         self.thread_name_suffix = thread_name_suffix
+        # We collect the sockets/threads used by the clients so we can
+        # close/join them when shutting down
+        self.clients = []
 
     def get_backing_transport(self, backing_transport_server):
         """Get a backing transport from a server we are decorating."""
@@ -265,8 +272,41 @@
         self.root_client_path = self.client_path_extra = client_path_extra
         self.start_background_thread(self.thread_name_suffix)
 
+    def serve_conn(self, conn, thread_name_suffix):
+        conn_thread = super(SmartTCPServer_for_testing, self).serve_conn(
+            conn, thread_name_suffix)
+        self.clients.append((conn, conn_thread))
+        return conn_thread
+
+    def shutdown_client(self, client_socket):
+        """Properly shutdown a client socket.
+
+        Under some circumstances (as in bug #383920), we need to force the
+        shutdown as python delays it until gc occur otherwise and the client
+        may hang.
+
+        This should be called only when no other thread is trying to use the
+        socket.
+        """
+        try:
+            # The request process has been completed, the thread is about to
+            # die, let's shutdown the socket if we can.
+            client_socket.shutdown(socket.SHUT_RDWR)
+        except (socket.error, select.error), e:
+            if e[0] in (errno.EBADF, errno.ENOTCONN):
+                # Right, the socket is already down
+                pass
+            else:
+                raise
+
     def stop_server(self):
         self.stop_background_thread()
+        # Let's close all our pending clients too
+        for sock, thread in self.clients:
+            self.shutdown_client(sock)
+            thread.join()
+            del thread
+        self.clients = []
         self.chroot_server.stop_server()
 
     def get_url(self):



More information about the bazaar-commits mailing list