Rev 28: Automatically combine indices with a simple log2 backoff on revision count. in http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk
Robert Collins
robertc at robertcollins.net
Fri Jun 13 00:35:49 BST 2008
At http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk
------------------------------------------------------------
revno: 28
revision-id: robertc at robertcollins.net-20080612233548-flhaytd97uyvuzpf
parent: robertc at robertcollins.net-20080612134630-z0ijr6ocijamd66e
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Fri 2008-06-13 09:35:48 +1000
message:
Automatically combine indices with a simple log2 backoff on revision count.
modified:
index.py index.py-20080608055509-hnimeek7q8tctkqf-2
tests/test_index.py test_index.py-20080608055509-hnimeek7q8tctkqf-4
=== modified file 'index.py'
--- a/index.py 2008-06-12 13:46:30 +0000
+++ b/index.py 2008-06-12 23:35:48 +0000
@@ -17,6 +17,7 @@
"""The core logic for search."""
+import math
import md5
import re
@@ -205,8 +206,8 @@
def _index_revisions(self, locked_branch, revisions_to_index):
"""Helper for indexed_revisions."""
- # TODO: split into groups of <reasonable memory size> for which we
- # then:
+ if not revisions_to_index:
+ return
_ensure_regexes()
graph = locked_branch.repository.get_graph()
parent_map = graph.get_parent_map(revisions_to_index)
@@ -229,20 +230,25 @@
builder.add_revision(rev_id)
self._add_index(builder)
- def _add_index(self, builder):
- """Add a new component index to the list of indices."""
+ def _add_index(self, builder, to_remove=None, allow_pack=True):
+ """Add a new component index to the list of indices.
+
+ :param builder: A component builder supporting the upload_index call.
+ :param to_remove: An optional iterable of components to remove.
+ :param allow_pack: Whether an auto pack is permitted by this operation.
+ """
# The index name is the md5sum of the revision index serialised form.
index_name, index_value, elements = builder.upload_index(
self._upload_transport)
if index_name in self._current_names:
raise Exception("md5 collision! rad! %s" % index_name)
- # The indices are uploaded, we only need to rename to activate.
- self._refresh_indices()
- if index_name in self._current_names:
- raise Exception(
- "md5 collision with concurrent writer! rad! %s" % index_name)
+ # The component is uploaded, we only need to rename to activate.
self._lock.lock_write()
try:
+ self._refresh_indices(to_remove=to_remove)
+ if index_name in self._current_names:
+ raise Exception(
+ "md5 collision with concurrent writer! rad! %s" % index_name)
# Serialise the index list
new_names = InMemoryGraphIndex(0, 1)
new_names.add_node((index_name,), index_value, ())
@@ -257,12 +263,45 @@
index = ComponentIndex(index_name, index_value,
self._indices_transport)
self._orig_names[index_name] = (index_value, index)
+ # Cleanup obsoleted if needed, if we are removing things.
+ if to_remove:
+ self._obsolete_transport.delete_multi(
+ self._obsolete_transport.list_dir('.'))
finally:
self._lock.unlock()
+ # Move any no-longer-referenced packs out of the indices to the
+ # obsoleted area.
+ if to_remove:
+ for component in to_remove:
+ relpath = component.name + '.pack'
+ self._indices_transport.rename(relpath,
+ '../obsolete/' + relpath)
# Add in-memory
self._add_index_to_memory(index_name, index_value, index)
# Its safely inserted. Trigger a pack ?
-
+ if not allow_pack:
+ return
+ total_revisions = self._revision_index.key_count()
+ max_components = int(math.ceil(math.log(total_revisions, 2)))
+ if max_components < 1:
+ max_components = 1
+ excess = len(self._current_names) - max_components
+ if not excess:
+ return
+ old_components = []
+ for name, (value, component) in self._current_names.iteritems():
+ old_components.append((component.revision_index.key_count(), name))
+ old_components.sort()
+ del old_components[excess + 1:]
+ components = [self._current_names[name][1] for length, name in
+ old_components]
+    # Note: we don't recurse here because of two things:
+    # A) we don't want to regress infinitely; a flag to _add_index would do
+    # this.
+    # B) We need to remove components too.
+ combiner = ComponentCombiner(components, self._upload_transport)
+ self._add_index(combiner, to_remove=components, allow_pack=False)
+
def _add_index_to_memory(self, name, value, index):
"""Add an index (with meta-value value) to the in-memory index list."""
self._current_names[name] = (value, index)
@@ -276,12 +315,19 @@
for node in self._revision_index.iter_all_entries():
yield node[1]
- def _refresh_indices(self):
- """Merge on-disk index lists into the memory top level index list."""
+ def _refresh_indices(self, to_remove=None):
+ """Merge on-disk index lists into the memory top level index list.
+
+ :param to_remove: An optional list of components to remove from memory
+ even if they are still listed on disk.
+ """
names = GraphIndex(self._transport, 'names', None)
new_names = {}
merged_names = {}
deleted_names = set()
+ if to_remove:
+ for component in to_remove:
+ deleted_names.add(component.name)
added_names = set()
same_names = set()
for node in names.iter_all_entries():
@@ -312,6 +358,7 @@
del self._term_doc_indices[index.term_index]
self._term_indices.remove(index.term_index)
self._revision_indices.remove(index.revision_index)
+ del self._current_names[name]
def search(self, termlist):
"""Trivial set-based search of the index.
@@ -819,3 +866,8 @@
posting_info = node[2]
term_set = self.terms.setdefault(term, set())
term_set.add((component, posting_info))
+
+ def upload_index(self, upload_transport):
+ """Thunk for use by Index._add_index."""
+ self.transport = upload_transport
+ return self.combine()
=== modified file 'tests/test_index.py'
--- a/tests/test_index.py 2008-06-12 13:46:30 +0000
+++ b/tests/test_index.py 2008-06-12 23:35:48 +0000
@@ -81,6 +81,52 @@
rev_index = index.index_url(self.get_url('foo'))
self.assertEqual(set([(revid,)]), set(rev_index.indexed_revisions()))
+ def test_index_combining(self):
+    # After inserting 1 revision, we get one pack.
+    # After 2 we should still have 1, but also two discards.
+    # 3 should map to 2 packs, as should 4 (but with 2 discards).
+ # To test: we create four revisions:
+ tree = self.make_branch_and_tree('foo')
+ revid1 = tree.commit('1')
+ revid2 = tree.commit('2')
+ revid3 = tree.commit('3')
+ revid4 = tree.commit('4')
+ rev_index = index.init_index(tree.branch)
+ def get_names():
+ return [name + '.pack' for name in rev_index._current_names]
+ # index one revision
+ rev_index.index_revisions(tree.branch, [revid1])
+ self.assertEqual(1, len(rev_index._current_names))
+ names = get_names()
+ # index the second, should pack
+ rev_index.index_revisions(tree.branch, [revid2])
+ self.assertEqual(1, len(rev_index._current_names))
+ obsolete_names = rev_index._obsolete_transport.list_dir('.')
+ # There should be two - the old name and one more.
+ self.assertSubset(names, obsolete_names)
+ self.assertEqual(2, len(obsolete_names))
+ names = get_names()
+ # index the third, should not pack, and not clean obsoletes, and leave
+ # the existing pack in place.
+ rev_index.index_revisions(tree.branch, [revid3])
+ self.assertEqual(2, len(rev_index._current_names))
+ self.assertEqual(obsolete_names,
+ rev_index._obsolete_transport.list_dir('.'))
+ # new names should be the pack for revid3
+ new_names = set(get_names()) - set(names)
+ self.assertEqual(1, len(new_names))
+    # index the fourth, which should pack the new name and the fourth one
+    # still leaving the previous one untouched, should clean obsoletes and
+    # put what was new on three into it
+ rev_index.index_revisions(tree.branch, [revid4])
+ self.assertEqual(2, len(rev_index._current_names))
+ obsolete_names = rev_index._obsolete_transport.list_dir('.')
+ # the revid3 pack should have been obsoleted
+ self.assertSubset(new_names, obsolete_names)
+ self.assertEqual(2, len(obsolete_names))
+ new_names = set(get_names()) - set(names)
+ self.assertEqual(1, len(new_names))
+
class TestIndexRevisions(TestCaseWithTransport):
"""Tests for indexing of a set of revisions."""
More information about the bazaar-commits
mailing list