Rev 4023: Refactoring of fetch to have a sender and sink component enabling splitting the logic over a network stream. (Robert Collins, Andrew Bennetts) in http://people.ubuntu.com/~robertc/baz2.0/fetch.sinks

Robert Collins robertc at robertcollins.net
Fri Feb 20 05:05:28 GMT 2009


At http://people.ubuntu.com/~robertc/baz2.0/fetch.sinks

------------------------------------------------------------
revno: 4023
revision-id: robertc at robertcollins.net-20090220050525-o3i6wve7yigkx6i9
parent: pqm at pqm.ubuntu.com-20090220022509-leun2dkfewbwcgn7
committer: Robert Collins <robertc at robertcollins.net>
branch nick: fetch.sinks
timestamp: Fri 2009-02-20 16:05:25 +1100
message:
  Refactoring of fetch to have a sender and sink component enabling splitting the logic over a network stream. (Robert Collins, Andrew Bennetts)
=== modified file 'NEWS'
--- a/NEWS	2009-02-20 02:25:09 +0000
+++ b/NEWS	2009-02-20 05:05:25 +0000
@@ -62,6 +62,10 @@
 
   INTERNALS:
 
+    * ``bzrlib.fetch`` is now composed of a sender and a sink component
+      allowing for decoupling over a network connection. (Andrew Bennetts,
+      Robert Collins)
+
     * ``bzrlib.tests.run_suite`` accepts a runner_class parameter
       supporting the use of different runners. (Robert Collins)
 
@@ -72,6 +76,10 @@
     * ``RepositoryFormat`` objects now have a ``network_name`` for passing
       the format across RPC calls. (Robert Collins, Andrew Bennetts)
 
+    * ``RepositoryFormat`` objects now all have a new attribute
+      ``_serializer`` used by fetch when reserialising is required.
+      (Robert Collins, Andrew Bennetts)
+
     * Some methods have been pulled up from ``BzrBranch`` to ``Branch``
       to aid branch types that are not bzr branch objects (like
       RemoteBranch). (Robert Collins, Andrew Bennetts)

=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py	2009-02-02 05:56:34 +0000
+++ b/bzrlib/fetch.py	2009-02-20 05:05:25 +0000
@@ -36,10 +36,7 @@
 import bzrlib.errors as errors
 from bzrlib.errors import InstallFailed
 from bzrlib.progress import ProgressPhase
-from bzrlib.revision import is_null, NULL_REVISION
-from bzrlib.symbol_versioning import (deprecated_function,
-        deprecated_method,
-        )
+from bzrlib.revision import NULL_REVISION
 from bzrlib.tsort import topo_sort
 from bzrlib.trace import mutter
 import bzrlib.ui
@@ -95,6 +92,7 @@
                     '%r and %r' % (to_repository, from_repository))
         self.to_repository = to_repository
         self.from_repository = from_repository
+        self.sink = to_repository._get_sink()
         # must not mutate self._last_revision as its potentially a shared instance
         self._last_revision = last_revision
         self.find_ghosts = find_ghosts
