Rev 27: Create logic to combine index components. in http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk

Robert Collins robertc at robertcollins.net
Thu Jun 12 14:46:30 BST 2008


At http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk

------------------------------------------------------------
revno: 27
revision-id: robertc at robertcollins.net-20080612134630-z0ijr6ocijamd66e
parent: robertc at robertcollins.net-20080612041922-krshze0sflcghw92
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Thu 2008-06-12 23:46:30 +1000
message:
  Create logic to combine index components.
modified:
  index.py                       index.py-20080608055509-hnimeek7q8tctkqf-2
  tests/test_index.py            test_index.py-20080608055509-hnimeek7q8tctkqf-4
=== modified file 'index.py'
--- a/index.py	2008-06-12 04:08:43 +0000
+++ b/index.py	2008-06-12 13:46:30 +0000
@@ -170,26 +170,9 @@
         self._refresh_indices()
         result = {}
         for value, component in self._current_names.values():
-            index = component.term_index
-            for node in index.iter_all_entries():
-                # XXX: Duplicated logic with search().
-                term = node[1][0]
-                term_id, posting_count, posting_start, posting_length = \
-                    node[2].split(" ")
-                posting_count = int(posting_count)
-                posting_start = int(posting_start)
-                posting_length = int(posting_length)
-                posting_stop = posting_start + posting_length
-                post_name = "term_list." + term_id
-                filemap = {post_name:(posting_start, posting_stop)}
-                view = FileView(self._indices_transport,
-                    component.name + '.pack', filemap)
-                post_index = GraphIndex(view, post_name, posting_length)
-                doc_ids = set([node[1][0] for node in
-                    post_index.iter_all_entries()])
-                doc_ids = [(index, doc_id) for doc_id in doc_ids]
-                posting_list = result.setdefault(term, set())
-                posting_list.update(self._document_ids_to_keys(doc_ids))
+            terms = component.all_terms()
+            for term, posting_list in terms.iteritems():
+                result.setdefault(term, set()).update(posting_list)
         return result.iteritems()
 
     def _document_ids_to_keys(self, document_ids):
@@ -371,7 +354,7 @@
             while common_doc_keys and term_info:
                 _, term_id, posting_start, posting_length = term_info.pop(0)
                 posting_stop = posting_start + posting_length
-                post_name = component.name + "." + term_id
+                post_name = "term_list." + term_id
                 filemap = {post_name:(posting_start, posting_stop)}
                 view = FileView(self._indices_transport,
                     component.name + '.pack', filemap)
@@ -528,18 +511,49 @@
         self.term_index = term_index
         self.document_index = doc_index
         self.name = name
-
-
-class ComponentIndexBuilder(object):
-    """Creates a component index."""
-
-    def __init__(self):
-        self.document_index = InMemoryGraphIndex(0, 1)
-        self._document_ids = {}
-        self.terms = {}
-        self.revision_index = InMemoryGraphIndex(0, 1)
-        self.posting_lists = {}
-
+        self.transport = transport
+
+    def all_terms(self):
+        """As per Index, but for a single component."""
+        result = {}
+        for node in self.term_index.iter_all_entries():
+            # XXX: Duplicated logic with search().
+            term = node[1][0]
+            term_id, posting_count, posting_start, posting_length = \
+                node[2].split(" ")
+            posting_count = int(posting_count)
+            posting_start = int(posting_start)
+            posting_length = int(posting_length)
+            posting_stop = posting_start + posting_length
+            post_name = "term_list." + term_id
+            filemap = {post_name:(posting_start, posting_stop)}
+            view = FileView(self.transport, self.name + '.pack', filemap)
+            post_index = GraphIndex(view, post_name, posting_length)
+            doc_ids = set([node[1] for node in
+                post_index.iter_all_entries()])
+            posting_list = set(self._document_ids_to_keys(doc_ids))
+            result[term] = posting_list
+        return result
+
+    def _document_ids_to_keys(self, doc_ids):
+        """Expand document ids to keys.
+
+        :param document_ids: An iterable of doc_id tuples.
+        :result: An iterable of document keys.
+        """
+        indices = {}
+        for node in self.document_index.iter_entries(doc_ids):
+            yield tuple(node[2].split(' '))
+
+    def indexed_revisions(self):
+        """Return the revision_keys that this index contains terms for."""
+        for node in self.revision_index.iter_all_entries():
+            yield node[1]
+
+
+class ComponentCreator(object):
+    """Base class for classes that create ComponentIndices."""
+    
     def add_document(self, document_key):
         """Add a document key to the index.
 
@@ -553,6 +567,38 @@
         self._document_ids[document_key] = next_id
         return next_id
 
+    def _add_index_to_pack(self, index, name, writer, index_bytes=None):
+        """Add an index to a pack.
+
+        This ensures the index is encoded as plain bytes in the pack allowing
+        arbitrary readvs.
+
+        :param index: The index to write to the pack.
+        :param name: The name of the index in the pack.
+        :param writer: a ContainerWriter.
+        :param index_bytes: Optional - the contents of the serialised index.
+        :return: A start, length tuple for reading the index back from the
+            pack.
+        """
+        if index_bytes is None:
+            index_bytes = index.finish().read()
+        pos, size = writer.add_bytes_record(index_bytes, [(name,)])
+        length = len(index_bytes)
+        offset = size - length
+        start = pos + offset
+        return start, length
+
+
+class ComponentIndexBuilder(ComponentCreator):
+    """Creates a component index."""
+
+    def __init__(self):
+        self.document_index = InMemoryGraphIndex(0, 1)
+        self._document_ids = {}
+        self.terms = {}
+        self.revision_index = InMemoryGraphIndex(0, 1)
+        self.posting_lists = {}
+
     def add_term(self, term, posting_list):
         """Add a term to the index.
 
