Rev 4064: Streaming fetch from remote servers. in http://people.ubuntu.com/~robertc/baz2.0/branch.roundtrips

Robert Collins robertc at robertcollins.net
Mon Mar 2 03:38:16 GMT 2009


At http://people.ubuntu.com/~robertc/baz2.0/branch.roundtrips

------------------------------------------------------------
revno: 4064
revision-id: robertc at robertcollins.net-20090302033807-af1b7awmfueyawts
parent: robertc at robertcollins.net-20090227130536-wsqoyhyt3n11nc8d
committer: Robert Collins <robertc at robertcollins.net>
branch nick: branch.roundtrips
timestamp: Mon 2009-03-02 14:38:07 +1100
message:
  Streaming fetch from remote servers.
=== modified file 'NEWS'
--- a/NEWS	2009-02-27 02:44:10 +0000
+++ b/NEWS	2009-03-02 03:38:07 +0000
@@ -117,8 +117,9 @@
       rather than the sometimes-absent disk label. (Robert Collins)
 
     * ``bzrlib.fetch`` is now composed of a sender and a sink component
-      allowing for decoupling over a network connection. Fetching into
-      a RemoteRepository uses this to stream the operation.
+      allowing for decoupling over a network connection. Fetching from
+      or into a RemoteRepository with a 1.13 server will use this to
+      stream the operation.
       (Andrew Bennetts, Robert Collins)
 
     * ``bzrlib.tests.run_suite`` accepts a runner_class parameter

=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py	2009-02-27 13:05:36 +0000
+++ b/bzrlib/fetch.py	2009-03-02 03:38:07 +0000
@@ -40,7 +40,7 @@
 from bzrlib.tsort import topo_sort
 from bzrlib.trace import mutter
 import bzrlib.ui
-from bzrlib.versionedfile import filter_absent, FulltextContentFactory
+from bzrlib.versionedfile import FulltextContentFactory
 
 # TODO: Avoid repeatedly opening weaves so many times.
 

=== modified file 'bzrlib/pack.py'
--- a/bzrlib/pack.py	2009-01-17 01:30:58 +0000
+++ b/bzrlib/pack.py	2009-03-02 03:38:07 +0000
@@ -411,10 +411,15 @@
             self._state_handler()
             cur_buffer_length = len(self._buffer)
 
