Rev 2204 in sftp://bazaar.launchpad.net/%7Ebzr/bzr/hpss/: Remove more duplicate code from bzrlib/transport/remote.py that has been moved elsewhere.

Andrew Bennetts andrew.bennetts at canonical.com
Thu Apr 5 06:42:14 BST 2007


At sftp://bazaar.launchpad.net/%7Ebzr/bzr/hpss/

------------------------------------------------------------
revno: 2204
revision-id: andrew.bennetts at canonical.com-20070405054113-e41edg4i5mop40nd
parent: andrew.bennetts at canonical.com-20070405044649-6w32o4u1im56i7kr
committer: Andrew Bennetts <andrew.bennetts at canonical.com>
branch nick: hpss
timestamp: Thu 2007-04-05 15:41:13 +1000
message:
  Remove more duplicate code from bzrlib/transport/remote.py that has been moved elsewhere.
modified:
  bzrlib/smart/server.py         server.py-20061110062051-chzu10y32vx8gvur-1
  bzrlib/transport/remote.py     ssh.py-20060608202016-c25gvf1ob7ypbus6-1
=== modified file 'bzrlib/smart/server.py'
--- a/bzrlib/smart/server.py	2007-03-27 09:46:24 +0000
+++ b/bzrlib/smart/server.py	2007-04-05 05:41:13 +0000
@@ -132,11 +132,6 @@
         self.stop_background_thread()
         self.chroot_server.tearDown()
 
-    def get_url(self):
-        """Return the url of the server"""
-        host, port = self._server_socket.getsockname()
-        return "bzr://%s:%d/" % (host, port)
-
     def get_bogus_url(self):
         """Return a URL which will fail to connect"""
         return 'bzr://127.0.0.1:1/'

=== modified file 'bzrlib/transport/remote.py'
--- a/bzrlib/transport/remote.py	2007-04-05 04:46:49 +0000
+++ b/bzrlib/transport/remote.py	2007-04-05 05:41:13 +0000
@@ -23,24 +23,14 @@
 __all__ = ['RemoteTransport', 'SmartTCPTransport', 'SmartSSHTransport']
 
 from cStringIO import StringIO
-import os
-import socket
-import sys
-import tempfile
-import threading
 import urllib
 import urlparse
 
 from bzrlib import (
-    bzrdir,
     errors,
-    revision,
-    trace,
     transport,
-    urlutils,
     )
-from bzrlib.bundle.serializer import write_bundle
-from bzrlib.smart import client, medium, protocol, SmartServerRequestHandler
+from bzrlib.smart import client, medium, protocol
 
 # must do this otherwise urllib can't parse the urls properly :(
 for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
@@ -52,474 +42,6 @@
 BZR_DEFAULT_PORT = 4155
 
 
