Rev 4030: Support streaming push to stacked branches. in http://people.ubuntu.com/~robertc/baz2.0/fetch.RemoteSink
Robert Collins
robertc at robertcollins.net
Sun Feb 22 23:59:01 GMT 2009
At http://people.ubuntu.com/~robertc/baz2.0/fetch.RemoteSink
------------------------------------------------------------
revno: 4030
revision-id: robertc at robertcollins.net-20090222235854-tznrapt5n0laqqzs
parent: pqm at pqm.ubuntu.com-20090221005219-fbvxjf43dzxujri8
committer: Robert Collins <robertc at robertcollins.net>
branch nick: fetch.RemoteSink
timestamp: Mon 2009-02-23 10:58:54 +1100
message:
Support streaming push to stacked branches.
=== modified file 'NEWS'
--- a/NEWS 2009-02-20 10:48:51 +0000
+++ b/NEWS 2009-02-22 23:58:54 +0000
@@ -33,6 +33,10 @@
* Bazaar will now point out ``bzr+ssh://`` to the user when they
use ssh://. (Jelmer Vernooij, #330535)
+ * ``bzr push`` to a ``bzr`` url (``bzr://``, ``bzr+ssh://`` etc) will
+ stream if the server is version 1.13 or greater, reducing roundtrips
+ significantly. (Andrew Bennetts, Robert Collins)
+
* Support for GSSAPI authentication when using HTTP or HTTPS.
(Jelmer Vernooij)
@@ -63,8 +67,9 @@
INTERNALS:
* ``bzrlib.fetch`` is now composed of a sender and a sink component
- allowing for decoupling over a network connection. (Andrew Bennetts,
- Robert Collins)
+ allowing for decoupling over a network connection. Fetching into
+ a RemoteRepository uses this to stream the operation.
+ (Andrew Bennetts, Robert Collins)
* There is a RemoteSink object which handles pushing to smart servers.
(Andrew Bennetts, Robert Collins)
=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py 2009-02-20 05:28:24 +0000
+++ b/bzrlib/fetch.py 2009-02-22 23:58:54 +0000
@@ -161,12 +161,19 @@
try:
from_format = self.from_repository._format
stream = self.get_stream(search, pp)
- self.sink.insert_stream(stream, from_format)
+ missing_keys = self.sink.insert_stream(stream, from_format)
+ if missing_keys:
+ stream = self.get_stream_for_missing_keys(missing_keys)
+ missing_keys = self.sink.insert_stream(stream, from_format)
+ if missing_keys:
+ raise AssertionError(
+ "second push failed to complete a fetch %r." % (
+ missing_keys,))
self.sink.finished()
finally:
if self.pb is not None:
self.pb.finished()
-
+
def get_stream(self, search, pp):
phase = 'file'
revs = search.get_keys()
@@ -224,6 +231,32 @@
raise AssertionError("Unknown knit kind %r" % knit_kind)
self.count_copied += len(revs)
+ def get_stream_for_missing_keys(self, missing_keys):
+ # missing keys can only occur when we are byte copying and not
+ # translating (because translation means we don't send
+ # unreconstructable deltas ever).
+ keys = {}
+ keys['texts'] = set()
+ keys['revisions'] = set()
+ keys['inventories'] = set()
+ keys['signatures'] = set()
+ for key in missing_keys:
+ keys[key[0]].add(key[1:])
+ if len(keys['revisions']):
+ # If we allowed copying revisions at this point, we could end up
+ # copying a revision without copying its required texts: a
+ # violation of the requirements for repository integrity.
+ raise AssertionError(
+ 'cannot copy revisions to fill in missing deltas %s' % (
+ keys['revisions'],))
+ for substream_kind, keys in keys.iteritems():
+ vf = getattr(self.from_repository, substream_kind)
+ # Ask for full texts always so that we don't need more round trips
+ # after this stream.
+ stream = vf.get_record_stream(keys,
+ self.to_repository._fetch_order, True)
+ yield substream_kind, stream
+
def _revids_to_fetch(self):
"""Determines the exact revisions needed from self.from_repository to
install self._last_revision in self.to_repository.
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py 2009-02-20 10:48:51 +0000
+++ b/bzrlib/knit.py 2009-02-22 23:58:54 +0000
@@ -51,14 +51,6 @@
"""
-# TODOS:
-# 10:16 < lifeless> make partial index writes safe
-# 10:16 < lifeless> implement 'knit.check()' like weave.check()
-# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave
-# always' approach.
-# move sha1 out of the content so that join is faster at verifying parents
-# record content length ?
-
from cStringIO import StringIO
from itertools import izip, chain
=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py 2009-02-21 00:05:58 +0000
+++ b/bzrlib/remote.py 2009-02-22 23:58:54 +0000
@@ -43,6 +43,7 @@
from bzrlib.smart import client, vfs
from bzrlib.revision import ensure_null, NULL_REVISION
from bzrlib.trace import mutter, note, warning
+from bzrlib.util import bencode
from bzrlib.versionedfile import record_to_fulltext_bytes
@@ -1345,42 +1346,60 @@
class RemoteStreamSink(repository.StreamSink):
+ def __init__(self, target_repo):
+ repository.StreamSink.__init__(self, target_repo)
+ self._resume_tokens = []
+
def _insert_real(self, stream, src_format):
self.target_repo._ensure_real()
sink = self.target_repo._real_repository._get_sink()
- return sink.insert_stream(stream, src_format)
+ result = sink.insert_stream(stream, src_format)
+ if not result:
+ self.target_repo.autopack()
+ return result
def insert_stream(self, stream, src_format):
repo = self.target_repo
- # Until we can handle deltas in stack repositories we can't hand all
- # the processing off to a remote server.
- if self.target_repo._fallback_repositories:
- return self._insert_real(stream, src_format)
client = repo._client
medium = client._medium
- if medium._is_remote_before((1,13)):
+ if medium._is_remote_before((1, 13)):
# No possible way this can work.
return self._insert_real(stream, src_format)
path = repo.bzrdir._path_for_remote_call(client)
- # XXX: Ugly but important for correctness, *will* be fixed during 1.13
- # cycle. Pushing a stream that is interrupted results in a fallback to
- # the _real_repositories sink *with a partial stream*. Thats bad
- # because we insert less data than bzr expected. To avoid this we do a
- # trial push to make sure the verb is accessible, and do not fallback
- # when actually pushing the stream. A cleanup patch is going to look at
- # rewinding/restarting the stream/partial buffering etc.
- byte_stream = self._stream_to_byte_stream([], src_format)
- try:
- response = client.call_with_body_stream(
- ('Repository.insert_stream', path), byte_stream)
- except errors.UnknownSmartMethod:
- medium._remember_remote_is_before((1,13))
- return self._insert_real(stream, src_format)
+ if not self._resume_tokens:
+ # XXX: Ugly but important for correctness, *will* be fixed during
+ # 1.13 cycle. Pushing a stream that is interrupted results in a
+ # fallback to the _real_repositories sink *with a partial stream*.
+ # That's bad because we insert less data than bzr expected. To avoid
+ # this we do a trial push to make sure the verb is accessible, and
+ # do not fallback when actually pushing the stream. A cleanup patch
+ # is going to look at rewinding/restarting the stream/partial
+ # buffering etc.
+ byte_stream = self._stream_to_byte_stream([], src_format)
+ try:
+ resume_tokens = ''
+ response = client.call_with_body_stream(
+ ('Repository.insert_stream', path, resume_tokens), byte_stream)
+ except errors.UnknownSmartMethod:
+ medium._remember_remote_is_before((1,13))
+ return self._insert_real(stream, src_format)
byte_stream = self._stream_to_byte_stream(stream, src_format)
+ resume_tokens = ' '.join(self._resume_tokens)
response = client.call_with_body_stream(
- ('Repository.insert_stream', path), byte_stream)
- if response[0][0] not in ('ok', ):
+ ('Repository.insert_stream', path, resume_tokens), byte_stream)
+ if response[0][0] not in ('ok', 'missing-basis'):
raise errors.UnexpectedSmartServerResponse(response)
+ if response[0][0] == 'missing-basis':
+ tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
+ self._resume_tokens = tokens
+ return missing_keys
+ else:
+ if self.target_repo._real_repository is not None:
+ collection = getattr(self.target_repo._real_repository,
+ '_pack_collection', None)
+ if collection is not None:
+ collection.reload_pack_names()
+ return []
def _stream_to_byte_stream(self, stream, src_format):
bytes = []
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2009-02-20 10:48:51 +0000
+++ b/bzrlib/repository.py 2009-02-22 23:58:54 +0000
@@ -3496,7 +3496,6 @@
from bzrlib.fetch import RepoFetcher
fetcher = RepoFetcher(self.target, self.source, revision_id,
pb, find_ghosts)
- self.target.autopack()
return fetcher.count_copied, fetcher.failed_revisions
def _get_target_pack_collection(self):
=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py 2009-02-20 08:26:50 +0000
+++ b/bzrlib/smart/repository.py 2009-02-22 23:58:54 +0000
@@ -413,10 +413,14 @@
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
- def do_repository_request(self, repository):
+ def do_repository_request(self, repository, resume_tokens):
"""StreamSink.insert_stream for a remote repository."""
repository.lock_write()
- repository.start_write_group()
+ tokens = [token for token in resume_tokens.split(' ') if token]
+ if tokens:
+ repository.resume_write_group(tokens)
+ else:
+ repository.start_write_group()
self.repository = repository
self.stream_decoder = pack.ContainerPushParser()
self.src_format = None
@@ -456,8 +460,32 @@
self.queue.put(StopIteration)
if self.insert_thread is not None:
self.insert_thread.join()
+ try:
+ missing_keys = set()
+ for prefix, versioned_file in (
+ ('texts', self.repository.texts),
+ ('inventories', self.repository.inventories),
+ ('revisions', self.repository.revisions),
+ ('signatures', self.repository.signatures),
+ ):
+ missing_keys.update((prefix,) + key for key in
+ versioned_file.get_missing_compression_parent_keys())
+ except NotImplementedError:
+ # cannot even attempt suspending.
+ pass
+ else:
+ if missing_keys:
+ # suspend the write group and tell the caller what is
+ # missing. We know we can suspend or else we would not have
+ # entered this code path. (All repositories that can handle
+ # missing keys can handle suspending a write group).
+ write_group_tokens = self.repository.suspend_write_group()
+ # bzip needed? missing keys should typically be a small set.
+ # Should this be a streaming body response ?
+ missing_keys = sorted(missing_keys)
+ bytes = bencode.bencode((write_group_tokens, missing_keys))
+ return SuccessfulSmartServerResponse(('missing-basis', bytes))
+ # All finished.
self.repository.commit_write_group()
self.repository.unlock()
return SuccessfulSmartServerResponse(('ok', ))
-
-
=== modified file 'bzrlib/tests/blackbox/test_push.py'
--- a/bzrlib/tests/blackbox/test_push.py 2009-02-21 00:05:58 +0000
+++ b/bzrlib/tests/blackbox/test_push.py 2009-02-22 23:58:54 +0000
@@ -202,7 +202,7 @@
# being too low. If rpc_count increases, more network roundtrips have
# become necessary for this use case. Please do not adjust this number
# upwards without agreement from bzr's network support maintainers.
- self.assertEqual(61, rpc_count)
+ self.assertEqual(60, rpc_count)
def test_push_smart_stacked_streaming_acceptance(self):
self.setup_smart_server_with_call_log()
@@ -219,7 +219,7 @@
# being too low. If rpc_count increases, more network roundtrips have
# become necessary for this use case. Please do not adjust this number
# upwards without agreement from bzr's network support maintainers.
- self.assertEqual(101, rpc_count)
+ self.assertEqual(85, rpc_count)
remote = Branch.open('public')
self.assertEndsWith(remote.get_stacked_on_url(), '/parent')
=== modified file 'bzrlib/tests/test_pack_repository.py'
--- a/bzrlib/tests/test_pack_repository.py 2009-02-20 06:16:22 +0000
+++ b/bzrlib/tests/test_pack_repository.py 2009-02-22 23:58:54 +0000
@@ -816,7 +816,7 @@
def get_format(self):
return bzrdir.format_registry.make_bzrdir(self.format_name)
- def test_autopack_rpc_is_used_when_using_hpss(self):
+ def test_autopack_or_streaming_rpc_is_used_when_using_hpss(self):
# Make local and remote repos
tree = self.make_branch_and_tree('local', format=self.get_format())
self.make_branch_and_tree('remote', format=self.get_format())
@@ -831,7 +831,21 @@
self.hpss_calls = []
tree.commit('commit triggering pack')
tree.branch.push(remote_branch)
- self.assertTrue('PackRepository.autopack' in self.hpss_calls)
+ autopack_calls = len([call for call in self.hpss_calls if call ==
+ 'PackRepository.autopack'])
+ streaming_calls = len([call for call in self.hpss_calls if call ==
+ 'Repository.insert_stream'])
+ if autopack_calls:
+ # Non streaming server
+ self.assertEqual(1, autopack_calls)
+ self.assertEqual(0, streaming_calls)
+ else:
+ # Streaming was used, which autopacks on the remote end.
+ self.assertEqual(0, autopack_calls)
+ # NB: The 2 calls are because of the sanity check that the server
+ # supports the verb (see remote.py:RemoteSink.insert_stream for
+ # details).
+ self.assertEqual(2, streaming_calls)
def load_tests(basic_tests, module, test_loader):
More information about the bazaar-commits
mailing list