@@ -131,6 +129,12 @@
         This initialises all the needed variables, and then fetches the 
         requested revisions, finally clearing the progress bar.
         """
+        # Roughly this is what we're aiming for fetch to become:
+        #
+        # missing = self.sink.insert_stream(self.source.get_stream(search))
+        # if missing:
+        #     missing = self.sink.insert_stream(self.source.get_items(missing))
+        # assert not missing
         self.count_total = 0
         self.file_ids_names = {}
         pp = ProgressPhase('Transferring', 4, self.pb)
@@ -139,11 +143,7 @@
             search = self._revids_to_fetch()
             if search is None:
                 return
-            if getattr(self, '_fetch_everything_for_search', None) is not None:
-                self._fetch_everything_for_search(search, pp)
-            else:
-                # backward compatibility
-                self._fetch_everything_for_revisions(search.get_keys, pp)
+            self._fetch_everything_for_search(search, pp)
         finally:
             self.pb.clear()
 
@@ -157,58 +157,73 @@
         # item_keys_introduced_by should have a richer API than it does at the
         # moment, so that it can feed the progress information back to this
         # function?
-        phase = 'file'
         pb = bzrlib.ui.ui_factory.nested_progress_bar()
         try:
-            revs = search.get_keys()
-            graph = self.from_repository.get_graph()
-            revs = list(graph.iter_topo_order(revs))
-            data_to_fetch = self.from_repository.item_keys_introduced_by(revs,
-                                                                         pb)
-            text_keys = []
-            for knit_kind, file_id, revisions in data_to_fetch:
-                if knit_kind != phase:
-                    phase = knit_kind
-                    # Make a new progress bar for this phase
-                    pb.finished()
-                    pp.next_phase()
-                    pb = bzrlib.ui.ui_factory.nested_progress_bar()
-                if knit_kind == "file":
-                    # Accumulate file texts
-                    text_keys.extend([(file_id, revision) for revision in
-                        revisions])
-                elif knit_kind == "inventory":
-                    # Now copy the file texts.
-                    to_texts = self.to_repository.texts
-                    from_texts = self.from_repository.texts
-                    to_texts.insert_record_stream(from_texts.get_record_stream(
-                        text_keys, self.to_repository._fetch_order,
-                        not self.to_repository._fetch_uses_deltas))
-                    # Cause an error if a text occurs after we have done the
-                    # copy.
-                    text_keys = None
-                    # Before we process the inventory we generate the root
-                    # texts (if necessary) so that the inventories references
-                    # will be valid.
-                    self._generate_root_texts(revs)
-                    # NB: This currently reopens the inventory weave in source;
-                    # using a single stream interface instead would avoid this.
-                    self._fetch_inventory_weave(revs, pb)
-                elif knit_kind == "signatures":
-                    # Nothing to do here; this will be taken care of when
-                    # _fetch_revision_texts happens.
-                    pass
-                elif knit_kind == "revisions":
-                    self._fetch_revision_texts(revs, pb)
-                else:
-                    raise AssertionError("Unknown knit kind %r" % knit_kind)
-            if self.to_repository._fetch_reconcile:
-                self.to_repository.reconcile()
+            from_format = self.from_repository._format
+            stream = self.get_stream(search, pb, pp)
+            self.sink.insert_stream(stream, from_format)
+            self.sink.finished()
         finally:
             if pb is not None:
                 pb.finished()
+        
+    def get_stream(self, search, pb, pp):
+        phase = 'file'
+        revs = search.get_keys()
+        graph = self.from_repository.get_graph()
+        revs = list(graph.iter_topo_order(revs))
+        data_to_fetch = self.from_repository.item_keys_introduced_by(revs,
+                                                                     pb)
+        text_keys = []
+        for knit_kind, file_id, revisions in data_to_fetch:
+            if knit_kind != phase:
+                phase = knit_kind
+                # Make a new progress bar for this phase
+                pb.finished()
+                pp.next_phase()
+                pb = bzrlib.ui.ui_factory.nested_progress_bar()
+            if knit_kind == "file":
+                # Accumulate file texts
+                text_keys.extend([(file_id, revision) for revision in
+                    revisions])
+            elif knit_kind == "inventory":
+                # Now copy the file texts.
+                to_texts = self.to_repository.texts
+                from_texts = self.from_repository.texts
+                yield ('texts', from_texts.get_record_stream(
+                    text_keys, self.to_repository._fetch_order,
+                    not self.to_repository._fetch_uses_deltas))
+                # Cause an error if a text occurs after we have done the
+                # copy.
+                text_keys = None
+                # Before we process the inventory we generate the root
+                # texts (if necessary) so that the inventories references
+                # will be valid.
+                for _ in self._generate_root_texts(revs):
+                    yield _
+                # NB: This currently reopens the inventory weave in source;
+                # using a single stream interface instead would avoid this.
+                pb.update("fetch inventory", 0, 1)
+                from_weave = self.from_repository.inventories
+                # we fetch only the referenced inventories because we do not
+                # know for unselected inventories whether all their required
+                # texts are present in the other repository - it could be
+                # corrupt.
+                yield ('inventories', from_weave.get_record_stream(
+                    [(rev_id,) for rev_id in revs],
+                    self.inventory_fetch_order(),
+                    not self.delta_on_metadata()))
+            elif knit_kind == "signatures":
+                # Nothing to do here; this will be taken care of when
+                # _fetch_revision_texts happens.
+                pass
+            elif knit_kind == "revisions":
+                for _ in self._fetch_revision_texts(revs, pb):
+                    yield _
+            else:
+                raise AssertionError("Unknown knit kind %r" % knit_kind)
         self.count_copied += len(revs)
-        
+
     def _revids_to_fetch(self):
         """Determines the exact revisions needed from self.from_repository to
         install self._last_revision in self.to_repository.
