Rev 3071: * ``bzr pack`` now orders revision texts in topological order, with newest in http://people.ubuntu.com/~robertc/baz2.0/pack
Robert Collins
robertc at robertcollins.net
Tue Dec 4 02:19:58 GMT 2007
At http://people.ubuntu.com/~robertc/baz2.0/pack
------------------------------------------------------------
revno: 3071
revision-id: robertc at robertcollins.net-20071204021939-883w43jh8fuy1mzf
parent: pqm at pqm.ubuntu.com-20071203210338-3w0ryakegm0xopp0
committer: Robert Collins <robertc at robertcollins.net>
branch nick: pack
timestamp: Tue 2007-12-04 13:19:39 +1100
message:
* ``bzr pack`` now orders revision texts in topological order, with newest
at the start of the file, promoting linear reads for ``bzr log`` and the
like. This partially fixes #154129. (Robert Collins)
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
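The heart of this revision is the OptimisingPacker added further down: it writes revision texts in topological order with the newest revision first, so tip-to-ancestor readers such as ``bzr log`` get mostly linear reads from the pack file. As a quick orientation before the diff, here is a minimal standalone sketch of that ordering idea. It uses a hand-rolled topological sort and a made-up three-revision graph rather than bzrlib's tsort module, so treat it as an illustration only.

    # A minimal sketch of the newest-first layout.  It mirrors what
    # OptimisingPacker._sort_revisions below does with bzrlib's
    # tsort.topo_sort, but uses a hand-rolled sort so the example is
    # self-contained.  Keys and the graph are made up.

    def topo_sort(ancestors):
        """Return keys ordered so every parent comes before its children."""
        order = []
        visited = set()
        def visit(key):
            if key in visited:
                return
            visited.add(key)
            for parent in ancestors.get(key, ()):
                visit(parent)
            order.append(key)
        for key in ancestors:
            visit(key)
        return order

    ancestors = {
        ('rev-1',): [],
        ('rev-2',): [('rev-1',)],
        ('rev-3',): [('rev-2',)],
    }
    order = topo_sort(ancestors)       # oldest first: rev-1, rev-2, rev-3
    layout = list(reversed(order))     # newest first: rev-3, rev-2, rev-1
    print(layout)
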
=== modified file 'NEWS'
--- a/NEWS 2007-12-03 17:03:10 +0000
+++ b/NEWS 2007-12-04 02:19:39 +0000
@@ -12,6 +12,10 @@
* ``bzr commit`` now doesn't print the revision number twice. (Matt
Nordhoff, #172612)
+ * ``bzr pack`` now orders revision texts in topological order, with newest
+ at the start of the file, promoting linear reads for ``bzr log`` and the
+ like. This partially fixes #154129. (Robert Collins)
+
* Support logging single merge revisions with short and line log formatters.
(Kent Gibson)
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py 2007-12-03 19:26:40 +0000
+++ b/bzrlib/repofmt/pack_repo.py 2007-12-04 02:19:39 +0000
@@ -610,8 +610,9 @@
revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
# copy revision keys and adjust values
self.pb.update("Copying revision texts", 1)
- list(self._copy_nodes_graph(revision_nodes, revision_index_map,
- self.new_pack._writer, self.new_pack.revision_index))
+ total_items, readv_group_iter = self._sort_revisions(revision_nodes)
+ list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
+ self.new_pack.revision_index, readv_group_iter, total_items))
if 'pack' in debug.debug_flags:
mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
time.ctime(), self._pack_collection._upload_transport.base,
@@ -639,8 +640,10 @@
# XXX: Should be a helper function to allow different inv representation
# at this point.
self.pb.update("Copying inventory texts", 2)
- inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
- self.new_pack._writer, self.new_pack.inventory_index, output_lines=True)
+ total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
+ inv_lines = self._copy_nodes_graph(inventory_index_map,
+ self.new_pack._writer, self.new_pack.inventory_index,
+ readv_group_iter, total_items, output_lines=True)
if self.revision_ids:
self._process_inventory_lines(inv_lines)
else:
@@ -675,8 +678,9 @@
a_missing_key[0])
# copy text keys and adjust values
self.pb.update("Copying content texts", 3)
- list(self._copy_nodes_graph(text_nodes, text_index_map,
- self.new_pack._writer, self.new_pack.text_index))
+ total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
+ list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+ self.new_pack.text_index, readv_group_iter, total_items))
self._log_copied_texts()
def _check_references(self):
@@ -787,8 +791,8 @@
pb.update("Copied record", record_index)
record_index += 1
- def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
- output_lines=False):
+ def _copy_nodes_graph(self, index_map, writer, write_index,
+ readv_group_iter, total_items, output_lines=False):
"""Copy knit nodes between packs.
:param output_lines: Return lines present in the copied data as
@@ -796,8 +800,8 @@
"""
pb = ui.ui_factory.nested_progress_bar()
try:
- for result in self._do_copy_nodes_graph(nodes, index_map, writer,
- write_index, output_lines, pb):
+ for result in self._do_copy_nodes_graph(index_map, writer,
+ write_index, output_lines, pb, readv_group_iter, total_items):
yield result
except Exception:
# Python 2.4 does not permit try:finally: in a generator.
@@ -806,42 +810,21 @@
else:
pb.finished()
- def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
- output_lines, pb):
+ def _do_copy_nodes_graph(self, index_map, writer, write_index,
+ output_lines, pb, readv_group_iter, total_items):
# for record verification
knit_data = _KnitData(None)
# for line extraction when requested (inventories only)
if output_lines:
factory = knit.KnitPlainFactory()
- # plan a readv on each source pack:
- # group by pack
- nodes = sorted(nodes)
- # how to map this into knit.py - or knit.py into this?
- # we don't want the typical knit logic, we want grouping by pack
- # at this point - perhaps a helper library for the following code
- # duplication points?
- request_groups = {}
record_index = 0
- pb.update("Copied record", record_index, len(nodes))
- for index, key, value, references in nodes:
- if index not in request_groups:
- request_groups[index] = []
- request_groups[index].append((key, value, references))
- for index, items in request_groups.iteritems():
- pack_readv_requests = []
- for key, value, references in items:
- # ---- KnitGraphIndex.get_position
- bits = value[1:].split(' ')
- offset, length = int(bits[0]), int(bits[1])
- pack_readv_requests.append((offset, length, (key, value[0], references)))
- # linear scan up the pack
- pack_readv_requests.sort()
+ pb.update("Copied record", record_index, total_items)
+ for index, readv_vector, node_vector in readv_group_iter:
# copy the data
transport, path = index_map[index]
- reader = pack.make_readv_reader(transport, path,
- [offset[0:2] for offset in pack_readv_requests])
- for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
- izip(reader.iter_records(), pack_readv_requests):
+ reader = pack.make_readv_reader(transport, path, readv_vector)
+ for (names, read_func), (key, eol_flag, references) in \
+ izip(reader.iter_records(), node_vector):
raw_data = read_func(None)
version_id = key[-1]
if output_lines:
@@ -868,6 +851,43 @@
return text_index_map, self._pack_collection._index_contents(text_index_map,
self._text_filter)
+ def _least_readv_node_readv(self, nodes):
+ """Generate request groups for nodes using the least readv's.
+
+ :param nodes: An iterable of graph index nodes.
+ :return: Total node count and an iterator of the data needed to perform
+ readvs to obtain the data for nodes. Each item yielded by the
+ iterator is a tuple with:
+ index, readv_vector, node_vector. readv_vector is a list ready to
+ hand to the transport readv method, and node_vector is a list of
+ (key, eol_flag, references) for the node retrieved by the
+ matching readv_vector.
+ """
+ # group by pack so we do one readv per pack
+ nodes = sorted(nodes)
+ total = len(nodes)
+ request_groups = {}
+ for index, key, value, references in nodes:
+ if index not in request_groups:
+ request_groups[index] = []
+ request_groups[index].append((key, value, references))
+ result = []
+ for index, items in request_groups.iteritems():
+ pack_readv_requests = []
+ for key, value, references in items:
+ # ---- KnitGraphIndex.get_position
+ bits = value[1:].split(' ')
+ offset, length = int(bits[0]), int(bits[1])
+ pack_readv_requests.append(
+ ((offset, length), (key, value[0], references)))
+ # linear scan up the pack to maximum range combining.
+ pack_readv_requests.sort()
+ # split out the readv and the node data.
+ pack_readv = [readv for readv, node in pack_readv_requests]
+ node_vector = [node for readv, node in pack_readv_requests]
+ result.append((index, pack_readv, node_vector))
+ return total, result
+
def _log_copied_texts(self):
if 'pack' in debug.debug_flags:
mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
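For readers skimming the hunk above, _least_readv_node_readv boils down to: group the index nodes per source pack, decode each value into an (offset, length) pair, sort the requests so each pack is read front to back, and hand back one readv vector plus the matching node vector per pack. Below is a simplified standalone illustration of that grouping; the pack names, keys and value strings are invented, but the value layout ('<eol-flag><offset> <length>') follows the parsing done in the code above.

    # Simplified standalone illustration: one readv group per source pack,
    # sorted by offset so the pack is read linearly.  All data is made up.

    nodes = [
        ('pack-a', ('rev-2',), ' 1200 300', ()),
        ('pack-a', ('rev-1',), ' 0 1200', ()),
        ('pack-b', ('rev-3',), 'N0 800', ()),
    ]

    request_groups = {}
    for index, key, value, references in nodes:
        request_groups.setdefault(index, []).append((key, value, references))

    result = []
    for index, items in sorted(request_groups.items()):
        pack_readv_requests = []
        for key, value, references in items:
            # value is '<eol-flag><offset> <length>', as parsed above
            bits = value[1:].split(' ')
            offset, length = int(bits[0]), int(bits[1])
            pack_readv_requests.append(
                ((offset, length), (key, value[0], references)))
        pack_readv_requests.sort()   # linear scan up the pack
        readv_vector = [readv for readv, node in pack_readv_requests]
        node_vector = [node for readv, node in pack_readv_requests]
        result.append((index, readv_vector, node_vector))

    for index, readv_vector, node_vector in result:
        print((index, readv_vector))
    # ('pack-a', [(0, 1200), (1200, 300)])
    # ('pack-b', [(0, 800)])
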
@@ -886,6 +906,15 @@
text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
self._text_filter = text_filter
+ def _sort_revisions(self, revision_nodes):
+ """Return the total revisions and the readv's to issue.
+
+ :param revision_nodes: The revision index contents for the packs being
+ incorporated into the new pack.
+ :return: As per _least_readv_node_readv.
+ """
+ return self._least_readv_node_readv(revision_nodes)
+
def _use_pack(self, new_pack):
"""Return True if new_pack should be used.
@@ -895,6 +924,40 @@
return new_pack.data_inserted()
+class OptimisingPacker(Packer):
+ """A packer which spends more time to create better disk layouts."""
+
+ def _sort_revisions(self, revision_nodes):
+ """Return the total revisions and the readv's to issue.
+
+ This sort places revisions in topological order with the ancestors
+ after the children.
+
+ :param revision_nodes: The revision index contents for the packs being
+ incorporated into the new pack.
+ :return: As per _least_readv_node_readv.
+ """
+ # build an ancestors dict
+ ancestors = {}
+ by_key = {}
+ for index, key, value, references in revision_nodes:
+ ancestors[key] = references[0]
+ by_key[key] = (index, value, references)
+ order = tsort.topo_sort(ancestors)
+ total = len(order)
+ # Single IO is pathological, but it will work as a starting point.
+ requests = []
+ for key in reversed(order):
+ index, value, references = by_key[key]
+ # ---- KnitGraphIndex.get_position
+ bits = value[1:].split(' ')
+ offset, length = int(bits[0]), int(bits[1])
+ requests.append(
+ (index, [(offset, length)], [(key, value[0], references)]))
+ # TODO: combine requests in the same index that are in ascending order.
+ return total, requests
+
+
class ReconcilePacker(Packer):
"""A packer which regenerates indices etc as it copies.
@@ -968,8 +1031,9 @@
del ideal_index
del text_nodes
# 3) bulk copy the ok data
- list(self._copy_nodes_graph(ok_nodes, text_index_map,
- self.new_pack._writer, self.new_pack.text_index))
+ total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
+ list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+ self.new_pack.text_index, readv_group_iter, total_items))
# 4) adhoc copy all the other texts.
# We have to topologically insert all texts otherwise we can fail to
# reconcile when parts of a single delta chain are preserved intact,
@@ -1164,17 +1228,19 @@
self._execute_pack_operations(pack_operations)
return True
- def _execute_pack_operations(self, pack_operations):
+ def _execute_pack_operations(self, pack_operations, _packer_class=Packer):
"""Execute a series of pack operations.
:param pack_operations: A list of [revision_count, packs_to_combine].
+ :param _packer_class: The class of packer to use. If not supplied,
+ Packer will be used.
:return: None.
"""
for revision_count, packs in pack_operations:
# we may have no-ops from the setup logic
if len(packs) == 0:
continue
- Packer(self, packs, '.autopack').pack()
+ _packer_class(self, packs, '.autopack').pack()
for pack in packs:
self._remove_pack_from_memory(pack)
# record the newly available packs and stop advertising the old
@@ -1197,6 +1263,9 @@
self.ensure_loaded()
total_packs = len(self._names)
if total_packs < 2:
+ # This is arguably wrong because we might not be optimal, but for
+ # now let's leave it in. (e.g. reconcile -> one pack, but not
+ # optimal.)
return
total_revisions = self.revision_index.combined_index.key_count()
# XXX: the following may want to be a class, to pack with a given
@@ -1208,10 +1277,9 @@
pack_distribution = [1]
pack_operations = [[0, []]]
for pack in self.all_packs():
- revision_count = pack.get_revision_count()
- pack_operations[-1][0] += revision_count
+ pack_operations[-1][0] += pack.get_revision_count()
pack_operations[-1][1].append(pack)
- self._execute_pack_operations(pack_operations)
+ self._execute_pack_operations(pack_operations, OptimisingPacker)
def plan_autopack_combinations(self, existing_packs, pack_distribution):
"""Plan a pack operation.
=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py 2007-11-30 15:59:02 +0000
+++ b/bzrlib/tests/test_repository.py 2007-12-04 02:19:39 +0000
@@ -929,6 +929,26 @@
self.assertEqual(1, len(list(index.iter_all_entries())))
self.assertEqual(2, len(tree.branch.repository.all_revision_ids()))
+ def test_pack_layout(self):
+ format = self.get_format()
+ tree = self.make_branch_and_tree('.', format=format)
+ trans = tree.branch.repository.bzrdir.get_repository_transport(None)
+ tree.commit('start', rev_id='1')
+ tree.commit('more work', rev_id='2')
+ tree.branch.repository.pack()
+ tree.lock_read()
+ self.addCleanup(tree.unlock)
+ pack = tree.branch.repository._pack_collection.get_pack_by_name(
+ tree.branch.repository._pack_collection.names()[0])
+ # revision access tends to be tip->ancestor, so ordering that way on
+ # disk is a good idea.
+ for _1, key, val, refs in pack.revision_index.iter_all_entries():
+ if key == ('1',):
+ pos_1 = int(val[1:].split()[0])
+ else:
+ pos_2 = int(val[1:].split()[0])
+ self.assertTrue(pos_2 < pos_1)
+
def test_pack_repositories_support_multiple_write_locks(self):
format = self.get_format()
self.make_repository('.', shared=True, format=format)