Rev 3071: * ``bzr pack`` now orders revision texts in topological order, with newest in http://people.ubuntu.com/~robertc/baz2.0/pack

Robert Collins robertc at robertcollins.net
Tue Dec 4 02:19:58 GMT 2007


At http://people.ubuntu.com/~robertc/baz2.0/pack

------------------------------------------------------------
revno: 3071
revision-id:robertc at robertcollins.net-20071204021939-883w43jh8fuy1mzf
parent: pqm at pqm.ubuntu.com-20071203210338-3w0ryakegm0xopp0
committer: Robert Collins <robertc at robertcollins.net>
branch nick: pack
timestamp: Tue 2007-12-04 13:19:39 +1100
message:
  * ``bzr pack`` now orders revision texts in topological order, with newest
    at the start of the file, promoting linear reads for ``bzr log`` and the
    like. This partially fixes #154129. (Robert Collins)
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
  bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'NEWS'
--- a/NEWS	2007-12-03 17:03:10 +0000
+++ b/NEWS	2007-12-04 02:19:39 +0000
@@ -12,6 +12,10 @@
    * ``bzr commit`` now doesn't print the revision number twice. (Matt
      Nordhoff, #172612)
 
+   * ``bzr pack`` now orders revision texts in topological order, with newest
+     at the start of the file, promoting linear reads for ``bzr log`` and the
+     like. This partially fixes #154129. (Robert Collins)
+
    * Support logging single merge revisions with short and line log formatters.
      (Kent Gibson)
 

=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2007-12-03 19:26:40 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2007-12-04 02:19:39 +0000
@@ -610,8 +610,9 @@
         revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
         # copy revision keys and adjust values
         self.pb.update("Copying revision texts", 1)
-        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
-            self.new_pack._writer, self.new_pack.revision_index))
+        total_items, readv_group_iter = self._sort_revisions(revision_nodes)
+        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
+            self.new_pack.revision_index, readv_group_iter, total_items))
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                 time.ctime(), self._pack_collection._upload_transport.base,
@@ -639,8 +640,10 @@
         # XXX: Should be a helper function to allow different inv representation
         # at this point.
         self.pb.update("Copying inventory texts", 2)
-        inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
-            self.new_pack._writer, self.new_pack.inventory_index, output_lines=True)
+        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
+        inv_lines = self._copy_nodes_graph(inventory_index_map,
+            self.new_pack._writer, self.new_pack.inventory_index,
+            readv_group_iter, total_items, output_lines=True)
         if self.revision_ids:
             self._process_inventory_lines(inv_lines)
         else:
@@ -675,8 +678,9 @@
                     a_missing_key[0])
         # copy text keys and adjust values
         self.pb.update("Copying content texts", 3)
-        list(self._copy_nodes_graph(text_nodes, text_index_map,
-            self.new_pack._writer, self.new_pack.text_index))
+        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
         self._log_copied_texts()
 
     def _check_references(self):
@@ -787,8 +791,8 @@
                 pb.update("Copied record", record_index)
                 record_index += 1
 
-    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
-        output_lines=False):
+    def _copy_nodes_graph(self, index_map, writer, write_index,
+        readv_group_iter, total_items, output_lines=False):
         """Copy knit nodes between packs.
 
         :param output_lines: Return lines present in the copied data as
@@ -796,8 +800,8 @@
         """
         pb = ui.ui_factory.nested_progress_bar()
         try:
-            for result in self._do_copy_nodes_graph(nodes, index_map, writer,
-                write_index, output_lines, pb):
+            for result in self._do_copy_nodes_graph(index_map, writer,
+                write_index, output_lines, pb, readv_group_iter, total_items):
                 yield result
         except Exception:
             # Python 2.4 does not permit try:finally: in a generator.
@@ -806,42 +810,21 @@
         else:
             pb.finished()
 
-    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
-        output_lines, pb):
+    def _do_copy_nodes_graph(self, index_map, writer, write_index,
+        output_lines, pb, readv_group_iter, total_items):
         # for record verification
         knit_data = _KnitData(None)
         # for line extraction when requested (inventories only)
         if output_lines:
             factory = knit.KnitPlainFactory()