@@ -229,45 +244,25 @@
         except errors.NoSuchRevision, e:
             raise InstallFailed([self._last_revision])
 
-    def _fetch_inventory_weave(self, revs, pb):
-        pb.update("fetch inventory", 0, 2)
-        to_weave = self.to_repository.inventories
-        # just merge, this is optimisable and its means we don't
-        # copy unreferenced data such as not-needed inventories.
-        pb.update("fetch inventory", 1, 3)
-        from_weave = self.from_repository.inventories
-        pb.update("fetch inventory", 2, 3)
-        # we fetch only the referenced inventories because we do not
-        # know for unselected inventories whether all their required
-        # texts are present in the other repository - it could be
-        # corrupt.
-        to_weave.insert_record_stream(from_weave.get_record_stream(
-            [(rev_id,) for rev_id in revs],
-            self.to_repository._fetch_order,
-            not self.to_repository._fetch_uses_deltas))
-
     def _fetch_revision_texts(self, revs, pb):
         # fetch signatures first and then the revision texts
         # may need to be a InterRevisionStore call here.
-        to_sf = self.to_repository.signatures
         from_sf = self.from_repository.signatures
         # A missing signature is just skipped.
-        to_sf.insert_record_stream(filter_absent(from_sf.get_record_stream(
-            [(rev_id,) for rev_id in revs],
+        keys = [(rev_id,) for rev_id in revs]
+        signatures = filter_absent(from_sf.get_record_stream(
+            keys,
             self.to_repository._fetch_order,
-            not self.to_repository._fetch_uses_deltas)))
-        self._fetch_just_revision_texts(revs)
-
-    def _fetch_just_revision_texts(self, version_ids):
-        to_rf = self.to_repository.revisions
-        from_rf = self.from_repository.revisions
+            not self.to_repository._fetch_uses_deltas))
         # If a revision has a delta, this is actually expanded inside the
         # insert_record_stream code now, which is an alternate fix for
         # bug #261339
-        to_rf.insert_record_stream(from_rf.get_record_stream(
-            [(rev_id,) for rev_id in version_ids],
+        from_rf = self.from_repository.revisions
+        revisions = from_rf.get_record_stream(
+            keys,
             self.to_repository._fetch_order,
-            not self.to_repository._fetch_uses_deltas))
+            not self.delta_on_metadata())
+        return [('signatures', signatures), ('revisions', revisions)]
 
     def _generate_root_texts(self, revs):
         """This will be called by __fetch between fetching weave texts and
@@ -276,7 +271,16 @@
         Subclasses should override this if they need to generate root texts
         after fetching weave texts.
         """
-        pass
+        return []
+
+    def inventory_fetch_order(self):
+        return self.to_repository._fetch_order
+
+    def delta_on_metadata(self):
+        src_serializer = self.from_repository._format._serializer
+        target_serializer = self.to_repository._format._serializer
+        return (self.to_repository._fetch_uses_deltas and
+            src_serializer == target_serializer)
 
 
 class Inter1and2Helper(object):
@@ -285,14 +289,12 @@
     This is for use by fetchers and converters.
     """
 
-    def __init__(self, source, target):
+    def __init__(self, source):
         """Constructor.
 
         :param source: The repository data comes from
-        :param target: The repository data goes to
         """
         self.source = source
-        self.target = target
 
     def iter_rev_trees(self, revs):
         """Iterate through RevisionTrees efficiently.
