Rev 5271: Replace SocketListener by TestingTCPServerInAThread and fallouts, in file:///home/vila/src/bzr/experimental/leaking-tests/

Vincent Ladeuil v.ladeuil+lp at free.fr
Wed Jun 23 09:17:20 BST 2010


At file:///home/vila/src/bzr/experimental/leaking-tests/

------------------------------------------------------------
revno: 5271
revision-id: v.ladeuil+lp at free.fr-20100623081720-sfej05j25aopdpvy
parent: v.ladeuil+lp at free.fr-20100620111905-nrrk5qtd1i8lksza
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: sftp-leaks
timestamp: Wed 2010-06-23 10:17:20 +0200
message:
  Replace SocketListener by TestingTCPServerInAThread and fallouts,
  only 1 test still failing, relevant tests run now in 15s instead
  of 45s.
  
  * bzrlib/tests/stub_sftp.py:
  (StubServer): It's the test case server we're accessing here not
  the test case itself.
  (TestingSFTPConnectionHandler)
  (TestingSFTPWithoutSSHConnectionHandler, TestingSFTPServer): New
  helpers classes to split the previous implementation.
  (SFTPServer): Reimplemnted based on TestingTCPServerInAThread.
-------------- next part --------------
=== modified file 'bzrlib/tests/stub_sftp.py'
--- a/bzrlib/tests/stub_sftp.py	2010-06-17 09:23:27 +0000
+++ b/bzrlib/tests/stub_sftp.py	2010-06-23 08:17:20 +0000
@@ -23,6 +23,7 @@
 import paramiko
 import select
 import socket
+import SocketServer
 import sys
 import threading
 import time
@@ -38,24 +39,24 @@
 from bzrlib.tests import test_server
 
 
-class StubServer (paramiko.ServerInterface):
+class StubServer(paramiko.ServerInterface):
 
-    def __init__(self, test_case):
+    def __init__(self, test_case_server):
         paramiko.ServerInterface.__init__(self)
-        self._test_case = test_case
+        self.log = test_case_server.log
 
     def check_auth_password(self, username, password):
         # all are allowed
-        self._test_case.log('sftpserver - authorizing: %s' % (username,))
+        self.log('sftpserver - authorizing: %s' % (username,))
         return paramiko.AUTH_SUCCESSFUL
 
     def check_channel_request(self, kind, chanid):
-        self._test_case.log(
-            'sftpserver - channel request: %s, %s' % (kind, chanid))
+        self.log('sftpserver - channel request: %s, %s' % (kind, chanid))
         return paramiko.OPEN_SUCCEEDED
 
 