-    def read_pending_records(self):
-        records = self._parsed_records
-        self._parsed_records = []
-        return records
+    def read_pending_records(self, max=None):
+        if max:
+            records = self._parsed_records[:max]
+            del self._parsed_records[:max]
+            return records
+        else:
+            records = self._parsed_records
+            self._parsed_records = []
+            return records
 
     def _consume_line(self):
         """Take a line out of the buffer, and return the line.

=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py	2009-02-27 13:05:36 +0000
+++ b/bzrlib/remote.py	2009-03-02 03:38:07 +0000
@@ -40,11 +40,10 @@
     SmartProtocolError,
     )
 from bzrlib.lockable_files import LockableFiles
-from bzrlib.smart import client, vfs
+from bzrlib.smart import client, vfs, repository as smart_repo
 from bzrlib.revision import ensure_null, NULL_REVISION
 from bzrlib.trace import mutter, note, warning
 from bzrlib.util import bencode
-from bzrlib.versionedfile import record_to_fulltext_bytes
 
 
 class _RpcHelper(object):
@@ -1434,14 +1433,15 @@
             # do not fallback when actually pushing the stream. A cleanup patch
             # is going to look at rewinding/restarting the stream/partial
             # buffering etc.
-            byte_stream = self._stream_to_byte_stream([], src_format)
+            byte_stream = smart_repo._stream_to_byte_stream([], src_format)
             try:
                 response = client.call_with_body_stream(
                     ('Repository.insert_stream', path, ''), byte_stream)
             except errors.UnknownSmartMethod:
                 medium._remember_remote_is_before((1,13))
                 return self._insert_real(stream, src_format, resume_tokens)
-        byte_stream = self._stream_to_byte_stream(stream, src_format)
+        byte_stream = smart_repo._stream_to_byte_stream(
+            stream, src_format)
         resume_tokens = ' '.join(resume_tokens)
         response = client.call_with_body_stream(
             ('Repository.insert_stream', path, resume_tokens), byte_stream)
@@ -1459,42 +1459,45 @@
                     collection.reload_pack_names()
             return [], set()
 
-    def _stream_to_byte_stream(self, stream, src_format):
-        bytes = []
-        pack_writer = pack.ContainerWriter(bytes.append)
-        pack_writer.begin()
-        pack_writer.add_bytes_record(src_format.network_name(), '')
-        adapters = {}
-        def get_adapter(adapter_key):
-            try:
-                return adapters[adapter_key]
-            except KeyError:
-                adapter_factory = adapter_registry.get(adapter_key)
-                adapter = adapter_factory(self)
-                adapters[adapter_key] = adapter
-                return adapter
-        for substream_type, substream in stream:
-            for record in substream:
-                if record.storage_kind in ('chunked', 'fulltext'):
-                    serialised = record_to_fulltext_bytes(record)
-                else:
-                    serialised = record.get_bytes_as(record.storage_kind)
-                if serialised:
-                    # Some streams embed the whole stream into the wire
-                    # representation of the first record, which means that
-                    # later records have no wire representation: we skip them.
-                    pack_writer.add_bytes_record(serialised, [(substream_type,)])
-                for b in bytes:
-                    yield b
-                del bytes[:]
-        pack_writer.end()
-        for b in bytes:
-            yield b
-
 
 class RemoteStreamSource(repository.StreamSource):
     """Stream data from a remote server."""
 
+    def get_stream(self, search):
+        # streaming with fallback repositories is not well defined yet: The
+        # remote repository cannot see the fallback repositories, and thus
+        # cannot satisfy the entire search in the general case. Likewise the
+        # fallback repositories cannot reify the search to determine what they
+        # should send. It likely needs a return value in the stream listing the
+        # edge of the search to resume from in fallback repositories.
+        if self.from_repository._fallback_repositories:
+            return repository.StreamSource.get_stream(self, search)
+        repo = self.from_repository
+        client = repo._client
+        medium = client._medium
+        if medium._is_remote_before((1, 13)):
+            # No possible way this can work.
+            return repository.StreamSource.get_stream(self, search)
+        path = repo.bzrdir._path_for_remote_call(client)
+        try:
+            recipe = repo._serialise_search_recipe(search._recipe)
+            response = repo._call_with_body_bytes_expecting_body(
+                'Repository.StreamSource.get_stream',
+                (path, self.to_format.network_name()), recipe)
+            response_tuple, response_handler = response
+        except errors.UnknownSmartMethod:
+            medium._remember_remote_is_before((1,13))
+            return repository.StreamSource.get_stream(self, search)
+        if response_tuple[0] != 'ok':
+            raise errors.UnexpectedSmartServerResponse(response_tuple)
+        byte_stream = response_handler.read_streamed_body()
+        src_format, stream = smart_repo._byte_stream_to_stream(byte_stream)
+        if src_format.network_name() != repo._format.network_name():
+            raise AssertionError(
+                "Mismatched RemoteRepository and stream src %r, %r" % (
+                src_format.network_name(), repo._format.network_name()))
+        return stream
+
 
 class RemoteBranchLockableFiles(LockableFiles):
     """A 'LockableFiles' implementation that talks to a smart server.

=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py	2009-02-27 13:05:36 +0000
+++ b/bzrlib/repository.py	2009-03-02 03:38:07 +0000
@@ -3675,7 +3675,7 @@
         from_sf = self.from_repository.signatures
         # A missing signature is just skipped.
         keys = [(rev_id,) for rev_id in revs]