@@ -338,7 +340,6 @@
 
         :param revs: the revisions to include
         """
-        to_texts = self.target.texts
         graph = self.source.get_graph()
         parent_map = graph.get_parent_map(revs)
         rev_order = topo_sort(parent_map)
@@ -368,25 +369,7 @@
                     if parent != NULL_REVISION and
                         rev_id_to_root_id.get(parent, root_id) == root_id)
                 yield FulltextContentFactory(key, parent_keys, None, '')
-        to_texts.insert_record_stream(yield_roots())
-
-    def regenerate_inventory(self, revs):
-        """Generate a new inventory versionedfile in target, convertin data.
-        
-        The inventory is retrieved from the source, (deserializing it), and
-        stored in the target (reserializing it in a different format).
-        :param revs: The revisions to include
-        """
-        for tree in self.iter_rev_trees(revs):
-            parents = tree.get_parent_ids()
-            self.target.add_inventory(tree.get_revision_id(), tree.inventory,
-                                      parents)
-
-    def fetch_revisions(self, revision_ids):
-        # TODO: should this batch them up rather than requesting 10,000
-        #       revisions at once?
-        for revision in self.source.get_revisions(revision_ids):
-            self.target.add_revision(revision.revision_id, revision)
+        return [('texts', yield_roots())]
 
 
 class Model1toKnit2Fetcher(RepoFetcher):
@@ -394,49 +377,14 @@
     """
     def __init__(self, to_repository, from_repository, last_revision=None,
                  pb=None, find_ghosts=True):
-        self.helper = Inter1and2Helper(from_repository, to_repository)
-        RepoFetcher.__init__(self, to_repository, from_repository,
-            last_revision, pb, find_ghosts)
-
-    def _generate_root_texts(self, revs):
-        self.helper.generate_root_texts(revs)
-
-    def _fetch_inventory_weave(self, revs, pb):
-        self.helper.regenerate_inventory(revs)
-
-    def _fetch_revision_texts(self, revs, pb):
-        """Fetch revision object texts"""
-        count = 0
-        total = len(revs)
-        for rev in revs:
-            pb.update('copying revisions', count, total)
-            try:
-                sig_text = self.from_repository.get_signature_text(rev)
-                self.to_repository.add_signature_text(rev, sig_text)
-            except errors.NoSuchRevision:
-                # not signed.
-                pass
-            self._copy_revision(rev)
-            count += 1
-
-    def _copy_revision(self, rev):
-        self.helper.fetch_revisions([rev])
-
-
-class Knit1to2Fetcher(RepoFetcher):
-    """Fetch from a Knit1 repository into a Knit2 repository"""
-
-    def __init__(self, to_repository, from_repository, last_revision=None,
-                 pb=None, find_ghosts=True):
-        self.helper = Inter1and2Helper(from_repository, to_repository)
-        RepoFetcher.__init__(self, to_repository, from_repository,
-            last_revision, pb, find_ghosts)
-
-    def _generate_root_texts(self, revs):
-        self.helper.generate_root_texts(revs)
-
-    def _fetch_inventory_weave(self, revs, pb):
-        self.helper.regenerate_inventory(revs)
-
-    def _fetch_just_revision_texts(self, version_ids):
-        self.helper.fetch_revisions(version_ids)
+        self.helper = Inter1and2Helper(from_repository)
+        RepoFetcher.__init__(self, to_repository, from_repository,
+            last_revision, pb, find_ghosts)
+
+    def _generate_root_texts(self, revs):
+        return self.helper.generate_root_texts(revs)
+
+    def inventory_fetch_order(self):
+        return 'topological'
+
+Knit1to2Fetcher = Model1toKnit2Fetcher

=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py	2009-02-20 01:45:00 +0000
+++ b/bzrlib/remote.py	2009-02-20 05:05:25 +0000
@@ -18,6 +18,7 @@
 # across to run on the server.
 
 import bz2
