Rev 5401: probably broken 9-ways, however lots of debugging code. in http://bazaar.launchpad.net/~jameinel/bzr/2.3-hackish-bzr-connect
John Arbash Meinel
john at arbash-meinel.com
Wed Sep 1 19:07:35 BST 2010
At http://bazaar.launchpad.net/~jameinel/bzr/2.3-hackish-bzr-connect
------------------------------------------------------------
revno: 5401
revision-id: john at arbash-meinel.com-20100901180726-fbv8sz2c1osty518
parent: v.ladeuil+lp at free.fr-20100831141226-1vwk0zw410ti6dkn
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.3-hackish-bzr-connect
timestamp: Wed 2010-09-01 13:07:26 -0500
message:
probably broken 9-ways, however lots of debugging code.
-------------- next part --------------
=== modified file 'bzrlib/tests/stub_sftp.py'
--- a/bzrlib/tests/stub_sftp.py 2010-08-31 14:12:26 +0000
+++ b/bzrlib/tests/stub_sftp.py 2010-09-01 18:07:26 +0000
@@ -19,6 +19,7 @@
Adapted from the one in paramiko's unit tests.
"""
+import logging
import os
import paramiko
import select
@@ -339,27 +340,47 @@
return bytes_sent
+class TransportWithStart(paramiko.Transport):
+ # Could we do something with multiple inheritance w/ ThreadWithException?
+
+ def __init__(self, sock, started_event):
+ super(TransportWithStart, self).__init__(sock)
+ self.started = started_event
+
+ def run(self):
+ self.started.set()
+ trace.mutter('%s started' % (self.name,))
+ return super(TransportWithStart, self).run()
+
+
class TestingSFTPConnectionHandler(SocketServer.BaseRequestHandler):
def setup(self):
self.wrap_for_latency()
tcs = self.server.test_case_server
- ssh_server = paramiko.Transport(self.request)
+ started = threading.Event()
+ ssh_server = TransportWithStart(self.request, started)
ssh_server.add_server_key(tcs.get_host_key())
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
StubSFTPServer, root=tcs._root,
home=tcs._server_homedir)
server = tcs._server_interface(tcs)
- ssh_server.start_server(None, server)
- # FIXME: Long story short:
- # bt.test_transport.TestSSHConnections.test_bzr_connect_to_bzr_ssh
- # fails if we wait less than 0.2 seconds... paramiko uses a lot of
- # timeouts internally which probably mask a synchronisation
- # problem. Note that this is the only test that requires this hack and
- # the test may need to be fixed instead, but it's late and the test is
- # horrible as mentioned in its comments :) -- vila 20100623
- import time
- time.sleep(0.5)
+ trace.mutter('%s start_server: %s' % (ssh_server.name,
+ hex(long(id(ssh_server)))))
+ # This blocks until the connection is closed
+ completion_event = threading.Event()
+ ssh_server.start_server(completion_event, server)
+ # time.sleep(5.0)
+ completion_event.wait(5.0)
+ trace.mutter('%s: Initial completion finished'
+ % (ssh_server.name,))
+ completion_event.clear()
+ completion_event.wait(5.0)
+ trace.mutter('%s stopped: active: %s completed: %s'
+ % (ssh_server.name, ssh_server.active,
+ ssh_server.completion_event.isSet()))
+ # The first run through,
+
def wrap_for_latency(self):
tcs = self.server.test_case_server
@@ -507,6 +528,32 @@
s.bind(('localhost', 0))
return 'sftp://%s:%s/' % s.getsockname()
+ def process_request(self, request, client_address):
+ """Start a new thread to process the request."""
+ started = threading.Event()
+ stopped = threading.Event()
+ t = ThreadWithException(
+ event=stopped,
+ name='%s -> %s' % (client_address, self.server_address),
+ target = self.process_request_thread,
+ args = (started, stopped, request, client_address))
+ trace.mutter("%s: processing_request_in_thread: %s"
+ % (self.__class__.__name__, t.name))
+ # Update the client description
+ self.clients.pop()
+ self.clients.append((request, client_address, t))
+ # Propagate the exception handler since we must use the same one for
+ # connections running in their own threads as TestingTCPServer.
+ t.set_ignored_exceptions(self.ignored_exceptions)
+ t.start()
+ started.wait()
+ trace.mutter("%s: %s, started"
+ % (self.__class__.__name__, t.name))
+ if debug_threads():
+ sys.stderr.write('Client thread %s started\n' % (t.name,))
+ # If an exception occurred during the thread start, it will get raised.
+ t.pending_exception()
+
class SFTPFullAbsoluteServer(SFTPServer):
"""A test server for sftp transports, using absolute urls and ssh."""
=== modified file 'bzrlib/tests/test_server.py'
--- a/bzrlib/tests/test_server.py 2010-08-24 16:07:33 +0000
+++ b/bzrlib/tests/test_server.py 2010-09-01 18:07:26 +0000
@@ -24,6 +24,7 @@
from bzrlib import (
osutils,
+ trace,
transport,
urlutils,
)
@@ -358,9 +359,11 @@
separate thread.
"""
+ request_queue_size = 1
+
def __init__(self):
self.started = threading.Event()
- self.serving = None
+ self.should_shutdown = threading.Event()
self.stopped = threading.Event()
# We collect the resources used by the clients so we can release them
# when shutting down
@@ -372,16 +375,21 @@
self.server_address = self.socket.getsockname()
def serve(self):
- self.serving = True
+ self.should_shutdown.clear()
self.stopped.clear()
# We are listening and ready to accept connections
self.started.set()
try:
- while self.serving:
+ while not self.should_shutdown.isSet():
+ trace.mutter("%s: serving" % (self.__class__.__name__))
# Really a connection but the python framework is generic and
# call them requests
- self.handle_request()
+ readable, _, _ = select.select([self.socket], [], [], 0.1)
+ if readable:
+ trace.mutter("%s: readable" % (self.__class__.__name__))
+ self.handle_request()
# Let's close the listening socket
+ trace.mutter("%s: closing" % (self.__class__.__name__))
self.server_close()
finally:
self.stopped.set()
@@ -393,6 +401,8 @@
timeout, so we override it to better control the server behavior.
"""
request, client_address = self.get_request()
+ trace.mutter("%s: got_request: %s" % (self.__class__.__name__,
+ client_address))
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
@@ -410,11 +420,11 @@
not even touch a single byte in the socket ! This is useful when we
stop the server with a dummy last connection.
"""
- return self.serving
+ return not self.should_shutdown.isSet()
def handle_error(self, request, client_address):
# Stop serving and re-raise the last exception seen
- self.serving = False
+ self.should_shutdown.set()
# The following can be used for debugging purposes, it will display the
# exception and the traceback just when it occurs instead of waiting
# for the thread to be joined.
@@ -515,10 +525,12 @@
started = threading.Event()
stopped = threading.Event()
t = ThreadWithException(
- event=stopped,
+ event=started,
name='%s -> %s' % (client_address, self.server_address),
target = self.process_request_thread,
args = (started, stopped, request, client_address))
+ trace.mutter("%s: processing_request_in_thread: %s"
+ % (self.__class__.__name__, t.name))
# Update the client description
self.clients.pop()
self.clients.append((request, client_address, t))
@@ -527,10 +539,13 @@
t.set_ignored_exceptions(self.ignored_exceptions)
t.start()
started.wait()
+ trace.mutter("%s: %s, started"
+ % (self.__class__.__name__, t.name))
if debug_threads():
sys.stderr.write('Client thread %s started\n' % (t.name,))
# If an exception occured during the thread start, it will get raised.
t.pending_exception()
+ t.set_ready_event(stopped)
# The following methods are called by the main thread
@@ -612,7 +627,7 @@
# one to get out of the blocking listen.
self.set_ignored_exceptions(
self.server.ignored_exceptions_during_shutdown)
- self.server.serving = False
+ self.server.should_shutdown.set()
if debug_threads():
sys.stderr.write('Server thread %s will be joined\n'
% (self._server_thread.name,))
=== modified file 'bzrlib/tests/test_transport.py'
--- a/bzrlib/tests/test_transport.py 2010-08-31 14:12:26 +0000
+++ b/bzrlib/tests/test_transport.py 2010-09-01 18:07:26 +0000
@@ -16,6 +16,7 @@
from cStringIO import StringIO
+import logging
import os
import subprocess
import sys
@@ -910,6 +911,9 @@
# override the interface (doesn't change self._vendor).
# Note that this does encryption, so can be slow.
from bzrlib.tests import stub_sftp
+ logger = logging.getLogger('paramiko')
+ logger.setLevel(logging.DEBUG)
+ logger.handlers[:] = logging.getLogger('bzr').handlers[:]
# Start an SSH server
self.command_executed = []
@@ -993,3 +997,4 @@
# And the rest are threads
for t in started[1:]:
t.join()
+ self.fail('failing cause I want the logs.')
More information about the bazaar-commits
mailing list