-        signatures = filter_absent(from_sf.get_record_stream(
+        signatures = versionedfile.filter_absent(from_sf.get_record_stream(
             keys,
             self.to_format._fetch_order,
             not self.to_format._fetch_uses_deltas))

=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py	2009-02-25 00:31:09 +0000
+++ b/bzrlib/smart/repository.py	2009-03-02 03:38:07 +0000
@@ -39,7 +39,7 @@
 from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 from bzrlib import revision as _mod_revision
 from bzrlib.util import bencode
-from bzrlib.versionedfile import NetworkRecordStream
+from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
 
 
 class SmartServerRepositoryRequest(SmartServerRequest):
@@ -333,6 +333,104 @@
         return SuccessfulSmartServerResponse(('ok', token))
 
 
+class SmartServerRepositoryStreamSourceGetStream(SmartServerRepositoryRequest):
+
+    def do_repository_request(self, repository, to_network_name):
+        """Get a stream for inserting into a to_format repository.
+
+        :param repository: The repository to stream from.
+        :param to_network_name: The network name of the format of the target
+            repository.
+        """
+        self._to_format = network_format_registry.get(to_network_name)
+        return None # Signal that we want a body.
+
+    def do_body(self, body_bytes):
+        repository = self._repository
+        repository.lock_read()
+        try:
+            search, error = self.recreate_search(repository, body_bytes)
+            if error is not None:
+                repository.unlock()
+                return error
+            search = search.get_result()
+            source = repository._get_source(self._to_format)
+            stream = source.get_stream(search)
+        except Exception:
+            exc_info = sys.exc_info()
+            try:
+                # On non-error, unlocking is done by the body stream handler.
+                repository.unlock()
+            finally:
+                raise exc_info[0], exc_info[1], exc_info[2]
+        return SuccessfulSmartServerResponse(('ok',),
+            body_stream=self.body_stream(stream, repository))
+
+    def body_stream(self, stream, repository):
+        byte_stream = _stream_to_byte_stream(stream, repository._format)
+        try:
+            for bytes in byte_stream:
+                yield bytes
+        except errors.RevisionNotPresent, e:
+            # This shouldn't be able to happen, but as we don't buffer
+            # everything it can in theory happen.
+            repository.unlock()
+            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
+        else:
+            repository.unlock()
+
+
+def _stream_to_byte_stream(stream, src_format):
+    """Convert a record stream to a self delimited byte stream."""
+    pack_writer = pack.ContainerSerialiser()
+    yield pack_writer.begin()
+    yield pack_writer.bytes_record(src_format.network_name(), '')
+    for substream_type, substream in stream:
+        for record in substream:
+            if record.storage_kind in ('chunked', 'fulltext'):
+                serialised = record_to_fulltext_bytes(record)
+            else:
+                serialised = record.get_bytes_as(record.storage_kind)
+            if serialised:
+                # Some streams embed the whole stream into the wire
+                # representation of the first record, which means that
+                # later records have no wire representation: we skip them.
+                yield pack_writer.bytes_record(serialised, [(substream_type,)])
+    yield pack_writer.end()
+
+
+def _byte_stream_to_stream(byte_stream):
+    """Convert a byte stream into a format and a StreamSource stream.
+
+    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
+    :return: (RepositoryFormat, stream_generator)
+    """
+    stream_decoder = pack.ContainerPushParser()
+    def record_stream():
+        """Closure to return the substreams."""
+        # May have fully parsed records already.
+        for record in stream_decoder.read_pending_records():
+            record_names, record_bytes = record
+            record_name, = record_names
+            substream_type = record_name[0]
+            substream = NetworkRecordStream([record_bytes])
+            yield substream_type, substream.read()
+        for bytes in byte_stream:
+            stream_decoder.accept_bytes(bytes)
+            for record in stream_decoder.read_pending_records():
+                record_names, record_bytes = record
+                record_name, = record_names
+                substream_type = record_name[0]
+                substream = NetworkRecordStream([record_bytes])
+                yield substream_type, substream.read()
+    for bytes in byte_stream:
+        stream_decoder.accept_bytes(bytes)
+        for record in stream_decoder.read_pending_records(max=1):
+            record_names, src_format_name = record
+            src_format = network_format_registry.get(src_format_name)
+            return src_format, record_stream()
+
+
 class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
 
     def do_repository_request(self, repository, token):
@@ -415,6 +513,12 @@
 
 
 class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
+    """Insert a record stream from a RemoteSink into a repository.
+
+    This gets bytes pushed to it by the network infrastructure and turns that
+    into a bytes iterator using a thread. That is then processed by
+    _byte_stream_to_stream.
+    """
 
     def do_repository_request(self, repository, resume_tokens):
         """StreamSink.insert_stream for a remote repository."""
@@ -422,44 +526,31 @@
         tokens = [token for token in resume_tokens.split(' ') if token]
         self.tokens = tokens
         self.repository = repository
-        self.stream_decoder = pack.ContainerPushParser()
-        self.src_format = None
         self.queue = Queue.Queue()
-        self.insert_thread = None
+        self.insert_thread = threading.Thread(target=self._inserter_thread)
+        self.insert_thread.start()
 
     def do_chunk(self, body_stream_chunk):
-        self.stream_decoder.accept_bytes(body_stream_chunk)
-        for record in self.stream_decoder.read_pending_records():
-            record_names, record_bytes = record
-            if self.src_format is None:
-                src_format_name = record_bytes
-                src_format = network_format_registry.get(src_format_name)
-                self.src_format = src_format
-                self.insert_thread = threading.Thread(target=self._inserter_thread)
-                self.insert_thread.start()
-            else:
-                record_name, = record_names
-                substream_type = record_name[0]
-                stream = NetworkRecordStream([record_bytes])
-                for record in stream.read():
-                    self.queue.put((substream_type, [record]))
+        self.queue.put(body_stream_chunk)
 
     def _inserter_thread(self):
         try:
+            src_format, stream = _byte_stream_to_stream(
+                self.blocking_byte_stream())
             self.insert_result = self.repository._get_sink().insert_stream(
-                self.blocking_read_stream(), self.src_format, self.tokens)
+                stream, src_format, self.tokens)
             self.insert_ok = True
         except:
             self.insert_exception = sys.exc_info()
             self.insert_ok = False
 
-    def blocking_read_stream(self):
+    def blocking_byte_stream(self):
         while True:
-            item = self.queue.get()
-            if item is StopIteration:
+            bytes = self.queue.get()
+            if bytes is StopIteration:
                 return
             else:
-                yield item
+                yield bytes
 
     def do_end(self):
         self.queue.put(StopIteration)

=== modified file 'bzrlib/smart/request.py'
--- a/bzrlib/smart/request.py	2009-02-26 04:25:00 +0000
+++ b/bzrlib/smart/request.py	2009-03-02 03:38:07 +0000
@@ -465,6 +465,9 @@
 request_handlers.register_lazy(
     'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')
 request_handlers.register_lazy(
+    'Repository.StreamSource.get_stream', 'bzrlib.smart.repository',
+    'SmartServerRepositoryStreamSourceGetStream')
+request_handlers.register_lazy(
     'Repository.tarball', 'bzrlib.smart.repository',
     'SmartServerRepositoryTarball')
 request_handlers.register_lazy(

=== modified file 'bzrlib/tests/blackbox/test_branch.py'
--- a/bzrlib/tests/blackbox/test_branch.py	2009-02-27 03:54:39 +0000
+++ b/bzrlib/tests/blackbox/test_branch.py	2009-03-02 03:38:07 +0000
@@ -259,6 +259,22 @@
 
 class TestSmartServerBranching(ExternalBase):
 
+    def test_branch_from_trivial_branch_to_same_server_branch_acceptance(self):
+        self.setup_smart_server_with_call_log()
+        t = self.make_branch_and_tree('from')
+        for count in range(9):
+            t.commit(message='commit %d' % count)
+        self.reset_smart_call_log()
+        out, err = self.run_bzr(['branch', self.get_url('from'),
+            self.get_url('target')])
+        rpc_count = len(self.hpss_calls)
+        # This figure represents the amount of work to perform this use case. It
+        # is entirely ok to reduce this number if a test fails due to rpc_count
+        # being too low. If rpc_count increases, more network roundtrips have
+        # become necessary for this use case. Please do not adjust this number
+        # upwards without agreement from bzr's network support maintainers.
+        self.assertEqual(99, rpc_count)
+
     def test_branch_from_trivial_branch_streaming_acceptance(self):
         self.setup_smart_server_with_call_log()
         t = self.make_branch_and_tree('from')
@@ -273,7 +289,7 @@
         # being too low. If rpc_count increases, more network roundtrips have
         # become necessary for this use case. Please do not adjust this number
         # upwards without agreement from bzr's network support maintainers.
-        self.assertEqual(78, rpc_count)
+        self.assertEqual(25, rpc_count)
 
 
 class TestRemoteBranch(TestCaseWithSFTPServer):

=== modified file 'bzrlib/tests/branch_implementations/test_branch.py'
--- a/bzrlib/tests/branch_implementations/test_branch.py	2009-02-24 08:09:17 +0000
+++ b/bzrlib/tests/branch_implementations/test_branch.py	2009-03-02 03:38:07 +0000
@@ -109,7 +109,7 @@
         wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
         b2 = self.make_branch('b2')
-        self.assertEqual((1, []), b2.fetch(b1))
+        b2.fetch(b1)
 
         rev = b2.repository.get_revision('revision-1')
         tree = b2.repository.revision_tree('revision-1')

=== modified file 'bzrlib/tests/bzrdir_implementations/test_bzrdir.py'
--- a/bzrlib/tests/bzrdir_implementations/test_bzrdir.py	2009-02-23 15:29:35 +0000
+++ b/bzrlib/tests/bzrdir_implementations/test_bzrdir.py	2009-03-02 03:38:07 +0000
@@ -553,7 +553,7 @@
         # Ensure no format data is cached
         a_dir = bzrlib.branch.Branch.open_from_transport(
             self.get_transport('source')).bzrdir
-        target_transport = a_dir.root_transport.clone('..').clone('target')
+        target_transport = self.get_transport('target')
         target_bzrdir = a_dir.clone_on_transport(target_transport)
         target_repo = target_bzrdir.open_repository()
         source_branch = bzrlib.branch.Branch.open(
@@ -655,7 +655,7 @@
 
     def test_clone_respects_stacked(self):
         branch = self.make_branch('parent')
-        child_transport = branch.bzrdir.root_transport.clone('../child')
+        child_transport = self.get_transport('child')
         child = branch.bzrdir.clone_on_transport(child_transport,
                                                  stacked_on=branch.base)
         self.assertEqual(child.open_branch().get_stacked_on_url(), branch.base)

=== modified file 'bzrlib/tests/per_repository/test_fetch.py'
--- a/bzrlib/tests/per_repository/test_fetch.py	2009-01-17 01:30:58 +0000
+++ b/bzrlib/tests/per_repository/test_fetch.py	2009-03-02 03:38:07 +0000
@@ -46,7 +46,7 @@
                    revision_id=None)
                    ## pb=bzrlib.progress.DummyProgress())
 
-    def test_fetch_knit3(self):
+    def test_fetch_to_knit3(self):
         # create a repository of the sort we are testing.
         tree_a = self.make_branch_and_tree('a')
         self.build_tree(['a/foo'])
@@ -80,7 +80,10 @@
         try:
             tree_b = b_bzrdir.create_workingtree()
         except errors.NotLocalUrl:
-            raise TestSkipped("cannot make working tree with transport %r"
+            try:
+                tree_b = b_branch.create_checkout('b', lightweight=True)
+            except errors.NotLocalUrl:
+                raise TestSkipped("cannot make working tree with transport %r"
                               % b_bzrdir.transport)
         tree_b.commit('no change', rev_id='rev2')
         rev2_tree = knit3_repo.revision_tree('rev2')

=== modified file 'bzrlib/tests/test_smart.py'
--- a/bzrlib/tests/test_smart.py	2009-02-27 01:02:40 +0000
+++ b/bzrlib/tests/test_smart.py	2009-03-02 03:38:07 +0000
@@ -1201,6 +1201,9 @@
             smart.request.request_handlers.get('Repository.lock_write'),
             smart.repository.SmartServerRepositoryLockWrite)
         self.assertEqual(
+            smart.request.request_handlers.get('Repository.StreamSource.get_stream'),
+            smart.repository.SmartServerRepositoryStreamSourceGetStream)
+        self.assertEqual(
             smart.request.request_handlers.get('Repository.tarball'),
             smart.repository.SmartServerRepositoryTarball)
         self.assertEqual(

=== modified file 'bzrlib/versionedfile.py'
--- a/bzrlib/versionedfile.py	2009-02-23 15:29:35 +0000
+++ b/bzrlib/versionedfile.py	2009-03-02 03:38:07 +0000
@@ -1534,19 +1534,19 @@
 def fulltext_network_to_record(kind, bytes, line_end):
     """Convert a network fulltext record to record."""
     meta_len, = struct.unpack('!L', bytes[line_end:line_end+4])
-    record_meta = record_bytes[line_end+4:line_end+4+meta_len]
+    record_meta = bytes[line_end+4:line_end+4+meta_len]
     key, parents = bencode.bdecode_as_tuple(record_meta)
     if parents == 'nil':
         parents = None
-    fulltext = record_bytes[line_end+4+meta_len:]
-    return FulltextContentFactory(key, parents, None, fulltext)
+    fulltext = bytes[line_end+4+meta_len:]
+    return [FulltextContentFactory(key, parents, None, fulltext)]
 
 
 def _length_prefix(bytes):
     return struct.pack('!L', len(bytes))
 
 
-def record_to_fulltext_bytes(self, record):
+def record_to_fulltext_bytes(record):
     if record.parents is None:
         parents = 'nil'
     else:




More information about the bazaar-commits mailing list