Rev 4030: Support streaming push to stacked branches. in http://people.ubuntu.com/~robertc/baz2.0/fetch.RemoteSink

Robert Collins robertc at robertcollins.net
Sun Feb 22 23:59:01 GMT 2009


At http://people.ubuntu.com/~robertc/baz2.0/fetch.RemoteSink

------------------------------------------------------------
revno: 4030
revision-id: robertc at robertcollins.net-20090222235854-tznrapt5n0laqqzs
parent: pqm at pqm.ubuntu.com-20090221005219-fbvxjf43dzxujri8
committer: Robert Collins <robertc at robertcollins.net>
branch nick: fetch.RemoteSink
timestamp: Mon 2009-02-23 10:58:54 +1100
message:
  Support streaming push to stacked branches.
=== modified file 'NEWS'
--- a/NEWS	2009-02-20 10:48:51 +0000
+++ b/NEWS	2009-02-22 23:58:54 +0000
@@ -33,6 +33,10 @@
     * Bazaar will now point out ``bzr+ssh://`` to the user when they 
       use ssh://. (Jelmer Vernooij, #330535)
 
+    * ``bzr push`` to a ``bzr`` url (``bzr://``, ``bzr+ssh://`` etc) will
+      stream if the server is version 1.13 or greater, reducing roundtrips
+      significantly. (Andrew Bennetts, Robert Collins)
+
     * Support for GSSAPI authentication when using HTTP or HTTPS. 
       (Jelmer Vernooij)
 
@@ -63,8 +67,9 @@
   INTERNALS:
 
     * ``bzrlib.fetch`` is now composed of a sender and a sink component
-      allowing for decoupling over a network connection. (Andrew Bennetts,
-      Robert Collins)
+      allowing for decoupling over a network connection. Fetching into
+      a RemoteRepository uses this to stream the operation.
+      (Andrew Bennetts, Robert Collins)
 
     * There is a RemoteSink object which handles pushing to smart servers.
       (Andrew Bennetts, Robert Collins)

=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py	2009-02-20 05:28:24 +0000
+++ b/bzrlib/fetch.py	2009-02-22 23:58:54 +0000
@@ -161,12 +161,19 @@
         try:
             from_format = self.from_repository._format
             stream = self.get_stream(search, pp)
-            self.sink.insert_stream(stream, from_format)
+            missing_keys = self.sink.insert_stream(stream, from_format)
+            if missing_keys:
+                stream = self.get_stream_for_missing_keys(missing_keys)
+                missing_keys = self.sink.insert_stream(stream, from_format)
+            if missing_keys:
+                raise AssertionError(
+                    "second push failed to complete a fetch %r." % (
+                        missing_keys,))
             self.sink.finished()
         finally:
             if self.pb is not None:
                 self.pb.finished()
-        
+
     def get_stream(self, search, pp):
         phase = 'file'
         revs = search.get_keys()
@@ -224,6 +231,32 @@
                 raise AssertionError("Unknown knit kind %r" % knit_kind)
         self.count_copied += len(revs)
 
+    def get_stream_for_missing_keys(self, missing_keys):
+        # missing keys can only occur when we are byte copying and not
+        # translating (because translation means we don't send
+        # unreconstructable deltas ever).
+        keys = {}
+        keys['texts'] = set()
+        keys['revisions'] = set()
+        keys['inventories'] = set()
+        keys['signatures'] = set()
+        for key in missing_keys:
+            keys[key[0]].add(key[1:])
+        if len(keys['revisions']):
+            # If we allowed copying revisions at this point, we could end up
+            # copying a revision without copying its required texts: a
+            # violation of the requirements for repository integrity.
+            raise AssertionError(
+                'cannot copy revisions to fill in missing deltas %s' % (
+                    keys['revisions'],))
+        for substream_kind, keys in keys.iteritems():
+            vf = getattr(self.from_repository, substream_kind)
+            # Ask for full texts always so that we don't need more round trips
+            # after this stream.
+            stream = vf.get_record_stream(keys,
+                self.to_repository._fetch_order, True)
+            yield substream_kind, stream
+
     def _revids_to_fetch(self):
         """Determines the exact revisions needed from self.from_repository to
         install self._last_revision in self.to_repository.

=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py	2009-02-20 10:48:51 +0000
+++ b/bzrlib/knit.py	2009-02-22 23:58:54 +0000
@@ -51,14 +51,6 @@
 
 """
 
-# TODOS:
-# 10:16 < lifeless> make partial index writes safe
-# 10:16 < lifeless> implement 'knit.check()' like weave.check()
-# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave 
-#                    always' approach.
-# move sha1 out of the content so that join is faster at verifying parents
-# record content length ?
-                  
 
 from cStringIO import StringIO
 from itertools import izip, chain

=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py	2009-02-21 00:05:58 +0000
+++ b/bzrlib/remote.py	2009-02-22 23:58:54 +0000
@@ -43,6 +43,7 @@
 from bzrlib.smart import client, vfs
 from bzrlib.revision import ensure_null, NULL_REVISION
 from bzrlib.trace import mutter, note, warning
+from bzrlib.util import bencode
 from bzrlib.versionedfile import record_to_fulltext_bytes
 
 
@@ -1345,42 +1346,60 @@
 
 class RemoteStreamSink(repository.StreamSink):
 
+    def __init__(self, target_repo):
+        repository.StreamSink.__init__(self, target_repo)
+        self._resume_tokens = []
+
     def _insert_real(self, stream, src_format):
         self.target_repo._ensure_real()
         sink = self.target_repo._real_repository._get_sink()
-        return sink.insert_stream(stream, src_format)
+        result = sink.insert_stream(stream, src_format)
+        if not result:
+            self.target_repo.autopack()
+        return result
 
     def insert_stream(self, stream, src_format):
         repo = self.target_repo
-        # Until we can handle deltas in stack repositories we can't hand all
-        # the processing off to a remote server.
-        if self.target_repo._fallback_repositories:
-            return self._insert_real(stream, src_format)
         client = repo._client
         medium = client._medium
-        if medium._is_remote_before((1,13)):
+        if medium._is_remote_before((1, 13)):
             # No possible way this can work.
             return self._insert_real(stream, src_format)
         path = repo.bzrdir._path_for_remote_call(client)
-        # XXX: Ugly but important for correctness, *will* be fixed during 1.13
-        # cycle. Pushing a stream that is interrupted results in a fallback to
-        # the _real_repositories sink *with a partial stream*. Thats bad
-        # because we insert less data than bzr expected. To avoid this we do a
-        # trial push to make sure the verb is accessible, and do not fallback
-        # when actually pushing the stream. A cleanup patch is going to look at
-        # rewinding/restarting the stream/partial buffering etc.
-        byte_stream = self._stream_to_byte_stream([], src_format)
-        try:
-            response = client.call_with_body_stream(
-                ('Repository.insert_stream', path), byte_stream)
-        except errors.UnknownSmartMethod:
-            medium._remember_remote_is_before((1,13))
-            return self._insert_real(stream, src_format)
+        if not self._resume_tokens:
+            # XXX: Ugly but important for correctness, *will* be fixed during
+            # 1.13 cycle. Pushing a stream that is interrupted results in a
+            # fallback to the _real_repositories sink *with a partial stream*.
+            # That's bad because we insert less data than bzr expected. To avoid
+            # this we do a trial push to make sure the verb is accessible, and
+            # do not fallback when actually pushing the stream. A cleanup patch
+            # is going to look at rewinding/restarting the stream/partial
+            # buffering etc.
+            byte_stream = self._stream_to_byte_stream([], src_format)
+            try:
+                resume_tokens = ''
+                response = client.call_with_body_stream(
+                    ('Repository.insert_stream', path, resume_tokens), byte_stream)
+            except errors.UnknownSmartMethod:
+                medium._remember_remote_is_before((1,13))
+                return self._insert_real(stream, src_format)
         byte_stream = self._stream_to_byte_stream(stream, src_format)
+        resume_tokens = ' '.join(self._resume_tokens)
         response = client.call_with_body_stream(
-            ('Repository.insert_stream', path), byte_stream)
-        if response[0][0] not in ('ok', ):
+            ('Repository.insert_stream', path, resume_tokens), byte_stream)
+        if response[0][0] not in ('ok', 'missing-basis'):
             raise errors.UnexpectedSmartServerResponse(response)
