Rev 3828: Change how progress indicators work during 'generic fetch' in http://bzr.arbash-meinel.com/branches/bzr/brisbane/generic_fetch_ordering

John Arbash Meinel <john@arbash-meinel.com>
Thu Feb 19 17:13:35 GMT 2009


At http://bzr.arbash-meinel.com/branches/bzr/brisbane/generic_fetch_ordering

------------------------------------------------------------
revno: 3828
revision-id: john@arbash-meinel.com-20090219171317-o39ib282zbwyby8f
parent: john@arbash-meinel.com-20090218190726-oozb84xy6h8ok28z
committer: John Arbash Meinel <john@arbash-meinel.com>
branch nick: generic_fetch_ordering
timestamp: Thu 2009-02-19 11:13:17 -0600
message:
  Change how progress indicators work during 'generic fetch':
  add a _pb_stream_adapter() helper that ticks the progress bar as
  records stream past, factor the text-copying step out into
  _fetch_text_texts(), and scope the nested progress bars in the
  chk inventory fetch to the two insert_record_stream() passes.
=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py	2009-02-18 16:36:18 +0000
+++ b/bzrlib/fetch.py	2009-02-19 17:13:17 +0000
@@ -62,6 +62,14 @@
 # - then go through all files; for each one get the weave,
 #   and add in all file versions
 
+def _pb_stream_adapter(pb, msg, num_keys, stream):
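+    """Return a callable that re-yields stream, ticking pb per record."""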
+    def adapter():
+        for idx, record in enumerate(stream):
+            pb.update(msg, idx, num_keys)
+            yield record
+    return adapter
+
 
 class RepoFetcher(object):
     """Pull revisions and texts from one repository to another.
@@ -180,11 +188,7 @@
                     # When the inventory keys start being reported, all text
                     # keys have already been issued - and we want the text keys
                     # inserted before inventory keys: copy the texts.
-                    to_texts = self.to_repository.texts
-                    from_texts = self.from_repository.texts
-                    to_texts.insert_record_stream(from_texts.get_record_stream(
-                        text_keys, self.to_repository._fetch_order,
-                        not self.to_repository._fetch_uses_deltas))
+                    self._fetch_text_texts(text_keys, pb=pb)
                     # Cause an error if a text occurs after we have done the
                     # copy.
                     text_keys = None
@@ -230,40 +234,42 @@
         except errors.NoSuchRevision, e:
             raise InstallFailed([self._last_revision])
 
+    def _fetch_text_texts(self, text_keys, pb):
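+        """Copy the file texts named by text_keys, updating pb per record."""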
+        to_texts = self.to_repository.texts
+        from_texts = self.from_repository.texts
+        text_stream = from_texts.get_record_stream(text_keys,
+                            self.to_repository._fetch_order,
+                            not self.to_repository._fetch_uses_deltas)
+        adapter = _pb_stream_adapter(pb, 'fetch text', len(text_keys),
+                                     text_stream)
+        to_texts.insert_record_stream(adapter())
+
     def _fetch_inventory_weave(self, revs, pb):
-        pb.update("fetch inventory", 0, 2)
-        to_weave = self.to_repository.inventories
-        # just merge, this is optimisable and its means we don't
-        # copy unreferenced data such as not-needed inventories.
-        pb.update("fetch inventory", 1, 3)
-        from_weave = self.from_repository.inventories
-        pb.update("fetch inventory", 2, 3)
         # we fetch only the referenced inventories because we do not
         # know for unselected inventories whether all their required
         # texts are present in the other repository - it could be
         # corrupt.
-        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
-        try:
-            if (self.from_repository._format.supports_chks and
-                self.to_repository._format.supports_chks):
-                self._fetch_chk_inventories(revs, child_pb)
-            elif (self.from_repository._format.supports_chks or
-                self.to_repository._format.supports_chks):
-                # Hack to make not-chk->chk fetch: copy the inventories as
-                # inventories.
-                total = len(revs)
-                for pos, inv in enumerate(
-                    self.from_repository.iter_inventories(revs)):
-                    child_pb.update("Copying inventories", pos, total)
-                    self.to_repository.add_inventory(inv.revision_id, inv, [])
-            else:
-                to_weave.insert_record_stream(from_weave.get_record_stream(
-                    [(rev_id,) for rev_id in revs],
-                    self.to_repository._fetch_order,
-                    not self.to_repository._fetch_uses_deltas))
-        finally:
-            child_pb.finished()
-        pb.update("fetch inventory", 3, 3)
+        if (self.from_repository._format.supports_chks and
+            self.to_repository._format.supports_chks):
+            self._fetch_chk_inventories(revs, pb)
+        elif (self.from_repository._format.supports_chks or
+              self.to_repository._format.supports_chks):
+            # Hack to make not-chk->chk fetch: copy the inventories as
+            # inventories.
+            total = len(revs)
+            for idx, inv in enumerate(
+                self.from_repository.iter_inventories(revs)):
+                pb.update("Copying inventories", idx, total)
+                self.to_repository.add_inventory(inv.revision_id, inv, [])
+        else:
+            to_weave = self.to_repository.inventories
+            from_weave = self.from_repository.inventories
+            adapter = _pb_stream_adapter(pb, 'fetch inv', len(revs),
+                from_weave.get_record_stream([(rev_id,) for rev_id in revs],
+                            self.to_repository._fetch_order,
+                            not self.to_repository._fetch_uses_deltas))
+            to_weave.insert_record_stream(adapter())
 
     def _fetch_revision_texts(self, revs, pb):
         # fetch signatures first and then the revision texts
@@ -312,7 +318,9 @@
         uninteresting_chk_roots = set()
         interesting_chk_roots = set()
         def filter_inv_stream(inv_stream):
-            for record in inv_stream:
+            for idx, record in enumerate(inv_stream):
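+                # child_pb (assigned below) is bound before this generator runs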
+                child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
                 bytes = record.get_bytes_as('fulltext')
                 chk_inv = inventory.CHKInventory.deserialise(
                     self.from_repository.chk_bytes, bytes, record.key)
@@ -327,29 +335,41 @@
                     p_id_map = chk_inv.parent_id_basename_to_file_id
                     if p_id_map is not None:
                         interesting_chk_roots.add(p_id_map.key())
-        self.to_repository.inventories.insert_record_stream(filter_inv_stream(inv_stream))
+        pb.update('fetch inventory', 0, 2)
+        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
+        try:
+            self.to_repository.inventories.insert_record_stream(
+                filter_inv_stream(inv_stream))
+        finally:
+            child_pb.finished()
         # Now that we have worked out all of the interesting root nodes, grab
         # all of the interesting pages and insert them
-        interesting = chk_map.iter_interesting_nodes(
-            self.from_repository.chk_bytes, interesting_chk_roots,
-            uninteresting_chk_roots, pb=pb)
-        def to_stream_adapter():
-            """Adapt the iter_interesting_nodes result to a single stream.
+        pb.update('fetch inventory', 1, 2)
+        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
+        try:
+            interesting = chk_map.iter_interesting_nodes(
+                self.from_repository.chk_bytes, interesting_chk_roots,
+                uninteresting_chk_roots, pb=child_pb)
+            def to_stream_adapter():
+                """Adapt the iter_interesting_nodes result to a single stream.
 
-            iter_interesting_nodes returns records as it processes them, which
-            can be in batches. But we only want a single stream to be inserted.
-            """
-            for record, items in interesting:
-                for value in record.itervalues():
-                    yield value
-        # XXX: We could instead call get_record_stream(records.keys())
-        #      ATM, this will always insert the records as fulltexts, and
-        #      requires that you can hang on to records once you have gone
-        #      on to the next one. Further, it causes the target to
-        #      recompress the data. Testing shows it to be faster than
-        #      requesting the records again, though.
-        self.to_repository.chk_bytes.insert_record_stream(
-            to_stream_adapter())
+                iter_interesting_nodes returns records as it processes them, which
+                can be in batches. But we only want a single stream to be inserted.
+                """
+                for record, items in interesting:
+                    for value in record.itervalues():
+                        yield value
+            # XXX: We could instead call get_record_stream(records.keys())
+            #      ATM, this will always insert the records as fulltexts, and
+            #      requires that you can hang on to records once you have gone
+            #      on to the next one. Further, it causes the target to
+            #      recompress the data. Testing shows it to be faster than
+            #      requesting the records again, though.
+            self.to_repository.chk_bytes.insert_record_stream(
+                to_stream_adapter())
+        finally:
+            child_pb.finished()
+        pb.update('fetch inventory', 2, 2)
 
     def _generate_root_texts(self, revs):
         """This will be called by __fetch between fetching weave texts and

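The heart of the change is the wrap-the-iterator pattern in
_pb_stream_adapter(): the helper closes over a progress bar, a message,
and a total count, and ticks the bar once per record as the stream is
consumed. Because the wrapper is a generator, nothing is updated until
insert_record_stream() actually pulls records, so the bar tracks real
insertion progress rather than stream construction. A minimal
standalone sketch of the pattern (FakeProgressBar and the key list are
illustrative stand-ins, not bzrlib APIs):

    def _pb_stream_adapter(pb, msg, num_keys, stream):
        """Return a callable that re-yields stream, ticking pb per record."""
        def adapter():
            for idx, record in enumerate(stream):
                pb.update(msg, idx, num_keys)
                yield record
        return adapter

    class FakeProgressBar(object):
        """Stand-in for the bars from bzrlib.ui.ui_factory."""
        def update(self, msg, cur, total):
            print '%s: %d/%d' % (msg, cur, total)

    keys = ['rec-a', 'rec-b', 'rec-c']
    adapter = _pb_stream_adapter(FakeProgressBar(), 'fetch text',
                                 len(keys), iter(keys))
    for record in adapter():
        pass  # prints 'fetch text: 0/3' through 'fetch text: 2/3'

Note that the counter runs from 0 to num_keys - 1, so the wrapped bar
never reports num_keys/num_keys itself; in _fetch_chk_inventories() the
enclosing pb.update('fetch inventory', 2, 2) is what marks the phase
finished.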