+import struct
 
 from bzrlib import (
     branch,
@@ -325,6 +326,15 @@
         self._creating_repo._ensure_real()
         return self._creating_repo._real_repository._format.network_name()
 
+    @property
+    def _serializer(self):
+        # We should only be getting asked for the serializer for
+        # RemoteRepositoryFormat objects except when the RemoteRepositoryFormat
+        # object is a concrete instance for a RemoteRepository. In this case
+        # we know the creating_repo and can use it to supply the serializer.
+        self._creating_repo._ensure_real()
+        return self._creating_repo._real_repository._format._serializer
+
 
 class RemoteRepository(_RpcHelper):
     """Repository accessed over rpc.
@@ -471,6 +481,11 @@
             
         return revision_graph
 
+    def _get_sink(self):
+        """See Repository._get_sink()."""
+        self._ensure_real()
+        return self._real_repository._get_sink()
+
     def has_revision(self, revision_id):
         """See Repository.has_revision()."""
         if revision_id == NULL_REVISION:
@@ -1256,6 +1271,10 @@
             raise errors.UnexpectedSmartServerResponse(response)
 
 
+def _length_prefix(bytes):
+    return struct.pack('!L', len(bytes))
+
+
 class RemoteBranchLockableFiles(LockableFiles):
     """A 'LockableFiles' implementation that talks to a smart server.
     

=== modified file 'bzrlib/repofmt/weaverepo.py'
--- a/bzrlib/repofmt/weaverepo.py	2009-02-19 03:09:55 +0000
+++ b/bzrlib/repofmt/weaverepo.py	2009-02-20 05:05:25 +0000
@@ -192,14 +192,11 @@
 class WeaveMetaDirRepository(MetaDirVersionedFileRepository):
     """A subclass of MetaDirRepository to set weave specific policy."""
 
-    @property
-    def _serializer(self):
-        return xml5.serializer_v5
-
     def __init__(self, _format, a_bzrdir, control_files):
         super(WeaveMetaDirRepository, self).__init__(_format, a_bzrdir, control_files)
         self._fetch_order = 'topological'
         self._fetch_reconcile = True
+        self._serializer = _format._serializer
 
     @needs_read_lock
     def _all_possible_ids(self):
@@ -390,6 +387,9 @@
 
     _versionedfile_class = weave.WeaveFile
     _matchingbzrdir = bzrdir.BzrDirFormat5()
+    @property
+    def _serializer(self):
+        return xml5.serializer_v5
 
     def __init__(self):
         super(RepositoryFormat5, self).__init__()
@@ -410,9 +410,8 @@
             weave.WeaveFile, mapper, repo.is_locked)
 
     def _get_revisions(self, repo_transport, repo):
-        from bzrlib.xml5 import serializer_v5
         return RevisionTextStore(repo_transport.clone('revision-store'),
-            serializer_v5, False, versionedfile.PrefixMapper(),
+            xml5.serializer_v5, False, versionedfile.PrefixMapper(),
             repo.is_locked, repo.is_write_locked)
 
     def _get_signatures(self, repo_transport, repo):
@@ -438,6 +437,9 @@
 
     _versionedfile_class = weave.WeaveFile
     _matchingbzrdir = bzrdir.BzrDirFormat6()
+    @property
+    def _serializer(self):
+        return xml5.serializer_v5
 
     def __init__(self):
         super(RepositoryFormat6, self).__init__()
@@ -458,9 +460,8 @@
             weave.WeaveFile, mapper, repo.is_locked)
 
     def _get_revisions(self, repo_transport, repo):
-        from bzrlib.xml5 import serializer_v5
         return RevisionTextStore(repo_transport.clone('revision-store'),
-            serializer_v5, False, versionedfile.HashPrefixMapper(),
+            xml5.serializer_v5, False, versionedfile.HashPrefixMapper(),
             repo.is_locked, repo.is_write_locked)
 
     def _get_signatures(self, repo_transport, repo):
@@ -489,6 +490,9 @@
 
     _versionedfile_class = weave.WeaveFile
     supports_ghosts = False
+    @property
+    def _serializer(self):
+        return xml5.serializer_v5
 
     def get_format_string(self):
         """See RepositoryFormat.get_format_string()."""
@@ -507,9 +511,8 @@
             weave.WeaveFile, mapper, repo.is_locked)
 
     def _get_revisions(self, repo_transport, repo):
-        from bzrlib.xml5 import serializer_v5
         return RevisionTextStore(repo_transport.clone('revision-store'),
-            serializer_v5, True, versionedfile.HashPrefixMapper(),
+            xml5.serializer_v5, True, versionedfile.HashPrefixMapper(),
             repo.is_locked, repo.is_write_locked)
 
     def _get_signatures(self, repo_transport, repo):

