Rev 5401: probably broken 9-ways, however lots of debugging code. in http://bazaar.launchpad.net/~jameinel/bzr/2.3-hackish-bzr-connect

John Arbash Meinel john at arbash-meinel.com
Wed Sep 1 19:07:35 BST 2010


At http://bazaar.launchpad.net/~jameinel/bzr/2.3-hackish-bzr-connect

------------------------------------------------------------
revno: 5401
revision-id: john at arbash-meinel.com-20100901180726-fbv8sz2c1osty518
parent: v.ladeuil+lp at free.fr-20100831141226-1vwk0zw410ti6dkn
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.3-hackish-bzr-connect
timestamp: Wed 2010-09-01 13:07:26 -0500
message:
  probably broken 9-ways, however lots of debugging code.
-------------- next part --------------
=== modified file 'bzrlib/tests/stub_sftp.py'
--- a/bzrlib/tests/stub_sftp.py	2010-08-31 14:12:26 +0000
+++ b/bzrlib/tests/stub_sftp.py	2010-09-01 18:07:26 +0000
@@ -19,6 +19,7 @@
 Adapted from the one in paramiko's unit tests.
 """
 
+import logging
 import os
 import paramiko
 import select
@@ -339,27 +340,47 @@
         return bytes_sent
 
 
+class TransportWithStart(paramiko.Transport):
+    # Could we do something with multiple inheritance w/ ThreadWithException?
+
+    def __init__(self, sock, started_event):
+        super(TransportWithStart, self).__init__(sock)
+        self.started = started_event
+
+    def run(self):
+        self.started.set()
+        trace.mutter('%s started' % (self.name,))
+        return super(TransportWithStart, self).run()
+
+
 class TestingSFTPConnectionHandler(SocketServer.BaseRequestHandler):
 
     def setup(self):
         self.wrap_for_latency()
         tcs = self.server.test_case_server
-        ssh_server = paramiko.Transport(self.request)
+        started = threading.Event()
+        ssh_server = TransportWithStart(self.request, started)
         ssh_server.add_server_key(tcs.get_host_key())
         ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                          StubSFTPServer, root=tcs._root,
                                          home=tcs._server_homedir)
         server = tcs._server_interface(tcs)
-        ssh_server.start_server(None, server)
-        # FIXME: Long story short:
-        # bt.test_transport.TestSSHConnections.test_bzr_connect_to_bzr_ssh
-        # fails if we wait less than 0.2 seconds... paramiko uses a lot of
-        # timeouts internally which probably mask a synchronisation
-        # problem. Note that this is the only test that requires this hack and
-        # the test may need to be fixed instead, but it's late and the test is
-        # horrible as mentioned in its comments :) -- vila 20100623
-        import time
-        time.sleep(0.5)
+        trace.mutter('%s start_server: %s' % (ssh_server.name,
+            hex(long(id(ssh_server)))))
+        # This blocks until the connection is closed
+        completion_event = threading.Event()
+        ssh_server.start_server(completion_event, server)
+        # time.sleep(5.0)
+        completion_event.wait(5.0)
+        trace.mutter('%s: Initial completion finished'
+                     % (ssh_server.name,))
+        completion_event.clear()
+        completion_event.wait(5.0)
+        trace.mutter('%s stopped: active: %s completed: %s'
+                     % (ssh_server.name, ssh_server.active,
+                        ssh_server.completion_event.isSet()))
+        # The first run through,
+
 
     def wrap_for_latency(self):
         tcs = self.server.test_case_server
@@ -507,6 +528,32 @@
         s.bind(('localhost', 0))
         return 'sftp://%s:%s/' % s.getsockname()
 
+    def process_request(self, request, client_address):
+        """Start a new thread to process the request."""
+        started = threading.Event()
+        stopped = threading.Event()
+        t = ThreadWithException(
+            event=stopped,
+            name='%s -> %s' % (client_address, self.server_address),
+            target = self.process_request_thread,
+            args = (started, stopped, request, client_address))
+        trace.mutter("%s: processing_request_in_thread: %s"
+                     % (self.__class__.__name__, t.name))
+        # Update the client description
+        self.clients.pop()
+        self.clients.append((request, client_address, t))
+        # Propagate the exception handler since we must use the same one for
+        # connections running in their own threads than TestingTCPServer.
+        t.set_ignored_exceptions(self.ignored_exceptions)
+        t.start()
+        started.wait()
+        trace.mutter("%s: %s, started"
+                     % (self.__class__.__name__, t.name))
+        if debug_threads():
+            sys.stderr.write('Client thread %s started\n' % (t.name,))
+        # If an exception occured during the thread start, it will get raised.
+        t.pending_exception()
+
 
 class SFTPFullAbsoluteServer(SFTPServer):
     """A test server for sftp transports, using absolute urls and ssh."""

=== modified file 'bzrlib/tests/test_server.py'
--- a/bzrlib/tests/test_server.py	2010-08-24 16:07:33 +0000
+++ b/bzrlib/tests/test_server.py	2010-09-01 18:07:26 +0000
@@ -24,6 +24,7 @@
 
 from bzrlib import (
     osutils,
+    trace,
     transport,
     urlutils,
     )
@@ -358,9 +359,11 @@
     separate thread.
     """
 
