Rev 2204: Remove more duplicate code from bzrlib/transport/remote.py that has been moved elsewhere, in sftp://bazaar.launchpad.net/%7Ebzr/bzr/hpss/
Andrew Bennetts
andrew.bennetts at canonical.com
Thu Apr 5 06:42:14 BST 2007
At sftp://bazaar.launchpad.net/%7Ebzr/bzr/hpss/
------------------------------------------------------------
revno: 2204
revision-id: andrew.bennetts at canonical.com-20070405054113-e41edg4i5mop40nd
parent: andrew.bennetts at canonical.com-20070405044649-6w32o4u1im56i7kr
committer: Andrew Bennetts <andrew.bennetts at canonical.com>
branch nick: hpss
timestamp: Thu 2007-04-05 15:41:13 +1000
message:
Remove more duplicate code from bzrlib/transport/remote.py that has been moved elsewhere.
modified:
bzrlib/smart/server.py server.py-20061110062051-chzu10y32vx8gvur-1
bzrlib/transport/remote.py ssh.py-20060608202016-c25gvf1ob7ypbus6-1
=== modified file 'bzrlib/smart/server.py'
--- a/bzrlib/smart/server.py 2007-03-27 09:46:24 +0000
+++ b/bzrlib/smart/server.py 2007-04-05 05:41:13 +0000
@@ -132,11 +132,6 @@
self.stop_background_thread()
self.chroot_server.tearDown()
- def get_url(self):
- """Return the url of the server"""
- host, port = self._server_socket.getsockname()
- return "bzr://%s:%d/" % (host, port)
-
def get_bogus_url(self):
"""Return a URL which will fail to connect"""
return 'bzr://127.0.0.1:1/'
=== modified file 'bzrlib/transport/remote.py'
--- a/bzrlib/transport/remote.py 2007-04-05 04:46:49 +0000
+++ b/bzrlib/transport/remote.py 2007-04-05 05:41:13 +0000
@@ -23,24 +23,14 @@
__all__ = ['RemoteTransport', 'SmartTCPTransport', 'SmartSSHTransport']
from cStringIO import StringIO
-import os
-import socket
-import sys
-import tempfile
-import threading
import urllib
import urlparse
from bzrlib import (
- bzrdir,
errors,
- revision,
- trace,
transport,
- urlutils,
)
-from bzrlib.bundle.serializer import write_bundle
-from bzrlib.smart import client, medium, protocol, SmartServerRequestHandler
+from bzrlib.smart import client, medium, protocol
# must do this otherwise urllib can't parse the urls properly :(
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
@@ -52,474 +42,6 @@
BZR_DEFAULT_PORT = 4155
-def _recv_tuple(from_file):
- req_line = from_file.readline()
- return _decode_tuple(req_line)
-
-
-def _decode_tuple(req_line):
- if req_line == None or req_line == '':
- return None
- if req_line[-1] != '\n':
- raise errors.SmartProtocolError("request %r not terminated" % req_line)
- return tuple(req_line[:-1].split('\x01'))
-
-
-def _encode_tuple(args):
- """Encode the tuple args to a bytestream."""
- return '\x01'.join(args) + '\n'
-
-
-class SmartProtocolBase(object):
- """Methods common to client and server"""
-
- # TODO: this only actually accomodates a single block; possibly should
- # support multiple chunks?
- def _encode_bulk_data(self, body):
- """Encode body as a bulk data chunk."""
- return ''.join(('%d\n' % len(body), body, 'done\n'))
-
- def _serialise_offsets(self, offsets):
- """Serialise a readv offset list."""
- txt = []
- for start, length in offsets:
- txt.append('%d,%d' % (start, length))
- return '\n'.join(txt)
-
-
-class SmartServerRequestProtocolOne(SmartProtocolBase):
- """Server-side encoding and decoding logic for smart version 1."""
-
- def __init__(self, backing_transport, write_func):
- self._backing_transport = backing_transport
- self.excess_buffer = ''
- self._finished = False
- self.in_buffer = ''
- self.has_dispatched = False
- self.request = None
- self._body_decoder = None
- self._write_func = write_func
-
- def accept_bytes(self, bytes):
- """Take bytes, and advance the internal state machine appropriately.
-
- :param bytes: must be a byte string
- """
- assert isinstance(bytes, str)
- self.in_buffer += bytes
- if not self.has_dispatched:
- if '\n' not in self.in_buffer:
- # no command line yet
- return
- self.has_dispatched = True
- try:
- first_line, self.in_buffer = self.in_buffer.split('\n', 1)
- first_line += '\n'
- req_args = _decode_tuple(first_line)
- self.request = request.SmartServerRequestHandler(
- self._backing_transport)
- self.request.dispatch_command(req_args[0], req_args[1:])
- if self.request.finished_reading:
- # trivial request
- self.excess_buffer = self.in_buffer
- self.in_buffer = ''
- self._send_response(self.request.response.args,
- self.request.response.body)
- except KeyboardInterrupt:
- raise
- except Exception, exception:
- # everything else: pass to client, flush, and quit
- self._send_response(('error', str(exception)))
- return
-
- if self.has_dispatched:
- if self._finished:
- # nothing to do.XXX: this routine should be a single state
- # machine too.
- self.excess_buffer += self.in_buffer
- self.in_buffer = ''
- return
- if self._body_decoder is None:
- self._body_decoder = LengthPrefixedBodyDecoder()
- self._body_decoder.accept_bytes(self.in_buffer)
- self.in_buffer = self._body_decoder.unused_data
- body_data = self._body_decoder.read_pending_data()
- self.request.accept_body(body_data)
- if self._body_decoder.finished_reading:
- self.request.end_of_body()
- assert self.request.finished_reading, \
- "no more body, request not finished"
- if self.request.response is not None:
- self._send_response(self.request.response.args,
- self.request.response.body)
- self.excess_buffer = self.in_buffer
- self.in_buffer = ''
- else:
- assert not self.request.finished_reading, \
- "no response and we have finished reading."
-
- def _send_response(self, args, body=None):
- """Send a smart server response down the output stream."""
- assert not self._finished, 'response already sent'
- self._finished = True
- self._write_func(_encode_tuple(args))
- if body is not None:
- assert isinstance(body, str), 'body must be a str'
- bytes = self._encode_bulk_data(body)
- self._write_func(bytes)
-
- def next_read_size(self):
- if self._finished:
- return 0
- if self._body_decoder is None:
- return 1
- else:
- return self._body_decoder.next_read_size()
-
-
-class LengthPrefixedBodyDecoder(object):
- """Decodes the length-prefixed bulk data."""
-
- def __init__(self):
- self.bytes_left = None
- self.finished_reading = False
- self.unused_data = ''
- self.state_accept = self._state_accept_expecting_length
- self.state_read = self._state_read_no_data
- self._in_buffer = ''
- self._trailer_buffer = ''
-
- def accept_bytes(self, bytes):
- """Decode as much of bytes as possible.
-
- If 'bytes' contains too much data it will be appended to
- self.unused_data.
-
- finished_reading will be set when no more data is required. Further
- data will be appended to self.unused_data.
- """
- # accept_bytes is allowed to change the state
- current_state = self.state_accept
- self.state_accept(bytes)
- while current_state != self.state_accept:
- current_state = self.state_accept
- self.state_accept('')
-
- def next_read_size(self):
- if self.bytes_left is not None:
- # Ideally we want to read all the remainder of the body and the
- # trailer in one go.
- return self.bytes_left + 5
- elif self.state_accept == self._state_accept_reading_trailer:
- # Just the trailer left
- return 5 - len(self._trailer_buffer)
- elif self.state_accept == self._state_accept_expecting_length:
- # There's still at least 6 bytes left ('\n' to end the length, plus
- # 'done\n').
- return 6
- else:
- # Reading excess data. Either way, 1 byte at a time is fine.
- return 1
-
- def read_pending_data(self):
- """Return any pending data that has been decoded."""
- return self.state_read()
-
- def _state_accept_expecting_length(self, bytes):
- self._in_buffer += bytes
- pos = self._in_buffer.find('\n')
- if pos == -1:
- return
- self.bytes_left = int(self._in_buffer[:pos])
- self._in_buffer = self._in_buffer[pos+1:]
- self.bytes_left -= len(self._in_buffer)
- self.state_accept = self._state_accept_reading_body
- self.state_read = self._state_read_in_buffer
-
- def _state_accept_reading_body(self, bytes):
- self._in_buffer += bytes
- self.bytes_left -= len(bytes)
- if self.bytes_left <= 0:
- # Finished with body
- if self.bytes_left != 0:
- self._trailer_buffer = self._in_buffer[self.bytes_left:]
- self._in_buffer = self._in_buffer[:self.bytes_left]
- self.bytes_left = None
- self.state_accept = self._state_accept_reading_trailer
-
- def _state_accept_reading_trailer(self, bytes):
- self._trailer_buffer += bytes
- # TODO: what if the trailer does not match "done\n"? Should this raise
- # a ProtocolViolation exception?
- if self._trailer_buffer.startswith('done\n'):
- self.unused_data = self._trailer_buffer[len('done\n'):]
- self.state_accept = self._state_accept_reading_unused
- self.finished_reading = True
-
- def _state_accept_reading_unused(self, bytes):
- self.unused_data += bytes
-
- def _state_read_no_data(self):
- return ''
-
- def _state_read_in_buffer(self):
- result = self._in_buffer
- self._in_buffer = ''
- return result
-
-
-class SmartServerStreamMedium(object):
- """Handles smart commands coming over a stream.
-
- The stream may be a pipe connected to sshd, or a tcp socket, or an
- in-process fifo for testing.
-
- One instance is created for each connected client; it can serve multiple
- requests in the lifetime of the connection.
-
- The server passes requests through to an underlying backing transport,
- which will typically be a LocalTransport looking at the server's filesystem.
- """
-
- def __init__(self, backing_transport):
- """Construct new server.
-
- :param backing_transport: Transport for the directory served.
- """
- # backing_transport could be passed to serve instead of __init__
- self.backing_transport = backing_transport
- self.finished = False
-
- def serve(self):
- """Serve requests until the client disconnects."""
- # Keep a reference to stderr because the sys module's globals get set to
- # None during interpreter shutdown.
- from sys import stderr
- try:
- while not self.finished:
- protocol = SmartServerRequestProtocolOne(self.backing_transport,
- self._write_out)
- self._serve_one_request(protocol)
- except Exception, e:
- stderr.write("%s terminating on exception %s\n" % (self, e))
- raise
-
- def _serve_one_request(self, protocol):
- """Read one request from input, process, send back a response.
-
- :param protocol: a SmartServerRequestProtocol.
- """
- try:
- self._serve_one_request_unguarded(protocol)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- self.terminate_due_to_error()
-
- def terminate_due_to_error(self):
- """Called when an unhandled exception from the protocol occurs."""
- raise NotImplementedError(self.terminate_due_to_error)
-
-
-class SmartServerSocketStreamMedium(SmartServerStreamMedium):
-
- def __init__(self, sock, backing_transport):
- """Constructor.
-
- :param sock: the socket the server will read from. It will be put
- into blocking mode.
- """
- SmartServerStreamMedium.__init__(self, backing_transport)
- self.push_back = ''
- sock.setblocking(True)
- self.socket = sock
-
- def _serve_one_request_unguarded(self, protocol):
- while protocol.next_read_size():
- if self.push_back:
- protocol.accept_bytes(self.push_back)
- self.push_back = ''
- else:
- bytes = self.socket.recv(4096)
- if bytes == '':
- self.finished = True
- return
- protocol.accept_bytes(bytes)
-
- self.push_back = protocol.excess_buffer
-
- def terminate_due_to_error(self):
- """Called when an unhandled exception from the protocol occurs."""
- # TODO: This should log to a server log file, but no such thing
- # exists yet. Andrew Bennetts 2006-09-29.
- self.socket.close()
- self.finished = True
-
- def _write_out(self, bytes):
- self.socket.sendall(bytes)
-
-
-class SmartServerPipeStreamMedium(SmartServerStreamMedium):
-
- def __init__(self, in_file, out_file, backing_transport):
- """Construct new server.
-
- :param in_file: Python file from which requests can be read.
- :param out_file: Python file to write responses.
- :param backing_transport: Transport for the directory served.
- """
- SmartServerStreamMedium.__init__(self, backing_transport)
- if sys.platform == 'win32':
- # force binary mode for files
- import msvcrt
- for f in (in_file, out_file):
- fileno = getattr(f, 'fileno', None)
- if fileno:
- msvcrt.setmode(fileno(), os.O_BINARY)
- self._in = in_file
- self._out = out_file
-
- def _serve_one_request_unguarded(self, protocol):
- while True:
- bytes_to_read = protocol.next_read_size()
- if bytes_to_read == 0:
- # Finished serving this request.
- self._out.flush()
- return
- bytes = self._in.read(bytes_to_read)
- if bytes == '':
- # Connection has been closed.
- self.finished = True
- self._out.flush()
- return
- protocol.accept_bytes(bytes)
-
- def terminate_due_to_error(self):
- # TODO: This should log to a server log file, but no such thing
- # exists yet. Andrew Bennetts 2006-09-29.
- self._out.close()
- self.finished = True
-
- def _write_out(self, bytes):
- self._out.write(bytes)
-
-
-class SmartServerResponse(object):
- """Response generated by SmartServerRequestHandler."""
-
- def __init__(self, args, body=None):
- self.args = args
- self.body = body
-
-
-class SmartTCPServer(object):
- """Listens on a TCP socket and accepts connections from smart clients"""
-
- def __init__(self, backing_transport, host='127.0.0.1', port=0):
- """Construct a new server.
-
- To actually start it running, call either start_background_thread or
- serve.
-
- :param host: Name of the interface to listen on.
- :param port: TCP port to listen on, or 0 to allocate a transient port.
- """
- self._server_socket = socket.socket()
- self._server_socket.bind((host, port))
- self.port = self._server_socket.getsockname()[1]
- self._server_socket.listen(1)
- self._server_socket.settimeout(1)
- self.backing_transport = backing_transport
-
- def serve(self):
- # let connections timeout so that we get a chance to terminate
- # Keep a reference to the exceptions we want to catch because the socket
- # module's globals get set to None during interpreter shutdown.
- from socket import timeout as socket_timeout
- from socket import error as socket_error
- self._should_terminate = False
- while not self._should_terminate:
- try:
- self.accept_and_serve()
- except socket_timeout:
- # just check if we're asked to stop
- pass
- except socket_error, e:
- trace.warning("client disconnected: %s", e)
- pass
-
- def get_url(self):
- """Return the url of the server"""
- return "bzr://%s:%d/" % self._server_socket.getsockname()
-
- def accept_and_serve(self):
- conn, client_addr = self._server_socket.accept()
- # For WIN32, where the timeout value from the listening socket
- # propogates to the newly accepted socket.
- conn.setblocking(True)
- conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
- connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
- connection_thread.setDaemon(True)
- connection_thread.start()
-
- def start_background_thread(self):
- self._server_thread = threading.Thread(None,
- self.serve,
- name='server-' + self.get_url())
- self._server_thread.setDaemon(True)
- self._server_thread.start()
-
- def stop_background_thread(self):
- self._should_terminate = True
- # At one point we would wait to join the threads here, but it looks
- # like they don't actually exit. So now we just leave them running
- # and expect to terminate the process. -- mbp 20070215
- # self._server_socket.close()
- ## sys.stderr.write("waiting for server thread to finish...")
- ## self._server_thread.join()
-
-
-class SmartTCPServer_for_testing(SmartTCPServer):
- """Server suitable for use by transport tests.
-
- This server is backed by the process's cwd.
- """
-
- def __init__(self):
- self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
- # The server is set up by default like for ssh access: the client
- # passes filesystem-absolute paths; therefore the server must look
- # them up relative to the root directory. it might be better to act
- # a public server and have the server rewrite paths into the test
- # directory.
- SmartTCPServer.__init__(self,
- transport.get_transport(urlutils.local_path_to_url('/')))
-
- def get_backing_transport(self, backing_transport_server):
- """Get a backing transport from a server we are decorating."""
- return transport.get_transport(backing_transport_server.get_url())
-
- def setUp(self, backing_transport_server=None):
- """Set up server for testing"""
- from bzrlib.transport.chroot import TestingChrootServer
- if backing_transport_server is None:
- from bzrlib.transport.local import LocalURLServer
- backing_transport_server = LocalURLServer()
- self.chroot_server = TestingChrootServer()
- self.chroot_server.setUp(backing_transport_server)
- self.backing_transport = transport.get_transport(
- self.chroot_server.get_url())
- self.start_background_thread()
-
- def tearDown(self):
- self.stop_background_thread()
-
- def get_bogus_url(self):
- """Return a URL which will fail to connect"""
- return 'bzr://127.0.0.1:1/'
-
-
class _SmartStat(object):
def __init__(self, size, mode):
More information about the bazaar-commits mailing list