Rev 3828: Change how progress indicators work during 'generic fetch' in http://bzr.arbash-meinel.com/branches/bzr/brisbane/generic_fetch_ordering
John Arbash Meinel
john at arbash-meinel.com
Thu Feb 19 17:13:35 GMT 2009
At http://bzr.arbash-meinel.com/branches/bzr/brisbane/generic_fetch_ordering
------------------------------------------------------------
revno: 3828
revision-id: john at arbash-meinel.com-20090219171317-o39ib282zbwyby8f
parent: john at arbash-meinel.com-20090218190726-oozb84xy6h8ok28z
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: generic_fetch_ordering
timestamp: Thu 2009-02-19 11:13:17 -0600
message:
Change how progress indicators work during 'generic fetch'
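The heart of the change is a small generator adapter: it wraps a record stream and updates a progress bar as each record passes through, so insert_record_stream() drives the progress display without knowing about it. A minimal standalone sketch of the pattern follows; FakeProgressBar is a hypothetical stand-in for bzrlib's real progress bar, matching only the update(msg, current, total) signature the patch relies on:

def _pb_stream_adapter(pb, msg, num_keys, stream):
    # Yield records unchanged, updating the progress bar as a side effect.
    def adapter():
        for idx, record in enumerate(stream):
            pb.update(msg, idx, num_keys)
            yield record
    return adapter

class FakeProgressBar(object):
    # Hypothetical stand-in for bzrlib's progress bar; real code gets one
    # from bzrlib.ui.ui_factory.
    def update(self, msg, current, total):
        print '%s %d/%d' % (msg, current, total)

keys = [('file-a', 'rev-1'), ('file-b', 'rev-1'), ('file-c', 'rev-1')]
consume = _pb_stream_adapter(FakeProgressBar(), 'fetch text', len(keys),
                             iter(keys))
for record in consume():
    pass  # insert_record_stream() would consume the records here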
=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py 2009-02-18 16:36:18 +0000
+++ b/bzrlib/fetch.py 2009-02-19 17:13:17 +0000
@@ -62,6 +62,13 @@
 # - then go through all files; for each one get the weave,
 #   and add in all file versions

+def _pb_stream_adapter(pb, msg, num_keys, stream):
+    def adapter():
+        for idx, record in enumerate(stream):
+            pb.update(msg, idx, num_keys)
+            yield record
+    return adapter
+
 class RepoFetcher(object):
     """Pull revisions and texts from one repository to another.
@@ -180,11 +187,7 @@
                     # When the inventory keys start being reported, all text
                     # keys have already been issued - and we want the text keys
                     # inserted before inventory keys: copy the texts.
-                    to_texts = self.to_repository.texts
-                    from_texts = self.from_repository.texts
-                    to_texts.insert_record_stream(from_texts.get_record_stream(
-                        text_keys, self.to_repository._fetch_order,
-                        not self.to_repository._fetch_uses_deltas))
+                    self._fetch_text_texts(text_keys, pb=pb)
                     # Cause an error if a text occurs after we have done the
                     # copy.
                     text_keys = None
@@ -230,40 +233,41 @@
         except errors.NoSuchRevision, e:
             raise InstallFailed([self._last_revision])

+    def _fetch_text_texts(self, text_keys, pb):
+        to_texts = self.to_repository.texts
+        from_texts = self.from_repository.texts
+        text_stream = from_texts.get_record_stream(text_keys,
+            self.to_repository._fetch_order,
+            not self.to_repository._fetch_uses_deltas)
+        adapter = _pb_stream_adapter(pb, 'fetch text', len(text_keys),
+                                     text_stream)
+        to_texts.insert_record_stream(adapter())
+
     def _fetch_inventory_weave(self, revs, pb):
-        pb.update("fetch inventory", 0, 2)
-        to_weave = self.to_repository.inventories
-        # just merge, this is optimisable and its means we don't
-        # copy unreferenced data such as not-needed inventories.
-        pb.update("fetch inventory", 1, 3)
-        from_weave = self.from_repository.inventories
-        pb.update("fetch inventory", 2, 3)
         # we fetch only the referenced inventories because we do not
         # know for unselected inventories whether all their required
         # texts are present in the other repository - it could be
         # corrupt.
-        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
-        try:
-            if (self.from_repository._format.supports_chks and
-                self.to_repository._format.supports_chks):
-                self._fetch_chk_inventories(revs, child_pb)
-            elif (self.from_repository._format.supports_chks or
-                  self.to_repository._format.supports_chks):
-                # Hack to make not-chk->chk fetch: copy the inventories as
-                # inventories.
-                total = len(revs)
-                for pos, inv in enumerate(
-                    self.from_repository.iter_inventories(revs)):
-                    child_pb.update("Copying inventories", pos, total)
-                    self.to_repository.add_inventory(inv.revision_id, inv, [])
-            else:
-                to_weave.insert_record_stream(from_weave.get_record_stream(
-                    [(rev_id,) for rev_id in revs],
-                    self.to_repository._fetch_order,
-                    not self.to_repository._fetch_uses_deltas))
-        finally:
-            child_pb.finished()
-        pb.update("fetch inventory", 3, 3)
+        if (self.from_repository._format.supports_chks and
+            self.to_repository._format.supports_chks):
+            self._fetch_chk_inventories(revs, pb)
+        elif (self.from_repository._format.supports_chks or
+              self.to_repository._format.supports_chks):
+            # Hack to make not-chk->chk fetch: copy the inventories as
+            # inventories.
+            total = len(revs)
+            for idx, inv in enumerate(
+                self.from_repository.iter_inventories(revs)):
+                pb.update("Copying inventories", idx, total)
+                self.to_repository.add_inventory(inv.revision_id, inv, [])
+        else:
+            to_weave = self.to_repository.inventories
+            from_weave = self.from_repository.inventories
+            adapter = _pb_stream_adapter(pb, 'fetch inv', len(revs),
+                from_weave.get_record_stream([(rev_id,) for rev_id in revs],
+                    self.to_repository._fetch_order,
+                    not self.to_repository._fetch_uses_deltas))
+            to_weave.insert_record_stream(adapter())

     def _fetch_revision_texts(self, revs, pb):
         # fetch signatures first and then the revision texts
@@ -312,7 +316,8 @@
         uninteresting_chk_roots = set()
         interesting_chk_roots = set()
         def filter_inv_stream(inv_stream):
-            for record in inv_stream:
+            for idx, record in enumerate(inv_stream):
+                child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
                 bytes = record.get_bytes_as('fulltext')
                 chk_inv = inventory.CHKInventory.deserialise(
                     self.from_repository.chk_bytes, bytes, record.key)
@@ -327,29 +332,41 @@
                 p_id_map = chk_inv.parent_id_basename_to_file_id
                 if p_id_map is not None:
                     interesting_chk_roots.add(p_id_map.key())
-        self.to_repository.inventories.insert_record_stream(filter_inv_stream(inv_stream))
+        pb.update('fetch inventory', 0, 2)
+        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
+        try:
+            self.to_repository.inventories.insert_record_stream(
+                filter_inv_stream(inv_stream))
+        finally:
+            child_pb.finished()
         # Now that we have worked out all of the interesting root nodes, grab
         # all of the interesting pages and insert them
-        interesting = chk_map.iter_interesting_nodes(
-            self.from_repository.chk_bytes, interesting_chk_roots,
-            uninteresting_chk_roots, pb=pb)
-        def to_stream_adapter():
-            """Adapt the iter_interesting_nodes result to a single stream.
+        pb.update('fetch inventory', 1, 2)
+        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
+        try:
+            interesting = chk_map.iter_interesting_nodes(
+                self.from_repository.chk_bytes, interesting_chk_roots,
+                uninteresting_chk_roots, pb=child_pb)
+            def to_stream_adapter():
+                """Adapt the iter_interesting_nodes result to a single stream.

-            iter_interesting_nodes returns records as it processes them, which
-            can be in batches. But we only want a single stream to be inserted.
-            """
-            for record, items in interesting:
-                for value in record.itervalues():
-                    yield value
-        # XXX: We could instead call get_record_stream(records.keys())
-        #      ATM, this will always insert the records as fulltexts, and
-        #      requires that you can hang on to records once you have gone
-        #      on to the next one. Further, it causes the target to
-        #      recompress the data. Testing shows it to be faster than
-        #      requesting the records again, though.
-        self.to_repository.chk_bytes.insert_record_stream(
-            to_stream_adapter())
+                iter_interesting_nodes returns records as it processes them, which
+                can be in batches. But we only want a single stream to be inserted.
+                """
+                for record, items in interesting:
+                    for value in record.itervalues():
+                        yield value
+            # XXX: We could instead call get_record_stream(records.keys())
+            #      ATM, this will always insert the records as fulltexts, and
+            #      requires that you can hang on to records once you have gone
+            #      on to the next one. Further, it causes the target to
+            #      recompress the data. Testing shows it to be faster than
+            #      requesting the records again, though.
+            self.to_repository.chk_bytes.insert_record_stream(
+                to_stream_adapter())
+        finally:
+            child_pb.finished()
+        pb.update('fetch inventory', 2, 2)

     def _generate_root_texts(self, revs):
         """This will be called by __fetch between fetching weave texts and