Rev 4063: Implement the separate source component for fetch - repository.StreamSource. in http://people.ubuntu.com/~robertc/baz2.0/branch.roundtrips
Robert Collins
robertc at robertcollins.net
Fri Feb 27 13:05:39 GMT 2009
At http://people.ubuntu.com/~robertc/baz2.0/branch.roundtrips
------------------------------------------------------------
revno: 4063
revision-id: robertc at robertcollins.net-20090227130536-wsqoyhyt3n11nc8d
parent: robertc at robertcollins.net-20090227035439-35kt6cqzacgjugwr
committer: Robert Collins <robertc at robertcollins.net>
branch nick: branch.roundtrips
timestamp: Sat 2009-02-28 00:05:36 +1100
message:
Implement the separate source component for fetch - repository.StreamSource.
=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py 2009-02-27 01:02:40 +0000
+++ b/bzrlib/fetch.py 2009-02-27 13:05:36 +0000
@@ -67,7 +67,6 @@
if set, try to limit to the data this revision references.
after running:
- count_copied -- number of revisions copied
This should not be used directly, it's essentially an object to encapsulate
the logic in InterRepository.fetch().
@@ -83,8 +82,8 @@
like to remove this parameter.
"""
# result variables.
+ self.count_copied = 0
self.failed_revisions = []
- self.count_copied = 0
if to_repository.has_same_location(from_repository):
# repository.fetch should be taking care of this case.
raise errors.BzrError('RepoFetcher run '
@@ -146,14 +145,21 @@
# item_keys_introduced_by should have a richer API than it does at the
# moment, so that it can feed the progress information back to this
# function?
+ if (self.from_repository._format.rich_root_data and
+ not self.to_repository._format.rich_root_data):
+ raise errors.IncompatibleRepositories(
+ self.from_repository, self.to_repository,
+ "different rich-root support")
self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
try:
+ source = self.from_repository._get_source(
+ self.to_repository._format)
+ stream = source.get_stream(search)
from_format = self.from_repository._format
- stream = self.get_stream(search, pp)
resume_tokens, missing_keys = self.sink.insert_stream(
stream, from_format, [])
if missing_keys:
- stream = self.get_stream_for_missing_keys(missing_keys)
+ stream = source.get_stream_for_missing_keys(missing_keys)
resume_tokens, missing_keys = self.sink.insert_stream(
stream, from_format, resume_tokens)
if missing_keys:
@@ -165,92 +171,11 @@
"second push failed to commit the fetch %r." % (
resume_tokens,))
self.sink.finished()
+ self.count_copied = source.count_copied
finally:
if self.pb is not None:
self.pb.finished()
- def get_stream(self, search, pp):
- phase = 'file'
- revs = search.get_keys()
- graph = self.from_repository.get_graph()
- revs = list(graph.iter_topo_order(revs))
- data_to_fetch = self.from_repository.item_keys_introduced_by(
- revs, self.pb)
- text_keys = []
- for knit_kind, file_id, revisions in data_to_fetch:
- if knit_kind != phase:
- phase = knit_kind
- # Make a new progress bar for this phase
- self.pb.finished()
- pp.next_phase()
- self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
- if knit_kind == "file":
- # Accumulate file texts
- text_keys.extend([(file_id, revision) for revision in
- revisions])
- elif knit_kind == "inventory":
- # Now copy the file texts.
- from_texts = self.from_repository.texts
- yield ('texts', from_texts.get_record_stream(
- text_keys, self.to_repository._format._fetch_order,
- not self.to_repository._format._fetch_uses_deltas))
- # Cause an error if a text occurs after we have done the
- # copy.
- text_keys = None
- # Before we process the inventory we generate the root
- # texts (if necessary) so that the inventories references
- # will be valid.
- for _ in self._generate_root_texts(revs):
- yield _
- # NB: This currently reopens the inventory weave in source;
- # using a single stream interface instead would avoid this.
- self.pb.update("fetch inventory", 0, 1)
- from_weave = self.from_repository.inventories
- # we fetch only the referenced inventories because we do not
- # know for unselected inventories whether all their required
- # texts are present in the other repository - it could be
- # corrupt.
- yield ('inventories', from_weave.get_record_stream(
- [(rev_id,) for rev_id in revs],
- self.inventory_fetch_order(),
- not self.delta_on_metadata()))
- elif knit_kind == "signatures":
- # Nothing to do here; this will be taken care of when
- # _fetch_revision_texts happens.
- pass
- elif knit_kind == "revisions":
- for _ in self._fetch_revision_texts(revs, self.pb):
- yield _
- else:
- raise AssertionError("Unknown knit kind %r" % knit_kind)
- self.count_copied += len(revs)
-
- def get_stream_for_missing_keys(self, missing_keys):
- # missing keys can only occur when we are byte copying and not
- # translating (because translation means we don't send
- # unreconstructable deltas ever).
- keys = {}
- keys['texts'] = set()
- keys['revisions'] = set()
- keys['inventories'] = set()
- keys['signatures'] = set()
- for key in missing_keys:
- keys[key[0]].add(key[1:])
- if len(keys['revisions']):
- # If we allowed copying revisions at this point, we could end up
- # copying a revision without copying its required texts: a
- # violation of the requirements for repository integrity.
- raise AssertionError(
- 'cannot copy revisions to fill in missing deltas %s' % (
- keys['revisions'],))
- for substream_kind, keys in keys.iteritems():
- vf = getattr(self.from_repository, substream_kind)
- # Ask for full texts always so that we don't need more round trips
- # after this stream.
- stream = vf.get_record_stream(keys,
- self.to_repository._format._fetch_order, True)
- yield substream_kind, stream
-
def _revids_to_fetch(self):
"""Determines the exact revisions needed from self.from_repository to
install self._last_revision in self.to_repository.
@@ -271,44 +196,6 @@
except errors.NoSuchRevision, e:
raise InstallFailed([self._last_revision])
- def _fetch_revision_texts(self, revs, pb):
- # fetch signatures first and then the revision texts
- # may need to be a InterRevisionStore call here.
- from_sf = self.from_repository.signatures
- # A missing signature is just skipped.
- keys = [(rev_id,) for rev_id in revs]
- signatures = filter_absent(from_sf.get_record_stream(
- keys,
- self.to_repository._format._fetch_order,
- not self.to_repository._format._fetch_uses_deltas))
- # If a revision has a delta, this is actually expanded inside the
- # insert_record_stream code now, which is an alternate fix for
- # bug #261339
- from_rf = self.from_repository.revisions
- revisions = from_rf.get_record_stream(
- keys,
- self.to_repository._format._fetch_order,
- not self.delta_on_metadata())
- return [('signatures', signatures), ('revisions', revisions)]
-
- def _generate_root_texts(self, revs):
- """This will be called by __fetch between fetching weave texts and
- fetching the inventory weave.
-
- Subclasses should override this if they need to generate root texts
- after fetching weave texts.
- """
- return []
-
- def inventory_fetch_order(self):
- return self.to_repository._format._fetch_order
-
- def delta_on_metadata(self):
- src_serializer = self.from_repository._format._serializer
- target_serializer = self.to_repository._format._serializer
- return (self.to_repository._format._fetch_uses_deltas and
- src_serializer == target_serializer)
-
class Inter1and2Helper(object):
"""Helper for operations that convert data from model 1 and 2
@@ -397,21 +284,3 @@
rev_id_to_root_id.get(parent, root_id) == root_id)
yield FulltextContentFactory(key, parent_keys, None, '')
return [('texts', yield_roots())]
-
-
-class Model1toKnit2Fetcher(RepoFetcher):
- """Fetch from a Model1 repository into a Knit2 repository
- """
- def __init__(self, to_repository, from_repository, last_revision=None,
- pb=None, find_ghosts=True):
- self.helper = Inter1and2Helper(from_repository)
- RepoFetcher.__init__(self, to_repository, from_repository,
- last_revision, pb, find_ghosts)
-
- def _generate_root_texts(self, revs):
- return self.helper.generate_root_texts(revs)
-
- def inventory_fetch_order(self):
- return 'topological'
-
-Knit1to2Fetcher = Model1toKnit2Fetcher
=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py 2009-02-27 01:21:59 +0000
+++ b/bzrlib/remote.py 2009-02-27 13:05:36 +0000
@@ -624,6 +624,10 @@
"""See Repository._get_sink()."""
return RemoteStreamSink(self)
+ def _get_source(self, to_format):
+ """Return a source for streaming from this repository."""
+ return RemoteStreamSource(self, to_format)
+
def has_revision(self, revision_id):
"""See Repository.has_revision()."""
if revision_id == NULL_REVISION:
@@ -1405,9 +1409,6 @@
class RemoteStreamSink(repository.StreamSink):
- def __init__(self, target_repo):
- repository.StreamSink.__init__(self, target_repo)
-
def _insert_real(self, stream, src_format, resume_tokens):
self.target_repo._ensure_real()
sink = self.target_repo._real_repository._get_sink()
@@ -1491,6 +1492,10 @@
yield b
+class RemoteStreamSource(repository.StreamSource):
+ """Stream data from a remote server."""
+
+
class RemoteBranchLockableFiles(LockableFiles):
"""A 'LockableFiles' implementation that talks to a smart server.
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2009-02-27 03:54:39 +0000
+++ b/bzrlib/repository.py 2009-02-27 13:05:36 +0000
@@ -1226,6 +1226,10 @@
"""Return a sink for streaming into this repository."""
return StreamSink(self)
+ def _get_source(self, to_format):
+ """Return a source for streaming from this repository."""
+ return StreamSource(self, to_format)
+
@needs_read_lock
def has_revision(self, revision_id):
"""True if this repository has a copy of the revision."""
@@ -2559,8 +2563,21 @@
self.target_get_graph = self.target.get_graph
self.target_get_parent_map = self.target.get_parent_map
+ @needs_write_lock
def copy_content(self, revision_id=None):
- raise NotImplementedError(self.copy_content)
+ """Make a complete copy of the content in self into destination.
+
+ This is a destructive operation! Do not use it on existing
+ repositories.
+
+ :param revision_id: Only copy the content needed to construct
+ revision_id and its parents.
+ """
+ try:
+ self.target.set_make_working_trees(self.source.make_working_trees())
+ except NotImplementedError:
+ pass
+ self.target.fetch(self.source, revision_id=revision_id)
def fetch(self, revision_id=None, pb=None, find_ghosts=False):
"""Fetch the content required to construct revision_id.
@@ -2574,14 +2591,15 @@
:returns: (copied_revision_count, failures).
"""
- # Normally we should find a specific InterRepository subclass to do
- # the fetch; if nothing else then at least InterSameDataRepository.
- # If none of them is suitable it looks like fetching is not possible;
- # we try to give a good message why. _assert_same_model will probably
- # give a helpful message; otherwise a generic one.
- self._assert_same_model(self.source, self.target)
- raise errors.IncompatibleRepositories(self.source, self.target,
- "no suitableInterRepository found")
+ from bzrlib.fetch import RepoFetcher
+ mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
+ self.source, self.source._format, self.target,
+ self.target._format)
+ f = RepoFetcher(to_repository=self.target,
+ from_repository=self.source,
+ last_revision=revision_id,
+ pb=pb, find_ghosts=find_ghosts)
+ return f.count_copied, f.failed_revisions
def _walk_to_common_revisions(self, revision_ids):
"""Walk out from revision_ids in source to revisions target has.
@@ -2727,42 +2745,6 @@
def is_compatible(source, target):
return InterRepository._same_model(source, target)
- @needs_write_lock
- def copy_content(self, revision_id=None):
- """Make a complete copy of the content in self into destination.
-
- This copies both the repository's revision data, and configuration information
- such as the make_working_trees setting.
-
- This is a destructive operation! Do not use it on existing
- repositories.
-
- :param revision_id: Only copy the content needed to construct
- revision_id and its parents.
- """
- try:
- self.target.set_make_working_trees(self.source.make_working_trees())
- except NotImplementedError:
- pass
- # but don't bother fetching if we have the needed data now.
- if (revision_id not in (None, _mod_revision.NULL_REVISION) and
- self.target.has_revision(revision_id)):
- return
- self.target.fetch(self.source, revision_id=revision_id)
-
- @needs_write_lock
- def fetch(self, revision_id=None, pb=None, find_ghosts=False):
- """See InterRepository.fetch()."""
- from bzrlib.fetch import RepoFetcher
- mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
- self.source, self.source._format, self.target,
- self.target._format)
- f = RepoFetcher(to_repository=self.target,
- from_repository=self.source,
- last_revision=revision_id,
- pb=pb, find_ghosts=find_ghosts)
- return f.count_copied, f.failed_revisions
-
class InterWeaveRepo(InterSameDataRepository):
"""Optimised code paths between Weave based repositories.
@@ -2997,7 +2979,6 @@
return fetcher.count_copied, fetcher.failed_revisions
mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
self.source, self.source._format, self.target, self.target._format)
- self.count_copied = 0
if revision_id is None:
# TODO:
# everything to do - use pack logic
@@ -3096,123 +3077,6 @@
return self.source.revision_ids_to_search_result(result_set)
-class InterModel1and2(InterRepository):
-
- @classmethod
- def _get_repo_format_to_test(self):
- return None
-
- @staticmethod
- def is_compatible(source, target):
- if not source.supports_rich_root() and target.supports_rich_root():
- return True
- else:
- return False
-
- @needs_write_lock
- def fetch(self, revision_id=None, pb=None, find_ghosts=False):
- """See InterRepository.fetch()."""
- from bzrlib.fetch import Model1toKnit2Fetcher
- f = Model1toKnit2Fetcher(to_repository=self.target,
- from_repository=self.source,
- last_revision=revision_id,
- pb=pb, find_ghosts=find_ghosts)
- return f.count_copied, f.failed_revisions
-
- @needs_write_lock
- def copy_content(self, revision_id=None):
- """Make a complete copy of the content in self into destination.
-
- This is a destructive operation! Do not use it on existing
- repositories.
-
- :param revision_id: Only copy the content needed to construct
- revision_id and its parents.
- """
- try:
- self.target.set_make_working_trees(self.source.make_working_trees())
- except NotImplementedError:
- pass
- # but don't bother fetching if we have the needed data now.
- if (revision_id not in (None, _mod_revision.NULL_REVISION) and
- self.target.has_revision(revision_id)):
- return
- self.target.fetch(self.source, revision_id=revision_id)
-
-
-class InterKnit1and2(InterKnitRepo):
-
- @classmethod
- def _get_repo_format_to_test(self):
- return None
-
- @staticmethod
- def is_compatible(source, target):
- """Be compatible with Knit1 source and Knit3 target"""
- try:
- from bzrlib.repofmt.knitrepo import (
- RepositoryFormatKnit1,
- RepositoryFormatKnit3,
- )
- from bzrlib.repofmt.pack_repo import (
- RepositoryFormatKnitPack1,
- RepositoryFormatKnitPack3,
- RepositoryFormatKnitPack4,
- RepositoryFormatKnitPack5,
- RepositoryFormatKnitPack5RichRoot,
- RepositoryFormatKnitPack6,
- RepositoryFormatKnitPack6RichRoot,
- RepositoryFormatPackDevelopment2,
- RepositoryFormatPackDevelopment2Subtree,
- )
- norichroot = (
- RepositoryFormatKnit1, # no rr, no subtree
- RepositoryFormatKnitPack1, # no rr, no subtree
- RepositoryFormatPackDevelopment2, # no rr, no subtree
- RepositoryFormatKnitPack5, # no rr, no subtree
- RepositoryFormatKnitPack6, # no rr, no subtree
- )
- richroot = (
- RepositoryFormatKnit3, # rr, subtree
- RepositoryFormatKnitPack3, # rr, subtree
- RepositoryFormatKnitPack4, # rr, no subtree
- RepositoryFormatKnitPack5RichRoot,# rr, no subtree
- RepositoryFormatKnitPack6RichRoot,# rr, no subtree
- RepositoryFormatPackDevelopment2Subtree, # rr, subtree
- )
- for format in norichroot:
- if format.rich_root_data:
- raise AssertionError('Format %s is a rich-root format'
- ' but is included in the non-rich-root list'
- % (format,))
- for format in richroot:
- if not format.rich_root_data:
- raise AssertionError('Format %s is not a rich-root format'
- ' but is included in the rich-root list'
- % (format,))
- # TODO: One alternative is to just check format.rich_root_data,
- # instead of keeping membership lists. However, the formats
- # *also* have to use the same 'Knit' style of storage
- # (line-deltas, fulltexts, etc.)
- return (isinstance(source._format, norichroot) and
- isinstance(target._format, richroot))
- except AttributeError:
- return False
-
- @needs_write_lock
- def fetch(self, revision_id=None, pb=None, find_ghosts=False):
- """See InterRepository.fetch()."""
- from bzrlib.fetch import Knit1to2Fetcher
- mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
- self.source, self.source._format, self.target,
- self.target._format)
- f = Knit1to2Fetcher(to_repository=self.target,
- from_repository=self.source,
- last_revision=revision_id,
- pb=pb, find_ghosts=find_ghosts)
- return f.count_copied, f.failed_revisions
-
-
class InterDifferingSerializer(InterKnitRepo):
@classmethod
@@ -3507,8 +3371,6 @@
InterRepository.register_optimiser(InterSameDataRepository)
InterRepository.register_optimiser(InterWeaveRepo)
InterRepository.register_optimiser(InterKnitRepo)
-InterRepository.register_optimiser(InterModel1and2)
-InterRepository.register_optimiser(InterKnit1and2)
InterRepository.register_optimiser(InterPackRepo)
InterRepository.register_optimiser(InterOtherToRemote)
InterRepository.register_optimiser(InterRemoteToOther)
@@ -3784,3 +3646,147 @@
if self.target_repo._format._fetch_reconcile:
self.target_repo.reconcile()
+
+class StreamSource(object):
+ """A source of a stream for fetching between repositories.
+
+ :ivar count_copied: number of revisions streamed.
+ """
+
+ def __init__(self, from_repository, to_format):
+ """Create a StreamSource streaming from from_repository."""
+ self.from_repository = from_repository
+ self.to_format = to_format
+ self.count_copied = 0
+
+ def delta_on_metadata(self):
+ """Return True if delta's are permitted on metadata streams.
+
+ That is on revisions and signatures.
+ """
+ src_serializer = self.from_repository._format._serializer
+ target_serializer = self.to_format._serializer
+ return (self.to_format._fetch_uses_deltas and
+ src_serializer == target_serializer)
+
+ def _fetch_revision_texts(self, revs):
+ # fetch signatures first and then the revision texts
+ # may need to be a InterRevisionStore call here.
+ from_sf = self.from_repository.signatures
+ # A missing signature is just skipped.
+ keys = [(rev_id,) for rev_id in revs]
+ signatures = filter_absent(from_sf.get_record_stream(
+ keys,
+ self.to_format._fetch_order,
+ not self.to_format._fetch_uses_deltas))
+ # If a revision has a delta, this is actually expanded inside the
+ # insert_record_stream code now, which is an alternate fix for
+ # bug #261339
+ from_rf = self.from_repository.revisions
+ revisions = from_rf.get_record_stream(
+ keys,
+ self.to_format._fetch_order,
+ not self.delta_on_metadata())
+ return [('signatures', signatures), ('revisions', revisions)]
+
+ def _generate_root_texts(self, revs):
+ """This will be called by __fetch between fetching weave texts and
+ fetching the inventory weave.
+
+ Subclasses should override this if they need to generate root texts
+ after fetching weave texts.
+ """
+ if self._rich_root_upgrade():
+ import bzrlib.fetch
+ return bzrlib.fetch.Inter1and2Helper(
+ self.from_repository).generate_root_texts(revs)
+ else:
+ return []
+
+ def get_stream(self, search):
+ phase = 'file'
+ revs = search.get_keys()
+ graph = self.from_repository.get_graph()
+ revs = list(graph.iter_topo_order(revs))
+ data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
+ text_keys = []
+ for knit_kind, file_id, revisions in data_to_fetch:
+ if knit_kind != phase:
+ phase = knit_kind
+ # Make a new progress bar for this phase
+ if knit_kind == "file":
+ # Accumulate file texts
+ text_keys.extend([(file_id, revision) for revision in
+ revisions])
+ elif knit_kind == "inventory":
+ # Now copy the file texts.
+ from_texts = self.from_repository.texts
+ yield ('texts', from_texts.get_record_stream(
+ text_keys, self.to_format._fetch_order,
+ not self.to_format._fetch_uses_deltas))
+ # Cause an error if a text occurs after we have done the
+ # copy.
+ text_keys = None
+ # Before we process the inventory we generate the root
+ # texts (if necessary) so that the inventories references
+ # will be valid.
+ for _ in self._generate_root_texts(revs):
+ yield _
+ # NB: This currently reopens the inventory weave in source;
+ # using a single stream interface instead would avoid this.
+ from_weave = self.from_repository.inventories
+ # we fetch only the referenced inventories because we do not
+ # know for unselected inventories whether all their required
+ # texts are present in the other repository - it could be
+ # corrupt.
+ yield ('inventories', from_weave.get_record_stream(
+ [(rev_id,) for rev_id in revs],
+ self.inventory_fetch_order(),
+ not self.delta_on_metadata()))
+ elif knit_kind == "signatures":
+ # Nothing to do here; this will be taken care of when
+ # _fetch_revision_texts happens.
+ pass
+ elif knit_kind == "revisions":
+ for record in self._fetch_revision_texts(revs):
+ yield record
+ else:
+ raise AssertionError("Unknown knit kind %r" % knit_kind)
+ self.count_copied += len(revs)
+
+ def get_stream_for_missing_keys(self, missing_keys):
+ # missing keys can only occur when we are byte copying and not
+ # translating (because translation means we don't send
+ # unreconstructable deltas ever).
+ keys = {}
+ keys['texts'] = set()
+ keys['revisions'] = set()
+ keys['inventories'] = set()
+ keys['signatures'] = set()
+ for key in missing_keys:
+ keys[key[0]].add(key[1:])
+ if len(keys['revisions']):
+ # If we allowed copying revisions at this point, we could end up
+ # copying a revision without copying its required texts: a
+ # violation of the requirements for repository integrity.
+ raise AssertionError(
+ 'cannot copy revisions to fill in missing deltas %s' % (
+ keys['revisions'],))
+ for substream_kind, keys in keys.iteritems():
+ vf = getattr(self.from_repository, substream_kind)
+ # Ask for full texts always so that we don't need more round trips
+ # after this stream.
+ stream = vf.get_record_stream(keys,
+ self.to_format._fetch_order, True)
+ yield substream_kind, stream
+
+ def inventory_fetch_order(self):
+ if self._rich_root_upgrade():
+ return 'topological'
+ else:
+ return self.to_format._fetch_order
+
+ def _rich_root_upgrade(self):
+ return (not self.from_repository._format.rich_root_data and
+ self.to_format.rich_root_data)
+
=== modified file 'bzrlib/tests/interrepository_implementations/__init__.py'
--- a/bzrlib/tests/interrepository_implementations/__init__.py 2009-01-17 01:30:58 +0000
+++ b/bzrlib/tests/interrepository_implementations/__init__.py 2009-02-27 13:05:36 +0000
@@ -33,8 +33,6 @@
from bzrlib.repository import (
InterKnitRepo,
- InterKnit1and2,
- InterModel1and2,
InterRepository,
)
from bzrlib.tests import (
@@ -100,14 +98,14 @@
result.append((optimiser_class,
format_to_test, format_to_test))
# if there are specific combinations we want to use, we can add them
- # here.
- result.append((InterModel1and2,
+ # here. We want to test rich root upgrading.
+ result.append((InterRepository,
weaverepo.RepositoryFormat5(),
knitrepo.RepositoryFormatKnit3()))
- result.append((InterModel1and2,
+ result.append((InterRepository,
knitrepo.RepositoryFormatKnit1(),
knitrepo.RepositoryFormatKnit3()))
- result.append((InterKnit1and2,
+ result.append((InterRepository,
knitrepo.RepositoryFormatKnit1(),
knitrepo.RepositoryFormatKnit3()))
result.append((InterKnitRepo,
More information about the bazaar-commits
mailing list