+        if response[0][0] == 'missing-basis':
+            tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
+            self._resume_tokens = tokens
+            return missing_keys
+        else:
+            if self.target_repo._real_repository is not None:
+                collection = getattr(self.target_repo._real_repository,
+                    '_pack_collection', None)
+                if collection is not None:
+                    collection.reload_pack_names()
+            return []
             
     def _stream_to_byte_stream(self, stream, src_format):
         bytes = []

=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py	2009-02-20 10:48:51 +0000
+++ b/bzrlib/repository.py	2009-02-22 23:58:54 +0000
@@ -3496,7 +3496,6 @@
         from bzrlib.fetch import RepoFetcher
         fetcher = RepoFetcher(self.target, self.source, revision_id,
                               pb, find_ghosts)
-        self.target.autopack()
         return fetcher.count_copied, fetcher.failed_revisions
         
     def _get_target_pack_collection(self):

=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py	2009-02-20 08:26:50 +0000
+++ b/bzrlib/smart/repository.py	2009-02-22 23:58:54 +0000
@@ -413,10 +413,14 @@
 
 class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
 
-    def do_repository_request(self, repository):
+    def do_repository_request(self, repository, resume_tokens):
         """StreamSink.insert_stream for a remote repository."""
         repository.lock_write()
-        repository.start_write_group()
+        tokens = [token for token in resume_tokens.split(' ') if token]
+        if tokens:
+            repository.resume_write_group(tokens)
+        else:
+            repository.start_write_group()
         self.repository = repository
         self.stream_decoder = pack.ContainerPushParser()
         self.src_format = None