-def _recv_tuple(from_file):
-    req_line = from_file.readline()
-    return _decode_tuple(req_line)
-
-
-def _decode_tuple(req_line):
-    if req_line == None or req_line == '':
-        return None
-    if req_line[-1] != '\n':
-        raise errors.SmartProtocolError("request %r not terminated" % req_line)
-    return tuple(req_line[:-1].split('\x01'))
-
-
-def _encode_tuple(args):
-    """Encode the tuple args to a bytestream."""
-    return '\x01'.join(args) + '\n'
-
-
-class SmartProtocolBase(object):
-    """Methods common to client and server"""
-
-    # TODO: this only actually accomodates a single block; possibly should
-    # support multiple chunks?
-    def _encode_bulk_data(self, body):
-        """Encode body as a bulk data chunk."""
-        return ''.join(('%d\n' % len(body), body, 'done\n'))
-
-    def _serialise_offsets(self, offsets):
-        """Serialise a readv offset list."""
-        txt = []
-        for start, length in offsets:
-            txt.append('%d,%d' % (start, length))
-        return '\n'.join(txt)
-        
-
-class SmartServerRequestProtocolOne(SmartProtocolBase):
-    """Server-side encoding and decoding logic for smart version 1."""
-    
-    def __init__(self, backing_transport, write_func):
-        self._backing_transport = backing_transport
-        self.excess_buffer = ''
-        self._finished = False
-        self.in_buffer = ''
-        self.has_dispatched = False
-        self.request = None
-        self._body_decoder = None
-        self._write_func = write_func
-
-    def accept_bytes(self, bytes):
-        """Take bytes, and advance the internal state machine appropriately.
-        
-        :param bytes: must be a byte string
-        """
-        assert isinstance(bytes, str)
-        self.in_buffer += bytes
-        if not self.has_dispatched:
-            if '\n' not in self.in_buffer:
-                # no command line yet
-                return
-            self.has_dispatched = True
-            try:
-                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
-                first_line += '\n'
-                req_args = _decode_tuple(first_line)
-                self.request = request.SmartServerRequestHandler(
-                    self._backing_transport)
-                self.request.dispatch_command(req_args[0], req_args[1:])
-                if self.request.finished_reading:
-                    # trivial request
-                    self.excess_buffer = self.in_buffer
-                    self.in_buffer = ''
-                    self._send_response(self.request.response.args,
-                        self.request.response.body)
-            except KeyboardInterrupt:
-                raise
-            except Exception, exception:
-                # everything else: pass to client, flush, and quit
-                self._send_response(('error', str(exception)))
-                return
-
-        if self.has_dispatched:
-            if self._finished:
-                # nothing to do.XXX: this routine should be a single state 
-                # machine too.
-                self.excess_buffer += self.in_buffer
-                self.in_buffer = ''
-                return
-            if self._body_decoder is None:
-                self._body_decoder = LengthPrefixedBodyDecoder()
-            self._body_decoder.accept_bytes(self.in_buffer)
-            self.in_buffer = self._body_decoder.unused_data
-            body_data = self._body_decoder.read_pending_data()
-            self.request.accept_body(body_data)
-            if self._body_decoder.finished_reading:
-                self.request.end_of_body()
-                assert self.request.finished_reading, \
-                    "no more body, request not finished"
-            if self.request.response is not None:
-                self._send_response(self.request.response.args,
-                    self.request.response.body)
-                self.excess_buffer = self.in_buffer
-                self.in_buffer = ''
-            else:
-                assert not self.request.finished_reading, \
-                    "no response and we have finished reading."
-
-    def _send_response(self, args, body=None):
-        """Send a smart server response down the output stream."""
-        assert not self._finished, 'response already sent'
-        self._finished = True
-        self._write_func(_encode_tuple(args))
-        if body is not None:
-            assert isinstance(body, str), 'body must be a str'
-            bytes = self._encode_bulk_data(body)
-            self._write_func(bytes)
-
-    def next_read_size(self):
-        if self._finished:
-            return 0
-        if self._body_decoder is None:
-            return 1
-        else:
-            return self._body_decoder.next_read_size()
-
-
-class LengthPrefixedBodyDecoder(object):
-    """Decodes the length-prefixed bulk data."""
-    
-    def __init__(self):
-        self.bytes_left = None
-        self.finished_reading = False
-        self.unused_data = ''
-        self.state_accept = self._state_accept_expecting_length
-        self.state_read = self._state_read_no_data
-        self._in_buffer = ''
-        self._trailer_buffer = ''
-    
-    def accept_bytes(self, bytes):
-        """Decode as much of bytes as possible.
-
-        If 'bytes' contains too much data it will be appended to
-        self.unused_data.
-
-        finished_reading will be set when no more data is required.  Further
-        data will be appended to self.unused_data.
-        """
-        # accept_bytes is allowed to change the state
-        current_state = self.state_accept
-        self.state_accept(bytes)
-        while current_state != self.state_accept:
-            current_state = self.state_accept
-            self.state_accept('')
-
-    def next_read_size(self):
-        if self.bytes_left is not None:
-            # Ideally we want to read all the remainder of the body and the
-            # trailer in one go.
-            return self.bytes_left + 5
-        elif self.state_accept == self._state_accept_reading_trailer:
-            # Just the trailer left
-            return 5 - len(self._trailer_buffer)
-        elif self.state_accept == self._state_accept_expecting_length:
-            # There's still at least 6 bytes left ('\n' to end the length, plus
-            # 'done\n').
-            return 6
-        else:
-            # Reading excess data.  Either way, 1 byte at a time is fine.
-            return 1
-        
-    def read_pending_data(self):
-        """Return any pending data that has been decoded."""
-        return self.state_read()
-
-    def _state_accept_expecting_length(self, bytes):
-        self._in_buffer += bytes
-        pos = self._in_buffer.find('\n')
-        if pos == -1:
-            return
-        self.bytes_left = int(self._in_buffer[:pos])
-        self._in_buffer = self._in_buffer[pos+1:]
-        self.bytes_left -= len(self._in_buffer)
-        self.state_accept = self._state_accept_reading_body
-        self.state_read = self._state_read_in_buffer
-
-    def _state_accept_reading_body(self, bytes):
-        self._in_buffer += bytes
-        self.bytes_left -= len(bytes)
-        if self.bytes_left <= 0:
-            # Finished with body
-            if self.bytes_left != 0:
-                self._trailer_buffer = self._in_buffer[self.bytes_left:]
-                self._in_buffer = self._in_buffer[:self.bytes_left]
-            self.bytes_left = None
-            self.state_accept = self._state_accept_reading_trailer
-        
-    def _state_accept_reading_trailer(self, bytes):
-        self._trailer_buffer += bytes
-        # TODO: what if the trailer does not match "done\n"?  Should this raise
-        # a ProtocolViolation exception?
-        if self._trailer_buffer.startswith('done\n'):
-            self.unused_data = self._trailer_buffer[len('done\n'):]
-            self.state_accept = self._state_accept_reading_unused
-            self.finished_reading = True
-    
-    def _state_accept_reading_unused(self, bytes):
-        self.unused_data += bytes
-
-    def _state_read_no_data(self):
-        return ''
-
-    def _state_read_in_buffer(self):
-        result = self._in_buffer
-        self._in_buffer = ''
-        return result
-
-
-class SmartServerStreamMedium(object):
-    """Handles smart commands coming over a stream.
-
-    The stream may be a pipe connected to sshd, or a tcp socket, or an
-    in-process fifo for testing.
-
-    One instance is created for each connected client; it can serve multiple
-    requests in the lifetime of the connection.
-
-    The server passes requests through to an underlying backing transport, 
-    which will typically be a LocalTransport looking at the server's filesystem.
-    """
-
-    def __init__(self, backing_transport):
-        """Construct new server.
-
-        :param backing_transport: Transport for the directory served.
-        """
-        # backing_transport could be passed to serve instead of __init__
-        self.backing_transport = backing_transport
-        self.finished = False
-
-    def serve(self):
-        """Serve requests until the client disconnects."""
-        # Keep a reference to stderr because the sys module's globals get set to
-        # None during interpreter shutdown.
-        from sys import stderr
-        try:
-            while not self.finished:
-                protocol = SmartServerRequestProtocolOne(self.backing_transport,
-                                                         self._write_out)
-                self._serve_one_request(protocol)
-        except Exception, e:
-            stderr.write("%s terminating on exception %s\n" % (self, e))
-            raise
-
-    def _serve_one_request(self, protocol):
-        """Read one request from input, process, send back a response.
-        
-        :param protocol: a SmartServerRequestProtocol.
-        """
-        try:
-            self._serve_one_request_unguarded(protocol)
-        except KeyboardInterrupt:
-            raise
-        except Exception, e:
-            self.terminate_due_to_error()
-
-    def terminate_due_to_error(self):
-        """Called when an unhandled exception from the protocol occurs."""
-        raise NotImplementedError(self.terminate_due_to_error)
-
-
-class SmartServerSocketStreamMedium(SmartServerStreamMedium):
-
-    def __init__(self, sock, backing_transport):
-        """Constructor.
-
-        :param sock: the socket the server will read from.  It will be put
-            into blocking mode.
-        """
-        SmartServerStreamMedium.__init__(self, backing_transport)
-        self.push_back = ''
-        sock.setblocking(True)
-        self.socket = sock
-
-    def _serve_one_request_unguarded(self, protocol):
-        while protocol.next_read_size():
-            if self.push_back:
-                protocol.accept_bytes(self.push_back)
-                self.push_back = ''
-            else:
-                bytes = self.socket.recv(4096)
-                if bytes == '':
-                    self.finished = True
-                    return
-                protocol.accept_bytes(bytes)
-        
-        self.push_back = protocol.excess_buffer
-    
-    def terminate_due_to_error(self):
-        """Called when an unhandled exception from the protocol occurs."""
-        # TODO: This should log to a server log file, but no such thing
-        # exists yet.  Andrew Bennetts 2006-09-29.
-        self.socket.close()
-        self.finished = True
-
-    def _write_out(self, bytes):
-        self.socket.sendall(bytes)
-
-
-class SmartServerPipeStreamMedium(SmartServerStreamMedium):
-
-    def __init__(self, in_file, out_file, backing_transport):
-        """Construct new server.
-
-        :param in_file: Python file from which requests can be read.
-        :param out_file: Python file to write responses.
-        :param backing_transport: Transport for the directory served.
-        """
-        SmartServerStreamMedium.__init__(self, backing_transport)
-        if sys.platform == 'win32':
-            # force binary mode for files
-            import msvcrt
-            for f in (in_file, out_file):
-                fileno = getattr(f, 'fileno', None)
-                if fileno:
-                    msvcrt.setmode(fileno(), os.O_BINARY)
-        self._in = in_file
-        self._out = out_file
-
-    def _serve_one_request_unguarded(self, protocol):
-        while True:
-            bytes_to_read = protocol.next_read_size()
-            if bytes_to_read == 0:
-                # Finished serving this request.
-                self._out.flush()
-                return
-            bytes = self._in.read(bytes_to_read)
-            if bytes == '':
-                # Connection has been closed.
-                self.finished = True
-                self._out.flush()
-                return
-            protocol.accept_bytes(bytes)
-
-    def terminate_due_to_error(self):
-        # TODO: This should log to a server log file, but no such thing
-        # exists yet.  Andrew Bennetts 2006-09-29.
-        self._out.close()
-        self.finished = True
-
-    def _write_out(self, bytes):
-        self._out.write(bytes)
-
-
-class SmartServerResponse(object):
-    """Response generated by SmartServerRequestHandler."""
-
-    def __init__(self, args, body=None):
-        self.args = args
-        self.body = body
-
-
-class SmartTCPServer(object):
-    """Listens on a TCP socket and accepts connections from smart clients"""
-
-    def __init__(self, backing_transport, host='127.0.0.1', port=0):
-        """Construct a new server.
-
-        To actually start it running, call either start_background_thread or
-        serve.
-
-        :param host: Name of the interface to listen on.
-        :param port: TCP port to listen on, or 0 to allocate a transient port.
-        """
-        self._server_socket = socket.socket()
-        self._server_socket.bind((host, port))
-        self.port = self._server_socket.getsockname()[1]
-        self._server_socket.listen(1)
-        self._server_socket.settimeout(1)
-        self.backing_transport = backing_transport
-
-    def serve(self):
-        # let connections timeout so that we get a chance to terminate
-        # Keep a reference to the exceptions we want to catch because the socket
-        # module's globals get set to None during interpreter shutdown.
-        from socket import timeout as socket_timeout
-        from socket import error as socket_error
-        self._should_terminate = False
-        while not self._should_terminate:
-            try:
-                self.accept_and_serve()
-            except socket_timeout:
-                # just check if we're asked to stop
-                pass
-            except socket_error, e:
-                trace.warning("client disconnected: %s", e)
-                pass
-
-    def get_url(self):
-        """Return the url of the server"""
-        return "bzr://%s:%d/" % self._server_socket.getsockname()
-
-    def accept_and_serve(self):
-        conn, client_addr = self._server_socket.accept()
-        # For WIN32, where the timeout value from the listening socket
-        # propogates to the newly accepted socket.
-        conn.setblocking(True)
-        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
-        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
-        connection_thread.setDaemon(True)
-        connection_thread.start()
-
-    def start_background_thread(self):
-        self._server_thread = threading.Thread(None,
-                self.serve,
-                name='server-' + self.get_url())
-        self._server_thread.setDaemon(True)
-        self._server_thread.start()
-
-    def stop_background_thread(self):
-        self._should_terminate = True
-        # At one point we would wait to join the threads here, but it looks
-        # like they don't actually exit.  So now we just leave them running
-        # and expect to terminate the process. -- mbp 20070215
-        # self._server_socket.close()
-        ## sys.stderr.write("waiting for server thread to finish...")
-        ## self._server_thread.join()
-
-
-class SmartTCPServer_for_testing(SmartTCPServer):
-    """Server suitable for use by transport tests.
-    
-    This server is backed by the process's cwd.
-    """
-
-    def __init__(self):
-        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
-        # The server is set up by default like for ssh access: the client
-        # passes filesystem-absolute paths; therefore the server must look
-        # them up relative to the root directory.  it might be better to act
-        # a public server and have the server rewrite paths into the test
-        # directory.
-        SmartTCPServer.__init__(self,
-            transport.get_transport(urlutils.local_path_to_url('/')))
-        
-    def get_backing_transport(self, backing_transport_server):
-        """Get a backing transport from a server we are decorating."""
-        return transport.get_transport(backing_transport_server.get_url())
-
-    def setUp(self, backing_transport_server=None):
-        """Set up server for testing"""
-        from bzrlib.transport.chroot import TestingChrootServer
-        if backing_transport_server is None:
-            from bzrlib.transport.local import LocalURLServer
-            backing_transport_server = LocalURLServer()
-        self.chroot_server = TestingChrootServer()
-        self.chroot_server.setUp(backing_transport_server)
-        self.backing_transport = transport.get_transport(
-            self.chroot_server.get_url())
-        self.start_background_thread()
-
-    def tearDown(self):
-        self.stop_background_thread()
-
-    def get_bogus_url(self):
-        """Return a URL which will fail to connect"""
-        return 'bzr://127.0.0.1:1/'
-
-
 class _SmartStat(object):
 
     def __init__(self, size, mode):




More information about the bazaar-commits mailing list