Rev 12: Split the term list and doc id list into separate indices, to provide better scaling (so in the future we don't have to pay overhead to resolve documents we end up not referencing). in http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk

Robert Collins robertc at robertcollins.net
Mon Jun 9 08:37:27 BST 2008


At http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk

------------------------------------------------------------
revno: 12
revision-id: robertc at robertcollins.net-20080609073726-h6urz1q3hnjz1a9k
parent: robertc at robertcollins.net-20080608234217-e3v0a9e8kak9jas7
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Mon 2008-06-09 17:37:26 +1000
message:
  Split the term list and doc id list into separate indices, to provide better scaling (so in the future we don't have to pay overhead to resolve documents we end up not referencing).
modified:
  index.py                       index.py-20080608055509-hnimeek7q8tctkqf-2
  tests/test_index.py            test_index.py-20080608055509-hnimeek7q8tctkqf-4
=== modified file 'index.py'
--- a/index.py	2008-06-08 23:42:17 +0000
+++ b/index.py	2008-06-09 07:37:26 +0000
@@ -122,10 +122,14 @@
         self._indices_transport = self._transport.clone('indices')
         if _FORMAT_1 != format:
             raise UnknownFormatError(format, 'bzr-search')
-        self._indices = []
         self._orig_names = {}
         self._current_names = {}
-        self._index = CombinedGraphIndex(self._indices)
+        self._revision_indices = []
+        self._term_indices = []
+        self._term_doc_indices = {}
+        self._revision_index = CombinedGraphIndex(self._revision_indices)
+        # because terms may occur in many component indices, we don't use a 
+        # CombinedGraphIndex for grouping the term indices or doc indices.
         self._lock = LockDir(index_transport, 'names-lock')
 
     def _add_terms(self, index, terms):
@@ -133,16 +137,14 @@
 
         A term is a single index key suffix (e.g. 'first').
         A posting list is an iterable of full index keys (e.g.
-        ('r', 'x', REVID) for a revision, or ('t', FILEID, REVID) for a file
+        ('r', '', REVID) for a revision, or ('t', FILEID, REVID) for a file
         text.)
 
-        :param index: A GraphIndexBuilder.
+        :param index: A ComponentIndexBuilder.
         :param terms: An iterable of term -> posting list.
         """
         for term, posting_list in terms:
-            # term, ignored, term-value
-            term_key = ('t', 'x', term)
-            index.add_node(term_key, '', (posting_list,))
+            index.add_term(term, posting_list)
 
     def all_terms(self):
         """Return an iterable of all the posting lists in the index.
@@ -150,10 +152,30 @@
         :return: An iterator of (term -> document ids).
         """
         self._refresh_indices()
-        for node in self._index.iter_entries_prefix([('t', 'x', None)]):
-            term = node[1][2]
-            posting_list = node[3][0]
-            yield term, posting_list
+        result = {}
+        for index in self._term_indices:
+            for node in index.iter_all_entries():
+                term = node[1][0]
+                doc_ids = [(node[0], doc_id) for doc_id in node[2].split(' ')]
+                posting_list = result.setdefault(term, set())
+                posting_list.update(self._document_ids_to_keys(doc_ids))
+        return result.iteritems()
+
+    def _document_ids_to_keys(self, document_ids):
+        """Expand document ids to keys.
+
+        :param document_ids: An iterable of (index, doc_id) tuples.
+        :return: An iterable of document keys.
+        """
+        indices = {}
+        # group by index
+        for index, doc_id in document_ids:
+            doc_ids = indices.setdefault(index, set())
+            doc_ids.add((doc_id,))
+        for index, doc_ids in indices.items():
+            doc_index = self._term_doc_indices[index]
+            for node in doc_index.iter_entries(doc_ids):
+                yield tuple(node[2].split(' '))
 
     def index_revisions(self, branch, revisions_to_index):
         """Index some revisions from branch.
@@ -175,52 +197,66 @@
         # here: index inventory/paths
         # here: index revisions
         _ensure_regexes()
-        index = InMemoryGraphIndex(1, 3)
+        index = ComponentIndexBuilder()
         terms = self._terms_for_revs(locked_branch.repository,
             revisions_to_index)
         self._add_terms(index, terms)
         for rev_id in revisions_to_index:
-            # (metadata field, revision, id)
-            index.add_node(('r', 'x', rev_id), '', ((),))
+            index.add_revision(rev_id)
+        self._add_index(index)
+
+    def _add_index(self, index):
+        """Add a new component index to the list of indices."""
+        # The index name is the md5sum of the revision index's serialised form.
+        rev_index_bytes = index.revision_index.finish().read()
+        index_name = md5.new(rev_index_bytes).hexdigest()
+        if index_name in self._current_names:
+            raise Exception("md5 collision! rad! %s" % index_name)
+        # Upload preparatory to renaming into place.
         # write to disc.
-        index_bytes = index.finish().read()
-        index_name = md5.new(index_bytes).hexdigest()
-        index_value = "%d" % len(index_bytes)
-        self._upload_transport.put_bytes_non_atomic(index_name, index_bytes)
-        self._add_index(index, index_name, index_value)
-
-    def _add_index(self, index, name, value):
-        """Add an index to the list of indices."""
-        index_name = name
-        if name in self._current_names:
-            raise Exception("md5 collision! rad! %s" % name)
+        index_value = "%d" % len(rev_index_bytes)
+        self._upload_transport.put_bytes_non_atomic(index_name + ".rix",
+            rev_index_bytes)
+        doc_length = self._upload_transport.put_file_non_atomic(
+            index_name + '.dix', index.document_index.finish())
+        term_length = self._upload_transport.put_file_non_atomic(
+            index_name + '.tix', index.term_index.finish())
+        # The indices are uploaded, we only need to rename to activate.
         self._refresh_indices()
-        if name in self._current_names:
-            raise Exception("md5 collision with concurrent writer! rad! %s" % name)
+        if index_name in self._current_names:
+            raise Exception(
+                "md5 collision with concurrent writer! rad! %s" % index_name)
         self._lock.lock_write()
         try:
             # Serialise the index list
             new_names = InMemoryGraphIndex(0, 1)
-            new_names.add_node((name,), value, ())
+            new_names.add_node((index_name,), index_value, ())
             for name, (value, index) in self._current_names.items():
                 new_names.add_node((name,), value, ())
             # Now, as the last step, rename the new index into place and update
             # the disk list of names.
-            self._upload_transport.rename(index_name, '../indices/' + index_name)
+            for suffix in [".rix", ".dix", ".tix"]:
+                self._upload_transport.rename(index_name + suffix,
+                    '../indices/' + index_name + suffix)
             self._transport.put_file('names', new_names.finish())
         finally:
             self._lock.unlock()
         # Add in-memory
+        self._add_index_to_memory(index_name, index_value, index)
+        # It's safely inserted. Trigger a pack?
+
+    def _add_index_to_memory(self, name, value, index):
+        """Add an index (with meta-value value) to the in-memory index list."""
         self._current_names[name] = (value, index)
-        # Allow it to be searched
-        self._indices.append(index)
-        # Its safely inserted. Trigger a pack ?
+        self._revision_indices.append(index.revision_index)
+        self._term_indices.append(index.term_index)
+        self._term_doc_indices[index.term_index] = index.document_index
 
     def indexed_revisions(self):
         """Return the revision_keys that this index contains terms for."""
         self._refresh_indices()
-        for node in self._index.iter_entries_prefix([('r', 'x', None)]):
-            yield node[1][2:3]
+        for node in self._revision_index.iter_all_entries():
+            yield node[1]
 
     def _refresh_indices(self):
         """Merge on-disk index lists into the memory top level index list."""
@@ -243,17 +279,23 @@
                 # in our last read; not in memory anymore:
                 deleted_names.add(name)
                 # XXX perhaps cross-check the size?
-        # in case of exception, have no change until we are finished:
-        current_names = dict(self._current_names.items())
         for name in added_names:
-            # we need an index object:
-            # NB: we disable bisection because in this get-it-going layout
-            # post-lists may be >>>> than the bisection code is prepared for.
-            index = GraphIndex(self._indices_transport, name, None)
-            self._current_names[name] = (new_names[name][0], index)
-            self._indices.append(index)
-        self._current_names = current_names
-        self._orig_names = new_names
+            # TODO: byte length of the indices here.
+            rev_index = GraphIndex(self._indices_transport, name + '.rix', None)
+            term_index = GraphIndex(self._indices_transport, name + '.tix', None)
+            doc_index = GraphIndex(self._indices_transport, name + '.dix', None)
+            component = ComponentIndex(rev_index, term_index, doc_index)
+            value = new_names[name][0]
+            self._add_index_to_memory(name, value, component)
+        for name in deleted_names:
+            self._remove_component_from_memory(name)
+
+    def _remove_component_from_memory(self, name):
+        """Remove the component name from the index list in memory."""
+        index = self._current_names[name][1]
+        del self._term_doc_indices[index.term_index]
+        self._term_indices.remove(index.term_index)
+        self._revision_indices.remove(index.revision_index)
 
     def search(self, termlist):
         """Trivial set-based search of the index.
@@ -297,7 +339,8 @@
             # properties (todo - names only?)
             # bugfixes (a property we know how to read)
             # other filters?
-            commit_terms = _tokeniser_re.split(revision.message)
+            message_utf8 = revision.message.encode('utf8')
+            commit_terms = _tokeniser_re.split(message_utf8)
             for term in commit_terms:
                 if not term:
                     continue
@@ -320,3 +363,66 @@
         """The name of the document found, for human consumption."""
         # Perhaps need to utf_decode this?
         return "Revision id '%s'." % self.revision_key[0]
+
+
+class ComponentIndex(object):
+    """A single component in the aggregate search index.
+
+    This has a revision index listing the indexed revisions, a term index 
+    with the indexed terms and their posting lists (as a list of document ids),
+    and finally a document id index which maps document ids to document keys.
+    """
+
+    def __init__(self, revision_index, term_index, document_index):
+        """Create a ComponentIndex.
+
+        :param revision_index: The revision index for this component.
+        :param term_index: The term index for this component.
+        :param document_index: The document index for this component.
+        """
+        self.revision_index = revision_index
+        self.term_index = term_index
+        self.document_index = document_index
+
+
+class ComponentIndexBuilder(object):
+    """Creates a component index."""
+
+    def __init__(self):
+        self.document_index = InMemoryGraphIndex(0, 1)
+        self._document_ids = {}
+        self.term_index = InMemoryGraphIndex(0, 1)
+        self.revision_index = InMemoryGraphIndex(0, 1)
+
+    def add_document(self, document_key):
+        """Add a document key to the index.
+
+        :param document_key: A document key e.g. ('r', '', 'some-rev-id').
+        :return: The document id allocated within this index.
+        """
+        if document_key in self._document_ids:
+            return self._document_ids[document_key]
+        next_id = str(self.document_index.key_count())
+        self.document_index.add_node((next_id,), "%s %s %s" % document_key, ())
+        self._document_ids[document_key] = next_id
+        return next_id
+
+    def add_term(self, term, posting_list):
+        """Add a term to the index.
+
+        :param term: A term, e.g. 'foo'.
+        :param posting_list: A list of the document_key's that this term
+            indexes.
+        :return: None.
+        """
+        document_ids = []
+        for document_key in posting_list:
+            document_ids.append(self.add_document(document_key))
+        document_ids.sort()
+        if type(term) != str:
+            raise ValueError("terms must be bytestrings at this layer %r" % term)
+        self.term_index.add_node((term,), " ".join(document_ids), ())
+
+    def add_revision(self, revision_id):
+        """List a revision as having been indexed by this index."""
+        self.revision_index.add_node((revision_id,), '',  ())

=== modified file 'tests/test_index.py'
--- a/tests/test_index.py	2008-06-08 23:42:17 +0000
+++ b/tests/test_index.py	2008-06-09 07:37:26 +0000
@@ -138,3 +138,51 @@
         self.assertEqualDiff(u"Revision id 'foo'.",
             result.document_name())
         self.assertEqual(('foo',), result.revision_key)
+
+
+class TestComponentIndexBuilder(TestCaseWithTransport):
+
+    def test_documents(self):
+        builder = index.ComponentIndexBuilder()
+        self.assertEqual("0", builder.add_document(('r', '', 'revid')))
+        self.assertEqual("1", builder.add_document(('r', '', 'other-revid')))
+        self.assertEqual("0", builder.add_document(('r', '', 'revid')))
+        doc_index = builder.document_index
+        nodes = sorted(list(doc_index.iter_all_entries()))
+        self.assertEqual([(doc_index, ("0",), "r  revid"),
+            (doc_index, ("1",), "r  other-revid")], nodes)
+
+    def test_posting_list(self):
+        builder = index.ComponentIndexBuilder()
+        # adding a term adds its documents
+        builder.add_term("term1", [('r', '', 'revid'), ('r', '', 'other-revid')])
+        doc_index = builder.document_index
+        nodes = sorted(list(doc_index.iter_all_entries()))
+        self.assertEqual([(doc_index, ("0",), "r  revid"),
+            (doc_index, ("1",), "r  other-revid")], nodes)
+        term_index = builder.term_index
+        # and the term refers to document ids
+        self.assertEqual([(term_index, ("term1",), "0 1")],
+            sorted(list(term_index.iter_all_entries())))
+        # adding a new term adds unique documents
+        builder.add_term("term2", [('r', '', 'revid'), ('r', '', 'third-revid')])
+        nodes = sorted(list(doc_index.iter_all_entries()))
+        # and refers to the correct ids
+        self.assertEqual([(doc_index, ("0",), "r  revid"),
+            (doc_index, ("1",), "r  other-revid"),
+            (doc_index, ("2",), "r  third-revid")], nodes)
+        self.assertEqual([(term_index, ("term1",), "0 1"),
+            (term_index, ("term2",), "0 2")],
+            sorted(list(term_index.iter_all_entries())))
+
+    def test_add_revision(self):
+        builder = index.ComponentIndexBuilder()
+        # adding a revision lists the revision, does not alter document keys
+        # etc.
+        builder.add_revision('foo')
+        nodes = sorted(list(builder.document_index.iter_all_entries()))
+        self.assertEqual([], nodes)
+        nodes = sorted(list(builder.term_index.iter_all_entries()))
+        self.assertEqual([], nodes)
+        nodes = sorted(list(builder.revision_index.iter_all_entries()))
+        self.assertEqual([(builder.revision_index, ("foo",), "")], nodes)




More information about the bazaar-commits mailing list