@@ -599,27 +645,6 @@
         except KeyError:
             return None
 
-    def _add_index_to_pack(self, index, name, writer, index_bytes=None):
-        """Add an index to a pack.
-
-        This ensures the index is encoded as plain bytes in the pack allowing
-        arbitrary readvs.
-
-        :param index: The index to write to the pack.
-        :param name: The name of the index in the pack.
-        :param writer: a ContainerWriter.
-        :param index_bytes: Optional - the contents of the serialised index.
-        :return: A start, length tuple for reading the index back from the
-            pack.
-        """
-        if index_bytes is None:
-            index_bytes = index.finish().read()
-        pos, size = writer.add_bytes_record(index_bytes, [(name,)])
-        length = len(index_bytes)
-        offset = size - length
-        start = pos + offset
-        return start, length
-
     def upload_index(self, upload_transport):
         """Upload the index in preparation for insertion.
 
@@ -648,6 +673,10 @@
             posting_name = "term_list." + term_id
             start, length = self._add_index_to_pack(post_index, posting_name,
                 writer)
+            # The below can be factored out and reused with the
+            # ComponentCombiner if we get rid of self.terms and use terms
+            # directly until we serialise the posting lists, rather than
+            # assigning ids aggressively.
             # How many document ids, and the range for the file view when we
             # read the pack later.
             term_value = "%s %d %d %d" % (term_id, len(posting_list), start,
@@ -662,3 +691,131 @@
             term_length, doc_start, doc_length)
         elements = [index_name + ".pack"]
         return index_name, index_value, elements
+
+
+class ComponentCombiner(ComponentCreator):
+    """Combines components into a new single larger component."""
+
+    def __init__(self, components, transport):
+        """Create a combiner.
+
+        :param components: An iterable of components.
+        :param transport: A transport to upload the combined component to.
+        :return: A tuple - the component name, the value for the names file,
+            and the elements list for the component.
+        """
+        self.components = list(components)
+        self.transport = transport
+    
+    def _copy_documents(self):
+        """Copy the document references from components to a new component.
+        
+        This popules self.component_docid with the mappings from each
+        component's document ids to the output document ids.
+        """
+        self._document_ids = {}
+        self.document_index = InMemoryGraphIndex(0, 1)
+        self.component_docids = {}
+        for component in self.components:
+            component_docs = {}
+            self.component_docids[component] = component_docs
+            for node in component.document_index.iter_all_entries():
+                # duplication with _document_ids_to_keys
+                document_key = tuple(node[2].split(' '))
+                doc_id = self.add_document(document_key)
+                # Map from the old doc id to the new doc it
+                component_docs[node[1]] = doc_id
+        self.doc_start, self.doc_length = self._add_index_to_pack(
+            self.document_index, "documents", self.writer)
+        # Clear out used objects
+        del self._document_ids
+        del self.document_index
+
+    def _copy_posting_lists(self):
+        """Copy the posting lists from components to the new component.
+        
+        This uses self.component_docid to map document ids across efficiently,
+        and self.terms to determine what to copy from.
+        It populates self.term_index as it progresses.
+        """
+        term_index = InMemoryGraphIndex(0, 1)
+        for term, posting_lists in self.terms.iteritems():
+            posting_list = set()
+            for component, posting_line in posting_lists:
+                elements = posting_line.split(' ')
+                _, term_id, posting_start, posting_length = elements
+                posting_start = int(posting_start)
+                posting_length = int(posting_length)
+                posting_stop = posting_start + posting_length
+                post_name = "term_list." + term_id
+                filemap = {post_name:(posting_start, posting_stop)}
+                view = FileView(component.transport,
+                    component.name + '.pack', filemap)
+                post_index = GraphIndex(view, post_name, posting_length)
+                doc_mapping = self.component_docids[component]
+                for node in post_index.iter_all_entries():
+                    posting_list.add(doc_mapping[node[1]])
+            post_index = InMemoryGraphIndex(0, 1)
+            for doc_id in posting_list:
+                post_index.add_node((doc_id,), '', ())
+            term_id = str(term_index.key_count())
+            start, length = self._add_index_to_pack(
+                post_index, "term_list." + term_id, self.writer)
+            # How many document ids, and the range for the file view when we
+            # read the pack later.
+            term_value = "%s %d %d %d" % (term_id, len(posting_list), start,
+                length)
+            term_index.add_node((term,), term_value, ())
+        self.term_index = term_index
+        # Clear out used objects
+        del self.terms
+        del self.component_docids
+
+    def _copy_revisions(self):
+        """Copy the revisions from components to a new component.
+        
+        This also creates the writer.
+        """
+        # Merge revisions:
+        revisions = set()
+        for component in self.components:
+            for node in component.revision_index.iter_all_entries():
+                revisions.add(node[1])
+        revision_index = InMemoryGraphIndex(0, 1)
+        for revision in revisions:
+            revision_index.add_node(revision, '', ())
+        index_bytes = revision_index.finish().read()
+        self.index_name = md5.new(index_bytes).hexdigest()
+        self.write_stream = self.transport.open_write_stream(
+            self.index_name + ".pack")
+        self.writer = ContainerWriter(self.write_stream.write)
+        self.writer.begin()
+        self.rev_start, self.rev_length = self._add_index_to_pack(
+            revision_index, "revisions", self.writer, index_bytes)
+
+    def combine(self):
+        """Combine the components."""
+        # Note on memory pressue: deleting the source index caches
+        # as soon as they are copied would reduce memory pressure.
+        self._copy_revisions()
+        self._copy_documents()
+        self._scan_terms()
+        self._copy_posting_lists()
+        self.term_start, self.term_length = self._add_index_to_pack(
+            self.term_index, "terms", self.writer)
+        self.writer.end()
+        self.write_stream.close()
+        index_value = "%d %d %d %d %d %d" % (self.rev_start, self.rev_length,
+            self.term_start, self.term_length, self.doc_start, self.doc_length)
+        elements = [self.index_name + ".pack"]
+        return self.index_name, index_value, elements
+
+    def _scan_terms(self):
+        """Scan the terms in all components to prepare to copy posting lists."""
+        self.terms = {}
+        for component in self.components:
+            for node in component.term_index.iter_all_entries():
+                term = node[1][0]
+                posting_info = node[2]
+                term_set = self.terms.setdefault(term, set())
+                term_set.add((component, posting_info))

=== modified file 'tests/test_index.py'
--- a/tests/test_index.py	2008-06-11 08:29:50 +0000
+++ b/tests/test_index.py	2008-06-12 13:46:30 +0000
@@ -228,3 +228,38 @@
         self.assertEqual({}, builder.terms)
         nodes = sorted(list(builder.revision_index.iter_all_entries()))
         self.assertEqual([(builder.revision_index, ("foo",), "")], nodes)
+
+
+class TestComponentCombiner(TestCaseWithTransport):
+
+    def test_combine_two_components_overlapping_data(self):
+        # create one component:
+        transport = self.get_transport()
+        components = []
+        builder = index.ComponentIndexBuilder()
+        builder.add_revision('rev1')
+        builder.add_revision('rev-common')
+        builder.add_term("term1", [('r', '', 'rev1'), ('r', '', 'rev-common')])
+        builder.add_term("term-common", [('r', '', 'rev1'), ('r', '', 'rev-common')])
+        name, value, elements = builder.upload_index(transport)
+        component1 = index.ComponentIndex(name, value, transport)
+        components.append(component1)
+        builder = index.ComponentIndexBuilder()
+        builder.add_revision('rev-common')
+        builder.add_revision('rev2')
+        builder.add_term("term-common", [('r', '', 'rev2'), ('r', '', 'rev-common')])
+        builder.add_term("term2", [('r', '', 'rev2'), ('r', '', 'other-revid')])
+        name, value, elements = builder.upload_index(transport)
+        component2 = index.ComponentIndex(name, value, transport)
+        components.append(component2)
+        combiner = index.ComponentCombiner(components, transport)
+        name, value, elements = combiner.combine()
+        combined = index.ComponentIndex(name, value, transport)
+        terms = {}
+        terms['term-common'] = set([('r', '', 'rev-common'), ('r', '', 'rev1'),
+            ('r', '', 'rev2')])
+        terms['term1'] = set([('r', '', 'rev-common'), ('r', '', 'rev1')])
+        terms['term2'] = set([('r', '', 'other-revid'), ('r', '', 'rev2')])
+        self.assertEqual(terms, combined.all_terms())
+        self.assertEqual(set([('rev1',), ('rev2',), ('rev-common',)]),
+            set(combined.indexed_revisions()))




More information about the bazaar-commits mailing list