@@ -456,8 +460,32 @@
         self.queue.put(StopIteration)
         if self.insert_thread is not None:
             self.insert_thread.join()
+        try:
+            missing_keys = set()
+            for prefix, versioned_file in (
+                ('texts', self.repository.texts),
+                ('inventories', self.repository.inventories),
+                ('revisions', self.repository.revisions),
+                ('signatures', self.repository.signatures),
+                ):
+                missing_keys.update((prefix,) + key for key in
+                    versioned_file.get_missing_compression_parent_keys())
+        except NotImplementedError:
+            # cannot even attempt suspending.
+            pass
+        else:
+            if missing_keys:
+                # suspend the write group and tell the caller what is
+                # missing. We know we can suspend or else we would not have
+                # entered this code path. (All repositories that can handle
+                # missing keys can handle suspending a write group).
+                write_group_tokens = self.repository.suspend_write_group()
+                # bzip needed? missing keys should typically be a small set.
+                # Should this be a streaming body response ?
+                missing_keys = sorted(missing_keys)
+                bytes = bencode.bencode((write_group_tokens, missing_keys))
+                return SuccessfulSmartServerResponse(('missing-basis', bytes))
+        # All finished.
         self.repository.commit_write_group()
         self.repository.unlock()
         return SuccessfulSmartServerResponse(('ok', ))
-
-

=== modified file 'bzrlib/tests/blackbox/test_push.py'
--- a/bzrlib/tests/blackbox/test_push.py	2009-02-21 00:05:58 +0000
+++ b/bzrlib/tests/blackbox/test_push.py	2009-02-22 23:58:54 +0000
@@ -202,7 +202,7 @@
         # being too low. If rpc_count increases, more network roundtrips have
         # become necessary for this use case. Please do not adjust this number
         # upwards without agreement from bzr's network support maintainers.
-        self.assertEqual(61, rpc_count)
+        self.assertEqual(60, rpc_count)
 
     def test_push_smart_stacked_streaming_acceptance(self):
         self.setup_smart_server_with_call_log()
@@ -219,7 +219,7 @@
         # being too low. If rpc_count increases, more network roundtrips have
         # become necessary for this use case. Please do not adjust this number
         # upwards without agreement from bzr's network support maintainers.
-        self.assertEqual(101, rpc_count)
+        self.assertEqual(85, rpc_count)
         remote = Branch.open('public')
         self.assertEndsWith(remote.get_stacked_on_url(), '/parent')
 

=== modified file 'bzrlib/tests/test_pack_repository.py'
--- a/bzrlib/tests/test_pack_repository.py	2009-02-20 06:16:22 +0000
+++ b/bzrlib/tests/test_pack_repository.py	2009-02-22 23:58:54 +0000
@@ -816,7 +816,7 @@
     def get_format(self):
         return bzrdir.format_registry.make_bzrdir(self.format_name)
 
-    def test_autopack_rpc_is_used_when_using_hpss(self):
+    def test_autopack_or_streaming_rpc_is_used_when_using_hpss(self):
         # Make local and remote repos
         tree = self.make_branch_and_tree('local', format=self.get_format())
         self.make_branch_and_tree('remote', format=self.get_format())
@@ -831,7 +831,21 @@
         self.hpss_calls = []
         tree.commit('commit triggering pack')
         tree.branch.push(remote_branch)
-        self.assertTrue('PackRepository.autopack' in self.hpss_calls)
+        autopack_calls = len([call for call in self.hpss_calls if call ==
+            'PackRepository.autopack'])
+        streaming_calls = len([call for call in self.hpss_calls if call ==
+            'Repository.insert_stream'])
+        if autopack_calls:
+            # Non streaming server
+            self.assertEqual(1, autopack_calls)
+            self.assertEqual(0, streaming_calls)
+        else:
+            # Streaming was used, which autopacks on the remote end.
+            self.assertEqual(0, autopack_calls)
+            # NB: The 2 calls are because of the sanity check that the server
+            # supports the verb (see remote.py:RemoteSink.insert_stream for
+            # details).
+            self.assertEqual(2, streaming_calls)
 
 
 def load_tests(basic_tests, module, test_loader):




More information about the bazaar-commits mailing list