+    request_queue_size = 1
+
     def __init__(self):
         self.started = threading.Event()
-        self.serving = None
+        self.should_shutdown = threading.Event()
         self.stopped = threading.Event()
         # We collect the resources used by the clients so we can release them
         # when shutting down
@@ -372,16 +375,21 @@
         self.server_address = self.socket.getsockname()
 
     def serve(self):
-        self.serving = True
+        self.should_shutdown.clear()
         self.stopped.clear()
         # We are listening and ready to accept connections
         self.started.set()
         try:
-            while self.serving:
+            while not self.should_shutdown.isSet():
+                trace.mutter("%s: serving" % (self.__class__.__name__))
                 # Really a connection but the python framework is generic and
                 # call them requests
-                self.handle_request()
+                readable, _, _ = select.select([self.socket], [], [], 0.1)
+                if readable:
+                    trace.mutter("%s: readable" % (self.__class__.__name__))
+                    self.handle_request()
             # Let's close the listening socket
+            trace.mutter("%s: closing" % (self.__class__.__name__))
             self.server_close()
         finally:
             self.stopped.set()
@@ -393,6 +401,8 @@
         timeout, so we override it to better control the server behavior.
         """
         request, client_address = self.get_request()
+        trace.mutter("%s: got_request: %s" % (self.__class__.__name__,
+                                              client_address))
         if self.verify_request(request, client_address):
             try:
                 self.process_request(request, client_address)
@@ -410,11 +420,11 @@
         not even touch a single byte in the socket ! This is useful when we
         stop the server with a dummy last connection.
         """
-        return self.serving
+        return not self.should_shutdown.isSet()
 
     def handle_error(self, request, client_address):
         # Stop serving and re-raise the last exception seen
-        self.serving = False
+        self.should_shutdown.set()
         # The following can be used for debugging purposes, it will display the
         # exception and the traceback just when it occurs instead of waiting
         # for the thread to be joined.
@@ -515,10 +525,12 @@
         started = threading.Event()
         stopped = threading.Event()
         t = ThreadWithException(
-            event=stopped,
+            event=started,
             name='%s -> %s' % (client_address, self.server_address),
             target = self.process_request_thread,
             args = (started, stopped, request, client_address))
+        trace.mutter("%s: processing_request_in_thread: %s"
+                     % (self.__class__.__name__, t.name))
         # Update the client description
         self.clients.pop()
         self.clients.append((request, client_address, t))
@@ -527,10 +539,13 @@
         t.set_ignored_exceptions(self.ignored_exceptions)
         t.start()
         started.wait()
+        trace.mutter("%s: %s, started"
+                     % (self.__class__.__name__, t.name))
         if debug_threads():
             sys.stderr.write('Client thread %s started\n' % (t.name,))
         # If an exception occured during the thread start, it will get raised.
         t.pending_exception()
+        t.set_ready_event(stopped)
 
     # The following methods are called by the main thread
 
@@ -612,7 +627,7 @@
             # one to get out of the blocking listen.
             self.set_ignored_exceptions(
                 self.server.ignored_exceptions_during_shutdown)
-            self.server.serving = False
+            self.server.should_shutdown.set()
             if debug_threads():
                 sys.stderr.write('Server thread %s will be joined\n'
                                  % (self._server_thread.name,))

=== modified file 'bzrlib/tests/test_transport.py'
--- a/bzrlib/tests/test_transport.py	2010-08-31 14:12:26 +0000
+++ b/bzrlib/tests/test_transport.py	2010-09-01 18:07:26 +0000
@@ -16,6 +16,7 @@
 
 
 from cStringIO import StringIO
+import logging
 import os
 import subprocess
 import sys
@@ -910,6 +911,9 @@
         # override the interface (doesn't change self._vendor).
         # Note that this does encryption, so can be slow.
         from bzrlib.tests import stub_sftp
+        logger = logging.getLogger('paramiko')
+        logger.setLevel(logging.DEBUG)
+        logger.handlers[:] = logging.getLogger('bzr').handlers[:]
 
         # Start an SSH server
         self.command_executed = []
@@ -993,3 +997,4 @@
         # And the rest are threads
         for t in started[1:]:
             t.join()
+        self.fail('failing cause I want the logs.')



More information about the bazaar-commits mailing list