-        # plan a readv on each source pack:
-        # group by pack
-        nodes = sorted(nodes)
-        # how to map this into knit.py - or knit.py into this?
-        # we don't want the typical knit logic, we want grouping by pack
-        # at this point - perhaps a helper library for the following code 
-        # duplication points?
-        request_groups = {}
         record_index = 0
-        pb.update("Copied record", record_index, len(nodes))
-        for index, key, value, references in nodes:
-            if index not in request_groups:
-                request_groups[index] = []
-            request_groups[index].append((key, value, references))
-        for index, items in request_groups.iteritems():
-            pack_readv_requests = []
-            for key, value, references in items:
-                # ---- KnitGraphIndex.get_position
-                bits = value[1:].split(' ')
-                offset, length = int(bits[0]), int(bits[1])
-                pack_readv_requests.append((offset, length, (key, value[0], references)))
-            # linear scan up the pack
-            pack_readv_requests.sort()
+        pb.update("Copied record", record_index, total_items)
+        for index, readv_vector, node_vector in readv_group_iter:
             # copy the data
             transport, path = index_map[index]
-            reader = pack.make_readv_reader(transport, path,
-                [offset[0:2] for offset in pack_readv_requests])
-            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
-                izip(reader.iter_records(), pack_readv_requests):
+            reader = pack.make_readv_reader(transport, path, readv_vector)
+            for (names, read_func), (key, eol_flag, references) in \
+                izip(reader.iter_records(), node_vector):
                 raw_data = read_func(None)
                 version_id = key[-1]
                 if output_lines:
@@ -868,6 +851,43 @@
         return text_index_map, self._pack_collection._index_contents(text_index_map,
             self._text_filter)
 
+    def _least_readv_node_readv(self, nodes):
+        """Generate request groups for nodes using the least readv's.
+        
+        :param nodes: An iterable of graph index nodes.
+        :return: Total node count and an iterator of the data needed to perform
+            readvs to obtain the data for nodes. Each item yielded by the
+            iterator is a tuple with:
+            index, readv_vector, node_vector. readv_vector is a list ready to
+            hand to the transport readv method, and node_vector is a list of
+            (key, eol_flag, references) for the the node retrieved by the
+            matching readv_vector.
+        """
+        # group by pack so we do one readv per pack
+        nodes = sorted(nodes)
+        total = len(nodes)
+        request_groups = {}
+        for index, key, value, references in nodes:
+            if index not in request_groups:
+                request_groups[index] = []
+            request_groups[index].append((key, value, references))
+        result = []
+        for index, items in request_groups.iteritems():
+            pack_readv_requests = []
+            for key, value, references in items:
+                # ---- KnitGraphIndex.get_position
+                bits = value[1:].split(' ')
+                offset, length = int(bits[0]), int(bits[1])
+                pack_readv_requests.append(
+                    ((offset, length), (key, value[0], references)))
+            # linear scan up the pack to maximum range combining.
+            pack_readv_requests.sort()
+            # split out the readv and the node data.
+            pack_readv = [readv for readv, node in pack_readv_requests]
+            node_vector = [node for readv, node in pack_readv_requests]
+            result.append((index, pack_readv, node_vector))
+        return total, result
+
     def _log_copied_texts(self):
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
@@ -886,6 +906,15 @@
             text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
         self._text_filter = text_filter
 
+    def _sort_revisions(self, revision_nodes):
+        """Return the total revisions and the readv's to issue.
+
+        :param revision_nodes: The revision index contents for the packs being
+            incorporated into the new pack.
+        :return: As per _least_readv_node_readv.
+        """
+        return self._least_readv_node_readv(revision_nodes)
+
     def _use_pack(self, new_pack):
         """Return True if new_pack should be used.
 
@@ -895,6 +924,40 @@
         return new_pack.data_inserted()
 
 
+class OptimisingPacker(Packer):
+    """A packer which spends more time to create better disk layouts."""
+
+    def _sort_revisions(self, revision_nodes):
+        """Return the total revisions and the readv's to issue.
+
+        This sort places revisions in topological order with the ancestors
+        after the children.
+
+        :param revision_nodes: The revision index contents for the packs being
+            incorporated into the new pack.
+        :return: As per _least_readv_node_readv.
+        """
+        # build an ancestors dict
+        ancestors = {}
+        by_key = {}
+        for index, key, value, references in revision_nodes:
+            ancestors[key] = references[0]
+            by_key[key] = (index, value, references)
+        order = tsort.topo_sort(ancestors)
+        total = len(order)
+        # Single IO is pathological, but it will work as a starting point.
+        requests = []
+        for key in reversed(order):
+            index, value, references = by_key[key]
+            # ---- KnitGraphIndex.get_position
+            bits = value[1:].split(' ')
+            offset, length = int(bits[0]), int(bits[1])
+            requests.append(
+                (index, [(offset, length)], [(key, value[0], references)]))
+        # TODO: combine requests in the same index that are in ascending order.
+        return total, requests
+
+
 class ReconcilePacker(Packer):
     """A packer which regenerates indices etc as it copies.
     
