Rev 2384: Overhaul the SmartTCPServer connect-thread logic to synchronise on startup and shutdown and notify the server if it is in accept. in file:///home/robertc/source/baz/hpss-hooks/
Robert Collins
robertc at robertcollins.net
Thu Apr 5 01:39:06 BST 2007
At file:///home/robertc/source/baz/hpss-hooks/
------------------------------------------------------------
revno: 2384
revision-id: robertc at robertcollins.net-20070405003903-u1ys8t2lo5gs6b35
parent: robertc at robertcollins.net-20070404085028-abgpe4zylhsdoaqd
committer: Robert Collins <robertc at robertcollins.net>
branch nick: hpss-hooks
timestamp: Thu 2007-04-05 10:39:03 +1000
message:
Overhaul the SmartTCPServer connect-thread logic to synchronise on startup and shutdown and notify the server if it is in accept.
modified:
bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
bzrlib/transport/smart.py ssh.py-20060608202016-c25gvf1ob7ypbus6-1
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py 2007-04-04 05:19:38 +0000
+++ b/bzrlib/tests/test_smart_transport.py 2007-04-05 00:39:03 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2006 Canonical Ltd
+# Copyright (C) 2006, 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -822,30 +822,20 @@
class TestServerSocketUsage(SmartTCPTests):
- def test_server_closes_listening_sock_on_shutdown(self):
+ def test_server_setup_teardown(self):
+ """It should be safe to teardown the server with no requests."""
+ self.setUpServer()
+ server = self.server
+ transport = smart.SmartTCPTransport(self.server.get_url())
+ self.tearDownServer()
+ self.assertRaises(errors.ConnectionError, transport.has, '.')
+
+ def test_server_closes_listening_sock_on_shutdown_after_request(self):
"""The server should close its listening socket when it's stopped."""
self.setUpServer()
- # clean up the server and initial transport (which wont have connected):
- # force a connection, which uses the listening socket to synchronise
- # with the server thread, so that when we shut it down it has already
- # executed the 'self._should_terminate = False' line in the server
- # method.
server = self.server
self.transport.has('.')
self.tearDownServer()
- # make a new connection to break out the inner loop in the server.
- transport = smart.SmartTCPTransport(server.get_url())
- # force the connection
- transport.has('.')
- # and close it.
- transport.disconnect()
- # this del probably is not needed, but I wanted to be clear about what
- # we are testing: having objects hanging around is not part of the test.
- del transport
- while server._server_thread.isAlive():
- # this is fugly: we should have an event for the server we can
- # wait for.
- import time; time.sleep(0.001)
# if the listening socket has closed, we should get a BADFD error
# when connecting, rather than a hang.
transport = smart.SmartTCPTransport(server.get_url())
@@ -963,24 +953,15 @@
self.capture_server_call)
self.setUpServer()
result = [(self.backing_transport.base, self.transport.base)]
- # check the stopping message isn't emitted up front, this also
- # has the effect of synchronising with the server, so that
- # when we shut it down it has already executed the
- # 'self._should_terminate = False' line in the server method.
+ # check the stopping message isn't emitted up front.
+ self.assertEqual([], self.hook_calls)
+ # nor after a single message
self.transport.has('.')
self.assertEqual([], self.hook_calls)
# clean up the server
server = self.server
self.tearDownServer()
- # make a new connection to break out the inner loop in the server.
- transport = smart.SmartTCPTransport(result[0][1])
- transport.has('.')
- transport.disconnect()
- del transport
- while server._server_thread.isAlive():
- # this is fugly: we should have an event for the server we can
- # wait for.
- import time; time.sleep(0.001)
+ # now it should have fired.
self.assertEqual(result, self.hook_calls)
# TODO: test that when the server suffers an exception that it calls the
=== modified file 'bzrlib/transport/smart.py'
--- a/bzrlib/transport/smart.py 2007-04-04 08:50:28 +0000
+++ b/bzrlib/transport/smart.py 2007-04-05 00:39:03 +0000
@@ -195,6 +195,7 @@
#
from cStringIO import StringIO
+import errno
import os
import socket
import sys
@@ -835,6 +836,8 @@
:param host: Name of the interface to listen on.
:param port: TCP port to listen on, or 0 to allocate a transient port.
"""
+ from socket import error as socket_error
+ self._socket_error = socket_error
self._server_socket = socket.socket()
self._server_socket.bind((host, port))
self._sockname = self._server_socket.getsockname()
@@ -842,27 +845,34 @@
self._server_socket.listen(1)
self._server_socket.settimeout(1)
self.backing_transport = backing_transport
+ self._started = threading.Event()
+ self._stopped = threading.Event()
def serve(self):
# let connections timeout so that we get a chance to terminate
# Keep a reference to the exceptions we want to catch because the socket
# module's globals get set to None during interpreter shutdown.
from socket import timeout as socket_timeout
- from socket import error as socket_error
self._should_terminate = False
for hook in SmartTCPServer.hooks['server_started']:
hook(self.backing_transport.base, self.get_url())
+ self._started.set()
try:
try:
while not self._should_terminate:
try:
- self.accept_and_serve()
+ conn, client_addr = self._server_socket.accept()
except socket_timeout:
# just check if we're asked to stop
pass
- except socket_error, e:
- trace.warning("client disconnected: %s", e)
- pass
+ except self._socket_error, e:
+ # if the socket is closed by stop_background_thread
+ # we might get a EBADF here, any other socket errors
+ # should get logged.
+ if e.args[0] != errno.EBADF:
+ trace.warning("listening socket error: %s", e)
+ else:
+ self.serve_conn(conn)
except KeyboardInterrupt:
# dont log when CTRL-C'd.
raise
@@ -871,9 +881,11 @@
trace.log_exception_quietly()
raise
finally:
+ self._stopped.set()
try:
+ # ensure the server socket is closed.
self._server_socket.close()
- except socket_error:
+ except self._socket_error:
# ignore errors on close
pass
for hook in SmartTCPServer.hooks['server_stopped']:
@@ -883,8 +895,7 @@
"""Return the url of the server"""
return "bzr://%s:%d/" % self._sockname
- def accept_and_serve(self):
- conn, client_addr = self._server_socket.accept()
+ def serve_conn(self, conn):
# For WIN32, where the timeout value from the listening socket
# propogates to the newly accepted socket.
conn.setblocking(True)
@@ -895,20 +906,38 @@
connection_thread.start()
def start_background_thread(self):
+ self._started.clear()
self._server_thread = threading.Thread(None,
self.serve,
name='server-' + self.get_url())
self._server_thread.setDaemon(True)
self._server_thread.start()
+ self._started.wait()
def stop_background_thread(self):
+ self._stopped.clear()
+ # tell the main loop to quit on the next iteration.
self._should_terminate = True
- # At one point we would wait to join the threads here, but it looks
- # like they don't actually exit. So now we just leave them running
- # and expect to terminate the process. -- mbp 20070215
- # self._server_socket.close()
- ## sys.stderr.write("waiting for server thread to finish...")
- ## self._server_thread.join()
+ # close the socket - gives error to connections from here on in,
+ # rather than a connection reset error to connections made during
+ # the period between setting _should_terminate = True and
+ # the current request completing/aborting. It may also break out the
+ # main loop if it was currently in accept() (on some platforms).
+ try:
+ self._server_socket.close()
+ except self._socket_error:
+ # ignore errors on close
+ pass
+ if not self._stopped.isSet():
+ # server has not stopped (though it may be stopping)
+ # its likely in accept(), so give it a connection
+ temp_socket = socket.socket()
+ temp_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ if not temp_socket.connect_ex(self._sockname):
+ # and close it immediately: we dont choose to send any requests.
+ temp_socket.close()
+ self._stopped.wait()
+ self._server_thread.join()
class SmartServerHooks(Hooks):
More information about the bazaar-commits
mailing list