Rev 28: Automatically combine indices with a simple log2 backoff on revision count. in http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk

Robert Collins robertc at robertcollins.net
Fri Jun 13 00:35:49 BST 2008


At http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk

------------------------------------------------------------
revno: 28
revision-id: robertc at robertcollins.net-20080612233548-flhaytd97uyvuzpf
parent: robertc at robertcollins.net-20080612134630-z0ijr6ocijamd66e
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Fri 2008-06-13 09:35:48 +1000
message:
  Automatically combine indices with a simple log2 backoff on revision count.
modified:
  index.py                       index.py-20080608055509-hnimeek7q8tctkqf-2
  tests/test_index.py            test_index.py-20080608055509-hnimeek7q8tctkqf-4
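
The cap this revision places on live component indices is ceil(log2(total
indexed revisions)), with a floor of one. A quick standalone check of the
values that expression produces (plain Python mirroring the formula used in
_add_index below; component_cap is an illustrative name, not part of the
plugin):

    import math

    def component_cap(total_revisions):
        # ceil(log2(n)), but never fewer than one component
        return max(1, int(math.ceil(math.log(total_revisions, 2))))

    # component_cap(1) == 1, component_cap(2) == 1,
    # component_cap(3) == 2, component_cap(4) == 2,
    # component_cap(100) == 7, component_cap(1000) == 10

So a branch with a thousand indexed revisions settles at no more than ten
component packs, while a freshly indexed small branch collapses to one or two.
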
=== modified file 'index.py'
--- a/index.py	2008-06-12 13:46:30 +0000
+++ b/index.py	2008-06-12 23:35:48 +0000
@@ -17,6 +17,7 @@
 
 """The core logic for search."""
 
+import math
 import md5
 import re
 
@@ -205,8 +206,8 @@
 
     def _index_revisions(self, locked_branch, revisions_to_index):
         """Helper for indexed_revisions."""
-        # TODO: split into groups of <reasonable memory size> for which we
-        # then:
+        if not revisions_to_index:
+            return
         _ensure_regexes()
         graph = locked_branch.repository.get_graph()
         parent_map = graph.get_parent_map(revisions_to_index)
@@ -229,20 +230,25 @@
                 builder.add_revision(rev_id)
             self._add_index(builder)
 
-    def _add_index(self, builder):
-        """Add a new component index to the list of indices."""
+    def _add_index(self, builder, to_remove=None, allow_pack=True):
+        """Add a new component index to the list of indices.
+        
+        :param builder: A component builder supporting the upload_index call.
+        :param to_remove: An optional iterable of components to remove.
+        :param allow_pack: Whether an auto pack is permitted by this operation.
+        """
         # The index name is the md5sum of the revision index serialised form.
         index_name, index_value, elements = builder.upload_index(
             self._upload_transport)
         if index_name in self._current_names:
             raise Exception("md5 collision! rad! %s" % index_name)
-        # The indices are uploaded, we only need to rename to activate.
-        self._refresh_indices()
-        if index_name in self._current_names:
-            raise Exception(
-                "md5 collision with concurrent writer! rad! %s" % index_name)
+        # The component is uploaded, we only need to rename to activate.
         self._lock.lock_write()
         try:
+            self._refresh_indices(to_remove=to_remove)
+            if index_name in self._current_names:
+                raise Exception(
+                    "md5 collision with concurrent writer! rad! %s" % index_name)
             # Serialise the index list
             new_names = InMemoryGraphIndex(0, 1)
             new_names.add_node((index_name,), index_value, ())
@@ -257,12 +263,45 @@
             index = ComponentIndex(index_name, index_value,
                 self._indices_transport)
             self._orig_names[index_name] = (index_value, index)
+            # Clean out the obsoleted area when we are removing things.
+            if to_remove:
+                self._obsolete_transport.delete_multi(
+                    self._obsolete_transport.list_dir('.'))
         finally:
             self._lock.unlock()
+        # Move any no-longer-referenced packs out of the indices to the
+        # obsoleted area.
+        if to_remove:
+            for component in to_remove:
+                relpath = component.name + '.pack'
+                self._indices_transport.rename(relpath,
+                    '../obsolete/' + relpath)
         # Add in-memory
         self._add_index_to_memory(index_name, index_value, index)
         # It's safely inserted. Trigger a pack?
-
+        if not allow_pack:
+            return
+        total_revisions = self._revision_index.key_count()
+        max_components = int(math.ceil(math.log(total_revisions, 2)))
+        if max_components < 1:
+            max_components = 1
+        excess = len(self._current_names) - max_components
+        if excess <= 0:
+            return
+        old_components = []
+        for name, (value, component) in self._current_names.iteritems():
+            old_components.append((component.revision_index.key_count(), name))
+        old_components.sort()
+        del old_components[excess + 1:]
+        components = [self._current_names[name][1] for length, name in
+            old_components]
+        # Note: we don't recurse into another pack here, for two reasons:
+        # A) we don't want to recurse indefinitely; the allow_pack flag to
+        # _add_index prevents that.
+        # B) We need to remove the combined components too.
+        combiner = ComponentCombiner(components, self._upload_transport)
+        self._add_index(combiner, to_remove=components, allow_pack=False)
+        
     def _add_index_to_memory(self, name, value, index):
         """Add an index (with meta-value value) to the in-memory index list."""
         self._current_names[name] = (value, index)
@@ -276,12 +315,19 @@
         for node in self._revision_index.iter_all_entries():
             yield node[1]
 
-    def _refresh_indices(self):
-        """Merge on-disk index lists into the memory top level index list."""
+    def _refresh_indices(self, to_remove=None):
+        """Merge on-disk index lists into the memory top level index list.
+        
+        :param to_remove: An optional list of components to remove from memory
+            even if they are still listed on disk.
+        """
         names = GraphIndex(self._transport, 'names', None)
         new_names = {}
         merged_names = {}
         deleted_names = set()
+        if to_remove:
+            for component in to_remove:
+                deleted_names.add(component.name)
         added_names = set()
         same_names = set()
         for node in names.iter_all_entries():
@@ -312,6 +358,7 @@
         del self._term_doc_indices[index.term_index]
         self._term_indices.remove(index.term_index)
         self._revision_indices.remove(index.revision_index)
+        del self._current_names[name]
 
     def search(self, termlist):
         """Trivial set-based search of the index.
@@ -819,3 +866,8 @@
                 posting_info = node[2]
                 term_set = self.terms.setdefault(term, set())
                 term_set.add((component, posting_info))
+
+    def upload_index(self, upload_transport):
+        """Thunk for use by Index._add_index."""
+        self.transport = upload_transport
+        return self.combine()
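
Note how the pack path re-enters _add_index with the ComponentCombiner: the
new upload_index thunk (end of the hunk above) lets a combiner stand in for a
ComponentIndexBuilder, and allow_pack=False on the nested call stops insertion
of the merged component from triggering yet another pack. A rough,
self-contained sketch of that control flow, using toy names rather than the
plugin's real classes and transports:

    import math

    class TinyIndex(object):
        """Toy model of the component list; each entry is a revision count."""

        def __init__(self):
            self.components = []

        def add(self, revisions, allow_pack=True, replacing=None):
            if replacing:
                for victim in replacing:
                    self.components.remove(victim)
            self.components.append(revisions)
            if not allow_pack:
                # The recursion guard: a combined component never packs again.
                return
            total = sum(self.components)
            cap = max(1, int(math.ceil(math.log(total, 2))))
            excess = len(self.components) - cap
            if excess <= 0:
                return
            victims = sorted(self.components)[:excess + 1]
            # Re-enter add() with the merged component, packing disabled.
            self.add(sum(victims), allow_pack=False, replacing=victims)

    index = TinyIndex()
    for _ in range(4):
        index.add(1)           # index four single-revision components
    print(index.components)    # -> [2, 2]: two packs after four revisions

The real code expresses the same shape as _add_index(combiner,
to_remove=components, allow_pack=False), where to_remove also drives moving
the superseded packs into the obsolete directory.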

=== modified file 'tests/test_index.py'
--- a/tests/test_index.py	2008-06-12 13:46:30 +0000
+++ b/tests/test_index.py	2008-06-12 23:35:48 +0000
@@ -81,6 +81,52 @@
         rev_index = index.index_url(self.get_url('foo'))
         self.assertEqual(set([(revid,)]), set(rev_index.indexed_revisions()))
 
+    def test_index_combining(self):
+        # After inserting 1 revision, we get one pack.
+        # After 2 we should still have 1, but also two discards.
+        # 3 should map to 2 packs, as should 4 (but with 2 discards).
+        # To test: we create four revisions:
+        tree = self.make_branch_and_tree('foo')
+        revid1 = tree.commit('1')
+        revid2 = tree.commit('2')
+        revid3 = tree.commit('3')
+        revid4 = tree.commit('4')
+        rev_index = index.init_index(tree.branch)
+        def get_names():
+            return [name + '.pack' for name in rev_index._current_names]
+        # index one revision
+        rev_index.index_revisions(tree.branch, [revid1])
+        self.assertEqual(1, len(rev_index._current_names))
+        names = get_names()
+        # index the second, should pack
+        rev_index.index_revisions(tree.branch, [revid2])
+        self.assertEqual(1, len(rev_index._current_names))
+        obsolete_names = rev_index._obsolete_transport.list_dir('.')
+        # There should be two - the old name and one more.
+        self.assertSubset(names, obsolete_names)
+        self.assertEqual(2, len(obsolete_names))
+        names = get_names()
+        # index the third, should not pack, and not clean obsoletes, and leave
+        # the existing pack in place.
+        rev_index.index_revisions(tree.branch, [revid3])
+        self.assertEqual(2, len(rev_index._current_names))
+        self.assertEqual(obsolete_names,
+            rev_index._obsolete_transport.list_dir('.'))
+        # new names should be the pack for revid3
+        new_names = set(get_names()) - set(names)
+        self.assertEqual(1, len(new_names))
+        # index the fourth, which should pack the new name and the fourth one,
+        # still leaving the previous pack untouched; it should clean obsoletes
+        # and move the pack that was new after revid3 into the obsolete area
+        rev_index.index_revisions(tree.branch, [revid4])
+        self.assertEqual(2, len(rev_index._current_names))
+        obsolete_names = rev_index._obsolete_transport.list_dir('.')
+        # the revid3 pack should have been obsoleted
+        self.assertSubset(new_names, obsolete_names)
+        self.assertEqual(2, len(obsolete_names))
+        new_names = set(get_names()) - set(names)
+        self.assertEqual(1, len(new_names))
+
 
 class TestIndexRevisions(TestCaseWithTransport):
     """Tests for indexing of a set of revisions."""