-class StubSFTPHandle (paramiko.SFTPHandle):
+class StubSFTPHandle(paramiko.SFTPHandle):
+
     def stat(self):
         try:
             return paramiko.SFTPAttributes.from_stat(
@@ -73,7 +74,7 @@
             return paramiko.SFTPServer.convert_errno(e.errno)
 
 
-class StubSFTPServer (paramiko.SFTPServerInterface):
+class StubSFTPServer(paramiko.SFTPServerInterface):
 
     def __init__(self, server, root, home=None):
         paramiko.SFTPServerInterface.__init__(self, server)
@@ -90,7 +91,7 @@
             self.home = home[len(self.root):]
         if self.home.startswith('/'):
             self.home = self.home[1:]
-        server._test_case.log('sftpserver - new connection')
+        server.log('sftpserver - new connection')
 
     def _realpath(self, path):
         # paths returned from self.canonicalize() always start with
@@ -241,6 +242,7 @@
     # removed: chattr, symlink, readlink
     # (nothing in bzr's sftp transport uses those)
 
+
 # ------------- server test implementation --------------
 
 STUB_SERVER_KEY = """
@@ -262,56 +264,6 @@
 """
 
 
-class SocketListener(threading.Thread):
-
-    def __init__(self, connection_callback):
-        threading.Thread.__init__(self)
-        self._connection_callback = connection_callback
-        self._socket = socket.socket()
-        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        self._socket.bind(('localhost', 0))
-        self._socket.listen(1)
-        self.host, self.port = self._socket.getsockname()[:2]
-        self._stop_event = threading.Event()
-
-    def stop(self):
-        # called from outside this thread
-        self._stop_event.set()
-        # use a timeout here, because if the test fails, the server thread may
-        # never notice the stop_event.
-        self.join(5.0)
-        self._socket.close()
-
-    def run(self):
-        trace.mutter('SocketListener %r has started', self)
-        while True:
-            readable, writable_unused, exception_unused = \
-                select.select([self._socket], [], [], 0.1)
-            if self._stop_event.isSet():
-                trace.mutter('SocketListener %r has stopped', self)
-                return
-            if len(readable) == 0:
-                continue
-            try:
-                s, addr_unused = self._socket.accept()
-                trace.mutter('SocketListener %r has accepted connection %r',
-                    self, s)
-                # because the loopback socket is inline, and transports are
-                # never explicitly closed, best to launch a new thread.
-                threading.Thread(target=self._connection_callback,
-                                 args=(s,)).start()
-            except socket.error, x:
-                sys.excepthook(*sys.exc_info())
-                trace.warning('Socket error during accept() '
-                              'within unit test server thread: %r' % x)
-            except Exception, x:
-                # probably a failed test; unit test thread will log the
-                # failure/error
-                sys.excepthook(*sys.exc_info())
-                trace.warning(
-                    'Exception from within unit test server thread: %r' % x)
-
-
 class SocketDelay(object):
     """A socket decorator to make TCP appear slower.
 
@@ -387,58 +339,119 @@
         return bytes_sent
 
 
-class SFTPServer(test_server.TestServer):
+class TestingSFTPConnectionHandler(SocketServer.BaseRequestHandler):
+
+    def setup(self):
+        self.wrap_for_latency()
+        tcs = self.server.test_case_server
+        ssh_server = paramiko.Transport(self.request)
+        # FIXME: The server key file should be created only once, not for each
+        # connection -- vila 20100623
+        key_file = osutils.pathjoin(tcs._homedir, 'test_rsa.key')
+        f = open(key_file, 'w')
+        f.write(STUB_SERVER_KEY)
+        f.close()
+        host_key = paramiko.RSAKey.from_private_key_file(key_file)
+        ssh_server.add_server_key(host_key)
+        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
+                                         StubSFTPServer, root=tcs._root,
+                                         home=tcs._server_homedir)
+        server = tcs._server_interface(tcs)
+        ssh_server.start_server(None, server)
+
+    def wrap_for_latency(self):
+        tcs = self.server.test_case_server
+        if tcs.add_latency:
+            # Give the socket (which the request really is) a latency adding
+            # decorator.
+            self.request = SocketDelay(self.request, tcs.add_latency)
+
+
+class TestingSFTPWithoutSSHConnectionHandler(TestingSFTPConnectionHandler):
+
+    def setup(self):
+        self.wrap_for_latency()
+        # Re-import these as locals, so that they're still accessible during
+        # interpreter shutdown (when all module globals get set to None, leading
+        # to confusing errors like "'NoneType' object has no attribute 'error'".
+        class FakeChannel(object):
+            def get_transport(self):
+                return self
+            def get_log_channel(self):
+                return 'paramiko'
+            def get_name(self):
+                return '1'
+            def get_hexdump(self):
+                return False
+            def close(self):
+                pass
+
+        tcs = self.server.test_case_server
+        server = paramiko.SFTPServer(
+            FakeChannel(), 'sftp', StubServer(tcs), StubSFTPServer,
+            root=tcs._root, home=tcs._server_homedir)
+        try:
+            server.start_subsystem(
+                'sftp', None, ssh.SocketAsChannelAdapter(self.request))
+        except socket.error, e:
+            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
+                # it's okay for the client to disconnect abruptly
+                # (bug in paramiko 1.6: it should absorb this exception)
+                pass
+            else:
+                raise
+        except Exception, e:
+            # This typically seems to happen during interpreter shutdown, so
+            # most of the useful ways to report this error won't work.
+            # Writing the exception type, and then the text of the exception,
+            # seems to be the best we can do.
+            # FIXME: All interpreter shutdown errors should have been related
+            # to daemon threads, cleanup needed -- vila 20100623
+            import sys
+            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
+            sys.stderr.write('%s\n\n' % (e,))
+        server.finish_subsystem()
+
+
+class TestingSFTPServer(test_server.TestingThreadingTCPServer):
+
+    def __init__(self, server_address, request_handler_class, test_case_server):
+        test_server.TestingThreadingTCPServer.__init__(
+            self, server_address, request_handler_class)
+        self.test_case_server = test_case_server
+
+
+class SFTPServer(test_server.TestingTCPServerInAThread):
     """Common code for SFTP server facilities."""
 
     def __init__(self, server_interface=StubServer):
+        self.host = '127.0.0.1'
+        self.port = 0
+        super(SFTPServer, self).__init__((self.host, self.port),
+                                         TestingSFTPServer,
+                                         TestingSFTPConnectionHandler)
         self._original_vendor = None
-        self._homedir = None
-        self._server_homedir = None
-        self._listener = None
-        self._root = None
         self._vendor = ssh.ParamikoVendor()
         self._server_interface = server_interface
-        # sftp server logs
         self.logs = []
         self.add_latency = 0
+        self._homedir = None
+        self._server_homedir = None
+        self._root = None
 
     def _get_sftp_url(self, path):
         """Calculate an sftp url to this server for path."""
-        return 'sftp://foo:bar@%s:%d/%s' % (self._listener.host,
-                                            self._listener.port, path)
+        return "sftp://foo:bar@%s:%s/%s" % (self.host, self.port, path)
 
     def log(self, message):
         """StubServer uses this to log when a new server is created."""
         self.logs.append(message)
 
-    def _run_server_entry(self, sock):
-        """Entry point for all implementations of _run_server.
-
-        If self.add_latency is > 0.000001 then sock is given a latency adding
-        decorator.
-        """
-        if self.add_latency > 0.000001:
-            # FIXME: We appear to use SocketDelay for the server socket only, I
-            # don't think that gets magically propagated to client
-            # sockets... -- vila 20100526
-            sock = SocketDelay(sock, self.add_latency)
-        return self._run_server(sock)
-
-    def _run_server(self, s):
-        ssh_server = paramiko.Transport(s)
-        key_file = osutils.pathjoin(self._homedir, 'test_rsa.key')
-        f = open(key_file, 'w')
-        f.write(STUB_SERVER_KEY)
-        f.close()
-        host_key = paramiko.RSAKey.from_private_key_file(key_file)
-        ssh_server.add_server_key(host_key)
-        server = self._server_interface(self)
-        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
-                                         StubSFTPServer, root=self._root,
-                                         home=self._server_homedir)
-        ready = threading.Event()
-        ssh_server.start_server(ready, server)
-        ready.wait(5.0)
+    def create_server(self):
+        server = self.server_class((self.host, self.port),
+                                   self.request_handler_class,
+                                   self)
+        return server
 
     def start_server(self, backing_server=None):
         # XXX: TODO: make sftpserver back onto backing_server rather than local
@@ -463,17 +476,17 @@
         self._root = '/'
         if sys.platform == 'win32':
             self._root = ''
-        self._listener = SocketListener(self._run_server_entry)
-        self._listener.setDaemon(True)
-        self._listener.start()
+        super(SFTPServer, self).start_server()
 
     def stop_server(self):
-        self._listener.stop()
-        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
+        try:
+            super(SFTPServer, self).stop_server()
+        finally:
+            ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
 
     def get_bogus_url(self):
         """See bzrlib.transport.Server.get_bogus_url."""
-        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
+        # this is chosen to try to prevent trouble with proxies, weird dns, etc
         # we bind a random socket, so that we get a guaranteed unused port
         # we just never listen on that port
         s = socket.socket()
@@ -499,45 +512,7 @@
     def __init__(self):
         super(SFTPServerWithoutSSH, self).__init__()
         self._vendor = ssh.LoopbackVendor()
-
-    def _run_server(self, sock):
-        # Re-import these as locals, so that they're still accessible during
-        # interpreter shutdown (when all module globals get set to None, leading
-        # to confusing errors like "'NoneType' object has no attribute 'error'".
-        class FakeChannel(object):
-            def get_transport(self):
-                return self
-            def get_log_channel(self):
-                return 'paramiko'
-            def get_name(self):
-                return '1'
-            def get_hexdump(self):
-                return False
-            def close(self):
-                pass
-
-        server = paramiko.SFTPServer(
-            FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
-            root=self._root, home=self._server_homedir)
-        try:
-            server.start_subsystem(
-                'sftp', None, ssh.SocketAsChannelAdapter(sock))
-        except socket.error, e:
-            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
-                # it's okay for the client to disconnect abruptly
-                # (bug in paramiko 1.6: it should absorb this exception)
-                pass
-            else:
-                raise
-        except Exception, e:
-            # This typically seems to happen during interpreter shutdown, so
-            # most of the useful ways to report this error won't work.
-            # Writing the exception type, and then the text of the exception,
-            # seems to be the best we can do.
-            import sys
-            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
-            sys.stderr.write('%s\n\n' % (e,))
-        server.finish_subsystem()
+        self.request_handler_class = TestingSFTPWithoutSSHConnectionHandler
 
 
 class SFTPAbsoluteServer(SFTPServerWithoutSSH):
@@ -566,7 +541,9 @@
     It does this by serving from a deeply-nested directory that doesn't exist.
     """
 
-    def start_server(self, backing_server=None):
-        self._server_homedir = '/dev/noone/runs/tests/here'
-        super(SFTPSiblingAbsoluteServer, self).start_server(backing_server)
+    def create_server(self):
+        # FIXME: Can't we do that in a cleaner way ? -- vila 20100623
+        server = super(SFTPSiblingAbsoluteServer, self).create_server()
+        server._server_homedir = '/dev/noone/runs/tests/here'
+        return server
 

=== modified file 'bzrlib/tests/test_sftp_transport.py'
--- a/bzrlib/tests/test_sftp_transport.py	2010-06-20 11:18:38 +0000
+++ b/bzrlib/tests/test_sftp_transport.py	2010-06-23 08:17:20 +0000
@@ -504,7 +504,7 @@
     """Test that AuthenticationConfig can supply default usernames."""
 
     def get_transport_for_connection(self, set_config):
-        port = self.get_server()._listener.port
+        port = self.get_server().port
         if set_config:
             conf = config.AuthenticationConfig()
             conf._get_config().update(



More information about the bazaar-commits mailing list