=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py	2009-02-19 03:09:55 +0000
+++ b/bzrlib/repository.py	2009-02-20 05:05:25 +0000
@@ -1220,6 +1220,10 @@
                 dest_repo = a_bzrdir.open_repository()
         return dest_repo
 
+    def _get_sink(self):
+        """Return a sink for streaming into this repository."""
+        return StreamSink(self)
+
     @needs_read_lock
     def has_revision(self, revision_id):
         """True if this repository has a copy of the revision."""
@@ -3643,3 +3647,80 @@
         revision_graph[key] = tuple(parent for parent in parents if parent
             in revision_graph)
     return revision_graph
+
+
+class StreamSink(object):
+    """An object that can insert a stream into a repository.
+
+    This interface handles the complexity of reserialising inventories and
+    revisions from different formats, and allows unidirectional insertion into
+    stacked repositories without looking for the missing basis parents
+    beforehand.
+    """
+
+    def __init__(self, target_repo):
+        self.target_repo = target_repo
+
+    def insert_stream(self, stream, src_format):
+        """Insert a stream's content into the target repository.
+
+        :param src_format: a bzr repository format.
+
+        :return: an iterable of keys additional items required before the
+        insertion can be completed.
+        """
+        result = []
+        to_serializer = self.target_repo._format._serializer
+        src_serializer = src_format._serializer
+        for substream_type, substream in stream:
+            if substream_type == 'texts':
+                self.target_repo.texts.insert_record_stream(substream)
+            elif substream_type == 'inventories':
+                if src_serializer == to_serializer:
+                    self.target_repo.inventories.insert_record_stream(
+                        substream)
+                else:
+                    self._extract_and_insert_inventories(
+                        substream, src_serializer)
+            elif substream_type == 'revisions':
+                # This may fallback to extract-and-insert more often than
+                # required if the serializers are different only in terms of
+                # the inventory.
+                if src_serializer == to_serializer:
+                    self.target_repo.revisions.insert_record_stream(
+                        substream)
+                else:
+                    self._extract_and_insert_revisions(substream,
+                        src_serializer)
+            elif substream_type == 'signatures':
+                self.target_repo.signatures.insert_record_stream(substream)
+            else:
+                raise AssertionError('kaboom! %s' % (substream_type,))
+        return result
+
+    def _extract_and_insert_inventories(self, substream, serializer):
+        """Generate a new inventory versionedfile in target, converting data.
+        
+        The inventory is retrieved from the source, (deserializing it), and
+        stored in the target (reserializing it in a different format).
+        """
+        for record in substream:
+            bytes = record.get_bytes_as('fulltext')
+            revision_id = record.key[0]
+            inv = serializer.read_inventory_from_string(bytes, revision_id)
+            parents = [key[0] for key in record.parents]
+            self.target_repo.add_inventory(revision_id, inv, parents)
+
+    def _extract_and_insert_revisions(self, substream, serializer):
+        for record in substream:
+            bytes = record.get_bytes_as('fulltext')
+            revision_id = record.key[0]
+            rev = serializer.read_revision_from_string(bytes)
+            if rev.revision_id != revision_id:
+                raise AssertionError('wtf: %s != %s' % (rev, revision_id))
+            self.target_repo.add_revision(revision_id, rev)
+
+    def finished(self):
+        if self.target_repo._fetch_reconcile:
+            self.target_repo.reconcile()
+

=== modified file 'bzrlib/tests/per_repository/test_repository.py'
--- a/bzrlib/tests/per_repository/test_repository.py	2009-02-18 21:00:02 +0000
+++ b/bzrlib/tests/per_repository/test_repository.py	2009-02-20 05:05:25 +0000
@@ -840,6 +840,11 @@
         # The repository format is preserved.
         self.assertEqual(repo._format, target_repo._format)
 
+    def test__get_sink(self):
+        repo = self.make_repository('repo')
+        sink = repo._get_sink()
+        self.assertIsInstance(sink, repository.StreamSink)
+                
     def test__make_parents_provider(self):
         """Repositories must have a _make_parents_provider method that returns
         an object with a get_parent_map method.




More information about the bazaar-commits mailing list