@@ -968,8 +1031,9 @@
         del ideal_index
         del text_nodes
         # 3) bulk copy the ok data
-        list(self._copy_nodes_graph(ok_nodes, text_index_map,
-            self.new_pack._writer, self.new_pack.text_index))
+        total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
         # 4) adhoc copy all the other texts.
         # We have to topologically insert all texts otherwise we can fail to
         # reconcile when parts of a single delta chain are preserved intact,
@@ -1164,17 +1228,19 @@
         self._execute_pack_operations(pack_operations)
         return True
 
-    def _execute_pack_operations(self, pack_operations):
+    def _execute_pack_operations(self, pack_operations, _packer_class=Packer):
         """Execute a series of pack operations.
 
         :param pack_operations: A list of [revision_count, packs_to_combine].
+        :param _packer_class: The class of packer to use. If not supplied
+            Packer will be used.
         :return: None.
         """
         for revision_count, packs in pack_operations:
             # we may have no-ops from the setup logic
             if len(packs) == 0:
                 continue
-            Packer(self, packs, '.autopack').pack()
+            _packer_class(self, packs, '.autopack').pack()
             for pack in packs:
                 self._remove_pack_from_memory(pack)
         # record the newly available packs and stop advertising the old
@@ -1197,6 +1263,9 @@
         self.ensure_loaded()
         total_packs = len(self._names)
         if total_packs < 2:
+            # This is arguably wrong because we might not be optimal, but for
+            # now lets leave it in. (e.g. reconcile -> one pack. But not
+            # optimal.
             return
         total_revisions = self.revision_index.combined_index.key_count()
         # XXX: the following may want to be a class, to pack with a given
@@ -1208,10 +1277,9 @@
         pack_distribution = [1]
         pack_operations = [[0, []]]
         for pack in self.all_packs():
-            revision_count = pack.get_revision_count()
-            pack_operations[-1][0] += revision_count
+            pack_operations[-1][0] += pack.get_revision_count()
             pack_operations[-1][1].append(pack)
-        self._execute_pack_operations(pack_operations)
+        self._execute_pack_operations(pack_operations, OptimisingPacker)
 
     def plan_autopack_combinations(self, existing_packs, pack_distribution):
         """Plan a pack operation.

=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py	2007-11-30 15:59:02 +0000
+++ b/bzrlib/tests/test_repository.py	2007-12-04 02:19:39 +0000
@@ -929,6 +929,26 @@
         self.assertEqual(1, len(list(index.iter_all_entries())))
         self.assertEqual(2, len(tree.branch.repository.all_revision_ids()))
 
+    def test_pack_layout(self):
+        format = self.get_format()
+        tree = self.make_branch_and_tree('.', format=format)
+        trans = tree.branch.repository.bzrdir.get_repository_transport(None)
+        tree.commit('start', rev_id='1')
+        tree.commit('more work', rev_id='2')
+        tree.branch.repository.pack()
+        tree.lock_read()
+        self.addCleanup(tree.unlock)
+        pack = tree.branch.repository._pack_collection.get_pack_by_name(
+            tree.branch.repository._pack_collection.names()[0])
+        # revision access tends to be tip->ancestor, so ordering that way on 
+        # disk is a good idea.
+        for _1, key, val, refs in pack.revision_index.iter_all_entries():
+            if key == ('1',):
+                pos_1 = int(val[1:].split()[0])
+            else:
+                pos_2 = int(val[1:].split()[0])
+        self.assertTrue(pos_2 < pos_1)
+
     def test_pack_repositories_support_multiple_write_locks(self):
         format = self.get_format()
         self.make_repository('.', shared=True, format=format)



More information about the bazaar-commits mailing list