Rev 4023: Refactoring of fetch to have a sender and sink component enabling splitting the logic over a network stream. (Robert Collins, Andrew Bennetts) in http://people.ubuntu.com/~robertc/baz2.0/fetch.sinks
Robert Collins
robertc at robertcollins.net
Fri Feb 20 05:05:28 GMT 2009
At http://people.ubuntu.com/~robertc/baz2.0/fetch.sinks
------------------------------------------------------------
revno: 4023
revision-id: robertc at robertcollins.net-20090220050525-o3i6wve7yigkx6i9
parent: pqm at pqm.ubuntu.com-20090220022509-leun2dkfewbwcgn7
committer: Robert Collins <robertc at robertcollins.net>
branch nick: fetch.sinks
timestamp: Fri 2009-02-20 16:05:25 +1100
message:
Refactoring of fetch to have a sender and sink component enabling splitting the logic over a network stream. (Robert Collins, Andrew Bennetts)
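For orientation before the diff: the sender half produces a generator of named substreams and the sink half consumes them, so the two sides can later sit on opposite ends of a network connection. The sketch below is illustrative only and uses simplified stand-ins, not bzrlib's real classes:

def get_stream(source):
    # Sender half: yield (substream_type, iterator-of-records) pairs.
    for kind in ('texts', 'inventories', 'signatures', 'revisions'):
        yield (kind, iter(source.get(kind, [])))

class StreamSink(object):
    # Receiver half: route each named substream into the matching store.
    def __init__(self):
        self.stores = {}
    def insert_stream(self, stream):
        for substream_type, substream in stream:
            self.stores.setdefault(substream_type, []).extend(substream)

source = {'texts': ['t1'], 'revisions': ['r1']}
sink = StreamSink()
sink.insert_stream(get_stream(source))
assert sink.stores['revisions'] == ['r1']

Because the stream is just an iterator of (name, records) pairs, it can be serialised and shipped over a smart-server connection without either side knowing about the other's repository implementation.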
=== modified file 'NEWS'
--- a/NEWS 2009-02-20 02:25:09 +0000
+++ b/NEWS 2009-02-20 05:05:25 +0000
@@ -62,6 +62,10 @@
INTERNALS:
+ * ``bzrlib.fetch`` is now composed of a sender and a sink component
+ allowing for decoupling over a network connection. (Andrew Bennetts,
+ Robert Collins)
+
* ``bzrlib.tests.run_suite`` accepts a runner_class parameter
supporting the use of different runners. (Robert Collins)
@@ -72,6 +76,10 @@
* ``RepositoryFormat`` objects now have a ``network_name`` for passing
the format across RPC calls. (Robert Collins, Andrew Bennetts)
+ * ``RepositoryFormat`` objects now all have a new attribute
+ ``_serializer`` used by fetch when reserialising is required.
+ (Robert Collins, Andrew Bennetts)
+
* Some methods have been pulled up from ``BzrBranch`` to ``Branch``
to aid branch types that are not bzr branch objects (like
RemoteBranch). (Robert Collins, Andrew Bennetts)
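The ``_serializer`` attribute noted in the NEWS entry above matters to fetch because deltas are only safe to stream when source and target serialize identically; otherwise records must be sent as fulltexts so the sink can reserialize them. A minimal sketch of that decision, mirroring the delta_on_metadata helper added to bzrlib/fetch.py below (the string serializer names are placeholders):

def delta_on_metadata(fetch_uses_deltas, src_serializer, target_serializer):
    # A delta applies against texts already in the target, so it is only
    # usable when both sides produce byte-identical serializations.
    return fetch_uses_deltas and src_serializer == target_serializer

assert delta_on_metadata(True, 'xml5', 'xml5')
assert not delta_on_metadata(True, 'xml5', 'xml7')
assert not delta_on_metadata(False, 'xml5', 'xml5')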
=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py 2009-02-02 05:56:34 +0000
+++ b/bzrlib/fetch.py 2009-02-20 05:05:25 +0000
@@ -36,10 +36,7 @@
import bzrlib.errors as errors
from bzrlib.errors import InstallFailed
from bzrlib.progress import ProgressPhase
-from bzrlib.revision import is_null, NULL_REVISION
-from bzrlib.symbol_versioning import (deprecated_function,
- deprecated_method,
- )
+from bzrlib.revision import NULL_REVISION
from bzrlib.tsort import topo_sort
from bzrlib.trace import mutter
import bzrlib.ui
@@ -95,6 +92,7 @@
'%r and %r' % (to_repository, from_repository))
self.to_repository = to_repository
self.from_repository = from_repository
+ self.sink = to_repository._get_sink()
# must not mutate self._last_revision as it's potentially a shared instance
self._last_revision = last_revision
self.find_ghosts = find_ghosts
@@ -131,6 +129,12 @@
This initialises all the needed variables, and then fetches the
requested revisions, finally clearing the progress bar.
"""
+ # Roughly this is what we're aiming for fetch to become:
+ #
+ # missing = self.sink.insert_stream(self.source.get_stream(search))
+ # if missing:
+ # missing = self.sink.insert_stream(self.source.get_items(missing))
+ # assert not missing
self.count_total = 0
self.file_ids_names = {}
pp = ProgressPhase('Transferring', 4, self.pb)
@@ -139,11 +143,7 @@
search = self._revids_to_fetch()
if search is None:
return
- if getattr(self, '_fetch_everything_for_search', None) is not None:
- self._fetch_everything_for_search(search, pp)
- else:
- # backward compatibility
- self._fetch_everything_for_revisions(search.get_keys, pp)
+ self._fetch_everything_for_search(search, pp)
finally:
self.pb.clear()
@@ -157,58 +157,73 @@
# item_keys_introduced_by should have a richer API than it does at the
# moment, so that it can feed the progress information back to this
# function?
- phase = 'file'
pb = bzrlib.ui.ui_factory.nested_progress_bar()
try:
- revs = search.get_keys()
- graph = self.from_repository.get_graph()
- revs = list(graph.iter_topo_order(revs))
- data_to_fetch = self.from_repository.item_keys_introduced_by(revs,
- pb)
- text_keys = []
- for knit_kind, file_id, revisions in data_to_fetch:
- if knit_kind != phase:
- phase = knit_kind
- # Make a new progress bar for this phase
- pb.finished()
- pp.next_phase()
- pb = bzrlib.ui.ui_factory.nested_progress_bar()
- if knit_kind == "file":
- # Accumulate file texts
- text_keys.extend([(file_id, revision) for revision in
- revisions])
- elif knit_kind == "inventory":
- # Now copy the file texts.
- to_texts = self.to_repository.texts
- from_texts = self.from_repository.texts
- to_texts.insert_record_stream(from_texts.get_record_stream(
- text_keys, self.to_repository._fetch_order,
- not self.to_repository._fetch_uses_deltas))
- # Cause an error if a text occurs after we have done the
- # copy.
- text_keys = None
- # Before we process the inventory we generate the root
- # texts (if necessary) so that the inventories references
- # will be valid.
- self._generate_root_texts(revs)
- # NB: This currently reopens the inventory weave in source;
- # using a single stream interface instead would avoid this.
- self._fetch_inventory_weave(revs, pb)
- elif knit_kind == "signatures":
- # Nothing to do here; this will be taken care of when
- # _fetch_revision_texts happens.
- pass
- elif knit_kind == "revisions":
- self._fetch_revision_texts(revs, pb)
- else:
- raise AssertionError("Unknown knit kind %r" % knit_kind)
- if self.to_repository._fetch_reconcile:
- self.to_repository.reconcile()
+ from_format = self.from_repository._format
+ stream = self.get_stream(search, pb, pp)
+ self.sink.insert_stream(stream, from_format)
+ self.sink.finished()
finally:
if pb is not None:
pb.finished()
+
+ def get_stream(self, search, pb, pp):
+ phase = 'file'
+ revs = search.get_keys()
+ graph = self.from_repository.get_graph()
+ revs = list(graph.iter_topo_order(revs))
+ data_to_fetch = self.from_repository.item_keys_introduced_by(revs,
+ pb)
+ text_keys = []
+ for knit_kind, file_id, revisions in data_to_fetch:
+ if knit_kind != phase:
+ phase = knit_kind
+ # Make a new progress bar for this phase
+ pb.finished()
+ pp.next_phase()
+ pb = bzrlib.ui.ui_factory.nested_progress_bar()
+ if knit_kind == "file":
+ # Accumulate file texts
+ text_keys.extend([(file_id, revision) for revision in
+ revisions])
+ elif knit_kind == "inventory":
+ # Now copy the file texts.
+ to_texts = self.to_repository.texts
+ from_texts = self.from_repository.texts
+ yield ('texts', from_texts.get_record_stream(
+ text_keys, self.to_repository._fetch_order,
+ not self.to_repository._fetch_uses_deltas))
+ # Cause an error if a text occurs after we have done the
+ # copy.
+ text_keys = None
+ # Before we process the inventory we generate the root
+ # texts (if necessary) so that the inventories references
+ # will be valid.
+ for _ in self._generate_root_texts(revs):
+ yield _
+ # NB: This currently reopens the inventory weave in source;
+ # using a single stream interface instead would avoid this.
+ pb.update("fetch inventory", 0, 1)
+ from_weave = self.from_repository.inventories
+ # we fetch only the referenced inventories because we do not
+ # know for unselected inventories whether all their required
+ # texts are present in the other repository - it could be
+ # corrupt.
+ yield ('inventories', from_weave.get_record_stream(
+ [(rev_id,) for rev_id in revs],
+ self.inventory_fetch_order(),
+ not self.delta_on_metadata()))
+ elif knit_kind == "signatures":
+ # Nothing to do here; this will be taken care of when
+ # _fetch_revision_texts happens.
+ pass
+ elif knit_kind == "revisions":
+ for _ in self._fetch_revision_texts(revs, pb):
+ yield _
+ else:
+ raise AssertionError("Unknown knit kind %r" % knit_kind)
self.count_copied += len(revs)
-
+
def _revids_to_fetch(self):
"""Determines the exact revisions needed from self.from_repository to
install self._last_revision in self.to_repository.
@@ -229,45 +244,25 @@
except errors.NoSuchRevision, e:
raise InstallFailed([self._last_revision])
- def _fetch_inventory_weave(self, revs, pb):
- pb.update("fetch inventory", 0, 2)
- to_weave = self.to_repository.inventories
- # just merge, this is optimisable and its means we don't
- # copy unreferenced data such as not-needed inventories.
- pb.update("fetch inventory", 1, 3)
- from_weave = self.from_repository.inventories
- pb.update("fetch inventory", 2, 3)
- # we fetch only the referenced inventories because we do not
- # know for unselected inventories whether all their required
- # texts are present in the other repository - it could be
- # corrupt.
- to_weave.insert_record_stream(from_weave.get_record_stream(
- [(rev_id,) for rev_id in revs],
- self.to_repository._fetch_order,
- not self.to_repository._fetch_uses_deltas))
-
def _fetch_revision_texts(self, revs, pb):
# fetch signatures first and then the revision texts
# may need to be a InterRevisionStore call here.
- to_sf = self.to_repository.signatures
from_sf = self.from_repository.signatures
# A missing signature is just skipped.
- to_sf.insert_record_stream(filter_absent(from_sf.get_record_stream(
- [(rev_id,) for rev_id in revs],
+ keys = [(rev_id,) for rev_id in revs]
+ signatures = filter_absent(from_sf.get_record_stream(
+ keys,
self.to_repository._fetch_order,
- not self.to_repository._fetch_uses_deltas)))
- self._fetch_just_revision_texts(revs)
-
- def _fetch_just_revision_texts(self, version_ids):
- to_rf = self.to_repository.revisions
- from_rf = self.from_repository.revisions
+ not self.to_repository._fetch_uses_deltas))
# If a revision has a delta, this is actually expanded inside the
# insert_record_stream code now, which is an alternate fix for
# bug #261339
- to_rf.insert_record_stream(from_rf.get_record_stream(
- [(rev_id,) for rev_id in version_ids],
+ from_rf = self.from_repository.revisions
+ revisions = from_rf.get_record_stream(
+ keys,
self.to_repository._fetch_order,
- not self.to_repository._fetch_uses_deltas))
+ not self.delta_on_metadata())
+ return [('signatures', signatures), ('revisions', revisions)]
def _generate_root_texts(self, revs):
"""This will be called by __fetch between fetching weave texts and
@@ -276,7 +271,16 @@
Subclasses should override this if they need to generate root texts
after fetching weave texts.
"""
- pass
+ return []
+
+ def inventory_fetch_order(self):
+ return self.to_repository._fetch_order
+
+ def delta_on_metadata(self):
+ src_serializer = self.from_repository._format._serializer
+ target_serializer = self.to_repository._format._serializer
+ return (self.to_repository._fetch_uses_deltas and
+ src_serializer == target_serializer)
class Inter1and2Helper(object):
@@ -285,14 +289,12 @@
This is for use by fetchers and converters.
"""
- def __init__(self, source, target):
+ def __init__(self, source):
"""Constructor.
:param source: The repository data comes from
- :param target: The repository data goes to
"""
self.source = source
- self.target = target
def iter_rev_trees(self, revs):
"""Iterate through RevisionTrees efficiently.
@@ -338,7 +340,6 @@
:param revs: the revisions to include
"""
- to_texts = self.target.texts
graph = self.source.get_graph()
parent_map = graph.get_parent_map(revs)
rev_order = topo_sort(parent_map)
@@ -368,25 +369,7 @@
if parent != NULL_REVISION and
rev_id_to_root_id.get(parent, root_id) == root_id)
yield FulltextContentFactory(key, parent_keys, None, '')
- to_texts.insert_record_stream(yield_roots())
-
- def regenerate_inventory(self, revs):
- """Generate a new inventory versionedfile in target, convertin data.
-
- The inventory is retrieved from the source, (deserializing it), and
- stored in the target (reserializing it in a different format).
- :param revs: The revisions to include
- """
- for tree in self.iter_rev_trees(revs):
- parents = tree.get_parent_ids()
- self.target.add_inventory(tree.get_revision_id(), tree.inventory,
- parents)
-
- def fetch_revisions(self, revision_ids):
- # TODO: should this batch them up rather than requesting 10,000
- # revisions at once?
- for revision in self.source.get_revisions(revision_ids):
- self.target.add_revision(revision.revision_id, revision)
+ return [('texts', yield_roots())]
class Model1toKnit2Fetcher(RepoFetcher):
@@ -394,49 +377,14 @@
"""
def __init__(self, to_repository, from_repository, last_revision=None,
pb=None, find_ghosts=True):
- self.helper = Inter1and2Helper(from_repository, to_repository)
- RepoFetcher.__init__(self, to_repository, from_repository,
- last_revision, pb, find_ghosts)
-
- def _generate_root_texts(self, revs):
- self.helper.generate_root_texts(revs)
-
- def _fetch_inventory_weave(self, revs, pb):
- self.helper.regenerate_inventory(revs)
-
- def _fetch_revision_texts(self, revs, pb):
- """Fetch revision object texts"""
- count = 0
- total = len(revs)
- for rev in revs:
- pb.update('copying revisions', count, total)
- try:
- sig_text = self.from_repository.get_signature_text(rev)
- self.to_repository.add_signature_text(rev, sig_text)
- except errors.NoSuchRevision:
- # not signed.
- pass
- self._copy_revision(rev)
- count += 1
-
- def _copy_revision(self, rev):
- self.helper.fetch_revisions([rev])
-
-
-class Knit1to2Fetcher(RepoFetcher):
- """Fetch from a Knit1 repository into a Knit2 repository"""
-
- def __init__(self, to_repository, from_repository, last_revision=None,
- pb=None, find_ghosts=True):
- self.helper = Inter1and2Helper(from_repository, to_repository)
- RepoFetcher.__init__(self, to_repository, from_repository,
- last_revision, pb, find_ghosts)
-
- def _generate_root_texts(self, revs):
- self.helper.generate_root_texts(revs)
-
- def _fetch_inventory_weave(self, revs, pb):
- self.helper.regenerate_inventory(revs)
-
- def _fetch_just_revision_texts(self, version_ids):
- self.helper.fetch_revisions(version_ids)
+ self.helper = Inter1and2Helper(from_repository)
+ RepoFetcher.__init__(self, to_repository, from_repository,
+ last_revision, pb, find_ghosts)
+
+ def _generate_root_texts(self, revs):
+ return self.helper.generate_root_texts(revs)
+
+ def inventory_fetch_order(self):
+ return 'topological'
+
+Knit1to2Fetcher = Model1toKnit2Fetcher
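With the pieces above in place, _fetch_everything_for_search reduces to build-stream, insert-stream, finished. A standalone model of that control flow, with all classes simplified stand-ins rather than the real API:

class Source(object):
    def get_stream(self, search):
        # Substreams arrive in the order the sink expects to consume them.
        yield ('texts', iter(['text-record']))
        yield ('inventories', iter(['inventory-record']))
        yield ('signatures', iter([]))
        yield ('revisions', iter(['revision-record']))

class Sink(object):
    def __init__(self, needs_reconcile=False):
        self.inserted = {}
        self.needs_reconcile = needs_reconcile
        self.reconciled = False
    def insert_stream(self, stream, src_format=None):
        for substream_type, substream in stream:
            self.inserted.setdefault(substream_type, []).extend(substream)
        return []  # keys still missing; empty means the stream sufficed
    def finished(self):
        # Deferred housekeeping, e.g. reconcile for formats that need it.
        if self.needs_reconcile:
            self.reconciled = True

sink = Sink(needs_reconcile=True)
missing = sink.insert_stream(Source().get_stream(search=None))
sink.finished()
assert not missing and sink.reconciled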
=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py 2009-02-20 01:45:00 +0000
+++ b/bzrlib/remote.py 2009-02-20 05:05:25 +0000
@@ -18,6 +18,7 @@
# across to run on the server.
import bz2
+import struct
from bzrlib import (
branch,
@@ -325,6 +326,15 @@
self._creating_repo._ensure_real()
return self._creating_repo._real_repository._format.network_name()
+ @property
+ def _serializer(self):
+ # We should only be getting asked for the serializer for
+ # RemoteRepositoryFormat objects when the RemoteRepositoryFormat
+ # object is a concrete instance for a RemoteRepository. In this case
+ # we know the creating_repo and can use it to supply the serializer.
+ self._creating_repo._ensure_real()
+ return self._creating_repo._real_repository._format._serializer
+
class RemoteRepository(_RpcHelper):
"""Repository accessed over rpc.
@@ -471,6 +481,11 @@
return revision_graph
+ def _get_sink(self):
+ """See Repository._get_sink()."""
+ self._ensure_real()
+ return self._real_repository._get_sink()
+
def has_revision(self, revision_id):
"""See Repository.has_revision()."""
if revision_id == NULL_REVISION:
@@ -1256,6 +1271,10 @@
raise errors.UnexpectedSmartServerResponse(response)
+def _length_prefix(bytes):
+ return struct.pack('!L', len(bytes))
+
+
class RemoteBranchLockableFiles(LockableFiles):
"""A 'LockableFiles' implementation that talks to a smart server.
=== modified file 'bzrlib/repofmt/weaverepo.py'
--- a/bzrlib/repofmt/weaverepo.py 2009-02-19 03:09:55 +0000
+++ b/bzrlib/repofmt/weaverepo.py 2009-02-20 05:05:25 +0000
@@ -192,14 +192,11 @@
class WeaveMetaDirRepository(MetaDirVersionedFileRepository):
"""A subclass of MetaDirRepository to set weave specific policy."""
- @property
- def _serializer(self):
- return xml5.serializer_v5
-
def __init__(self, _format, a_bzrdir, control_files):
super(WeaveMetaDirRepository, self).__init__(_format, a_bzrdir, control_files)
self._fetch_order = 'topological'
self._fetch_reconcile = True
+ self._serializer = _format._serializer
@needs_read_lock
def _all_possible_ids(self):
@@ -390,6 +387,9 @@
_versionedfile_class = weave.WeaveFile
_matchingbzrdir = bzrdir.BzrDirFormat5()
+ @property
+ def _serializer(self):
+ return xml5.serializer_v5
def __init__(self):
super(RepositoryFormat5, self).__init__()
@@ -410,9 +410,8 @@
weave.WeaveFile, mapper, repo.is_locked)
def _get_revisions(self, repo_transport, repo):
- from bzrlib.xml5 import serializer_v5
return RevisionTextStore(repo_transport.clone('revision-store'),
- serializer_v5, False, versionedfile.PrefixMapper(),
+ xml5.serializer_v5, False, versionedfile.PrefixMapper(),
repo.is_locked, repo.is_write_locked)
def _get_signatures(self, repo_transport, repo):
@@ -438,6 +437,9 @@
_versionedfile_class = weave.WeaveFile
_matchingbzrdir = bzrdir.BzrDirFormat6()
+ @property
+ def _serializer(self):
+ return xml5.serializer_v5
def __init__(self):
super(RepositoryFormat6, self).__init__()
@@ -458,9 +460,8 @@
weave.WeaveFile, mapper, repo.is_locked)
def _get_revisions(self, repo_transport, repo):
- from bzrlib.xml5 import serializer_v5
return RevisionTextStore(repo_transport.clone('revision-store'),
- serializer_v5, False, versionedfile.HashPrefixMapper(),
+ xml5.serializer_v5, False, versionedfile.HashPrefixMapper(),
repo.is_locked, repo.is_write_locked)
def _get_signatures(self, repo_transport, repo):
@@ -489,6 +490,9 @@
_versionedfile_class = weave.WeaveFile
supports_ghosts = False
+ @property
+ def _serializer(self):
+ return xml5.serializer_v5
def get_format_string(self):
"""See RepositoryFormat.get_format_string()."""
@@ -507,9 +511,8 @@
weave.WeaveFile, mapper, repo.is_locked)
def _get_revisions(self, repo_transport, repo):
- from bzrlib.xml5 import serializer_v5
return RevisionTextStore(repo_transport.clone('revision-store'),
- serializer_v5, True, versionedfile.HashPrefixMapper(),
+ xml5.serializer_v5, True, versionedfile.HashPrefixMapper(),
repo.is_locked, repo.is_write_locked)
def _get_signatures(self, repo_transport, repo):
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2009-02-19 03:09:55 +0000
+++ b/bzrlib/repository.py 2009-02-20 05:05:25 +0000
@@ -1220,6 +1220,10 @@
dest_repo = a_bzrdir.open_repository()
return dest_repo
+ def _get_sink(self):
+ """Return a sink for streaming into this repository."""
+ return StreamSink(self)
+
@needs_read_lock
def has_revision(self, revision_id):
"""True if this repository has a copy of the revision."""
@@ -3643,3 +3647,80 @@
revision_graph[key] = tuple(parent for parent in parents if parent
in revision_graph)
return revision_graph
+
+
+class StreamSink(object):
+ """An object that can insert a stream into a repository.
+
+ This interface handles the complexity of reserialising inventories and
+ revisions from different formats, and allows unidirectional insertion into
+ stacked repositories without looking for the missing basis parents
+ beforehand.
+ """
+
+ def __init__(self, target_repo):
+ self.target_repo = target_repo
+
+ def insert_stream(self, stream, src_format):
+ """Insert a stream's content into the target repository.
+
+ :param src_format: a bzr repository format.
+
+ :return: an iterable of keys for additional items required before the
+ insertion can be completed.
+ """
+ result = []
+ to_serializer = self.target_repo._format._serializer
+ src_serializer = src_format._serializer
+ for substream_type, substream in stream:
+ if substream_type == 'texts':
+ self.target_repo.texts.insert_record_stream(substream)
+ elif substream_type == 'inventories':
+ if src_serializer == to_serializer:
+ self.target_repo.inventories.insert_record_stream(
+ substream)
+ else:
+ self._extract_and_insert_inventories(
+ substream, src_serializer)
+ elif substream_type == 'revisions':
+ # This may fallback to extract-and-insert more often than
+ # required if the serializers are different only in terms of
+ # the inventory.
+ if src_serializer == to_serializer:
+ self.target_repo.revisions.insert_record_stream(
+ substream)
+ else:
+ self._extract_and_insert_revisions(substream,
+ src_serializer)
+ elif substream_type == 'signatures':
+ self.target_repo.signatures.insert_record_stream(substream)
+ else:
+ raise AssertionError('kaboom! %s' % (substream_type,))
+ return result
+
+ def _extract_and_insert_inventories(self, substream, serializer):
+ """Generate a new inventory versionedfile in target, converting data.
+
+ The inventory is retrieved from the source, (deserializing it), and
+ stored in the target (reserializing it in a different format).
+ """
+ for record in substream:
+ bytes = record.get_bytes_as('fulltext')
+ revision_id = record.key[0]
+ inv = serializer.read_inventory_from_string(bytes, revision_id)
+ parents = [key[0] for key in record.parents]
+ self.target_repo.add_inventory(revision_id, inv, parents)
+
+ def _extract_and_insert_revisions(self, substream, serializer):
+ for record in substream:
+ bytes = record.get_bytes_as('fulltext')
+ revision_id = record.key[0]
+ rev = serializer.read_revision_from_string(bytes)
+ if rev.revision_id != revision_id:
+ raise AssertionError('wtf: %s != %s' % (rev, revision_id))
+ self.target_repo.add_revision(revision_id, rev)
+
+ def finished(self):
+ if self.target_repo._fetch_reconcile:
+ self.target_repo.reconcile()
+
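StreamSink's extract-and-insert fallback deserializes each record's fulltext with the source serializer and re-adds it through the target repository's API, which reserializes it in the target's format. A self-contained sketch, with FakeRecord and the one-method serializer standing in for bzrlib's real objects:

class FakeRecord(object):
    def __init__(self, key, parents, text):
        self.key, self.parents, self._text = key, parents, text
    def get_bytes_as(self, storage_kind):
        # Only the fulltext form can be reserialized.
        assert storage_kind == 'fulltext'
        return self._text

class UpperCaseSerializer(object):
    def read_inventory_from_string(self, bytes, revision_id):
        return bytes.upper()  # stand-in for real XML deserialization

def extract_and_insert(substream, serializer, add_inventory):
    for record in substream:
        bytes = record.get_bytes_as('fulltext')
        revision_id = record.key[0]
        inv = serializer.read_inventory_from_string(bytes, revision_id)
        parents = [key[0] for key in record.parents]
        add_inventory(revision_id, inv, parents)

added = []
extract_and_insert(
    [FakeRecord(('rev-1',), [('rev-0',)], 'inv-xml')],
    UpperCaseSerializer(),
    lambda rev_id, inv, parents: added.append((rev_id, inv, parents)))
assert added == [('rev-1', 'INV-XML', ['rev-0'])]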
=== modified file 'bzrlib/tests/per_repository/test_repository.py'
--- a/bzrlib/tests/per_repository/test_repository.py 2009-02-18 21:00:02 +0000
+++ b/bzrlib/tests/per_repository/test_repository.py 2009-02-20 05:05:25 +0000
@@ -840,6 +840,11 @@
# The repository format is preserved.
self.assertEqual(repo._format, target_repo._format)
+ def test__get_sink(self):
+ repo = self.make_repository('repo')
+ sink = repo._get_sink()
+ self.assertIsInstance(sink, repository.StreamSink)
+
def test__make_parents_provider(self):
"""Repositories must have a _make_parents_provider method that returns
an object with a get_parent_map method.