Rev 27: Create logic to combine index components. in http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk
Robert Collins
robertc at robertcollins.net
Thu Jun 12 14:46:30 BST 2008
At http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk
------------------------------------------------------------
revno: 27
revision-id: robertc at robertcollins.net-20080612134630-z0ijr6ocijamd66e
parent: robertc at robertcollins.net-20080612041922-krshze0sflcghw92
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Thu 2008-06-12 23:46:30 +1000
message:
Create logic to combine index components.
modified:
index.py index.py-20080608055509-hnimeek7q8tctkqf-2
tests/test_index.py test_index.py-20080608055509-hnimeek7q8tctkqf-4
=== modified file 'index.py'
--- a/index.py 2008-06-12 04:08:43 +0000
+++ b/index.py 2008-06-12 13:46:30 +0000
@@ -170,26 +170,9 @@
self._refresh_indices()
result = {}
for value, component in self._current_names.values():
- index = component.term_index
- for node in index.iter_all_entries():
- # XXX: Duplicated logic with search().
- term = node[1][0]
- term_id, posting_count, posting_start, posting_length = \
- node[2].split(" ")
- posting_count = int(posting_count)
- posting_start = int(posting_start)
- posting_length = int(posting_length)
- posting_stop = posting_start + posting_length
- post_name = "term_list." + term_id
- filemap = {post_name:(posting_start, posting_stop)}
- view = FileView(self._indices_transport,
- component.name + '.pack', filemap)
- post_index = GraphIndex(view, post_name, posting_length)
- doc_ids = set([node[1][0] for node in
- post_index.iter_all_entries()])
- doc_ids = [(index, doc_id) for doc_id in doc_ids]
- posting_list = result.setdefault(term, set())
- posting_list.update(self._document_ids_to_keys(doc_ids))
+ terms = component.all_terms()
+ for term, posting_list in terms.iteritems():
+ result.setdefault(term, set()).update(posting_list)
return result.iteritems()
def _document_ids_to_keys(self, document_ids):
@@ -371,7 +354,7 @@
while common_doc_keys and term_info:
_, term_id, posting_start, posting_length = term_info.pop(0)
posting_stop = posting_start + posting_length
- post_name = component.name + "." + term_id
+ post_name = "term_list." + term_id
filemap = {post_name:(posting_start, posting_stop)}
view = FileView(self._indices_transport,
component.name + '.pack', filemap)
@@ -528,18 +511,49 @@
self.term_index = term_index
self.document_index = doc_index
self.name = name
-
-
-class ComponentIndexBuilder(object):
- """Creates a component index."""
-
- def __init__(self):
- self.document_index = InMemoryGraphIndex(0, 1)
- self._document_ids = {}
- self.terms = {}
- self.revision_index = InMemoryGraphIndex(0, 1)
- self.posting_lists = {}
-
+ self.transport = transport
+
+ def all_terms(self):
+ """As per Index, but for a single component."""
+ result = {}
+ for node in self.term_index.iter_all_entries():
+ # XXX: Duplicated logic with search().
+ term = node[1][0]
+ term_id, posting_count, posting_start, posting_length = \
+ node[2].split(" ")
+ posting_count = int(posting_count)
+ posting_start = int(posting_start)
+ posting_length = int(posting_length)
+ posting_stop = posting_start + posting_length
+ post_name = "term_list." + term_id
+ filemap = {post_name:(posting_start, posting_stop)}
+ view = FileView(self.transport, self.name + '.pack', filemap)
+ post_index = GraphIndex(view, post_name, posting_length)
+ doc_ids = set([node[1] for node in
+ post_index.iter_all_entries()])
+ posting_list = set(self._document_ids_to_keys(doc_ids))
+ result[term] = posting_list
+ return result
+
+ def _document_ids_to_keys(self, doc_ids):
+ """Expand document ids to keys.
+
+ :param doc_ids: An iterable of doc_id tuples.
+ :result: An iterable of document keys.
+ """
+ indices = {}
+ for node in self.document_index.iter_entries(doc_ids):
+ yield tuple(node[2].split(' '))
+
+ def indexed_revisions(self):
+ """Return the revision_keys that this index contains terms for."""
+ for node in self.revision_index.iter_all_entries():
+ yield node[1]
+
+
+class ComponentCreator(object):
+ """Base class for classes that create ComponentIndices."""
+
def add_document(self, document_key):
"""Add a document key to the index.
@@ -553,6 +567,38 @@
self._document_ids[document_key] = next_id
return next_id
+ def _add_index_to_pack(self, index, name, writer, index_bytes=None):
+ """Add an index to a pack.
+
+ This ensures the index is encoded as plain bytes in the pack allowing
+ arbitrary readvs.
+
+ :param index: The index to write to the pack.
+ :param name: The name of the index in the pack.
+ :param writer: a ContainerWriter.
+ :param index_bytes: Optional - the contents of the serialised index.
+ :return: A start, length tuple for reading the index back from the
+ pack.
+ """
+ if index_bytes is None:
+ index_bytes = index.finish().read()
+ pos, size = writer.add_bytes_record(index_bytes, [(name,)])
+ length = len(index_bytes)
+ offset = size - length
+ start = pos + offset
+ return start, length
+
+
+class ComponentIndexBuilder(ComponentCreator):
+ """Creates a component index."""
+
+ def __init__(self):
+ self.document_index = InMemoryGraphIndex(0, 1)
+ self._document_ids = {}
+ self.terms = {}
+ self.revision_index = InMemoryGraphIndex(0, 1)
+ self.posting_lists = {}
+
def add_term(self, term, posting_list):
"""Add a term to the index.
@@ -599,27 +645,6 @@
except KeyError:
return None
- def _add_index_to_pack(self, index, name, writer, index_bytes=None):
- """Add an index to a pack.
-
- This ensures the index is encoded as plain bytes in the pack allowing
- arbitrary readvs.
-
- :param index: The index to write to the pack.
- :param name: The name of the index in the pack.
- :param writer: a ContainerWriter.
- :param index_bytes: Optional - the contents of the serialised index.
- :return: A start, length tuple for reading the index back from the
- pack.
- """
- if index_bytes is None:
- index_bytes = index.finish().read()
- pos, size = writer.add_bytes_record(index_bytes, [(name,)])
- length = len(index_bytes)
- offset = size - length
- start = pos + offset
- return start, length
-
def upload_index(self, upload_transport):
"""Upload the index in preparation for insertion.
@@ -648,6 +673,10 @@
posting_name = "term_list." + term_id
start, length = self._add_index_to_pack(post_index, posting_name,
writer)
+ # The below can be factored out and reused with the
+ # ComponentCombiner if we get rid of self.terms and use terms
+ # directly until we serialise the posting lists, rather than
+ # assigning ids aggressively.
# How many document ids, and the range for the file view when we
# read the pack later.
term_value = "%s %d %d %d" % (term_id, len(posting_list), start,
@@ -662,3 +691,131 @@
term_length, doc_start, doc_length)
elements = [index_name + ".pack"]
return index_name, index_value, elements
+
+
+class ComponentCombiner(ComponentCreator):
+ """Combines components into a new single larger component."""
+
+ def __init__(self, components, transport):
+ """Create a combiner.
+
+ :param components: An iterable of components.
+ :param transport: A transport to upload the combined component to.
+ :return: A tuple - the component name, the value for the names file,
+ and the elements list for the component.
+ """
+ self.components = list(components)
+ self.transport = transport
+
+ def _copy_documents(self):
+ """Copy the document references from components to a new component.
+
+ This populates self.component_docids with the mappings from each
+ component's document ids to the output document ids.
+ """
+ self._document_ids = {}
+ self.document_index = InMemoryGraphIndex(0, 1)
+ self.component_docids = {}
+ for component in self.components:
+ component_docs = {}
+ self.component_docids[component] = component_docs
+ for node in component.document_index.iter_all_entries():
+ # duplication with _document_ids_to_keys
+ document_key = tuple(node[2].split(' '))
+ doc_id = self.add_document(document_key)
+ # Map from the old doc id to the new doc id
+ component_docs[node[1]] = doc_id
+ self.doc_start, self.doc_length = self._add_index_to_pack(
+ self.document_index, "documents", self.writer)
+ # Clear out used objects
+ del self._document_ids
+ del self.document_index
+
+ def _copy_posting_lists(self):
+ """Copy the posting lists from components to the new component.
+
+ This uses self.component_docids to map document ids across efficiently,
+ and self.terms to determine what to copy from.
+ It populates self.term_index as it progresses.
+ """
+ term_index = InMemoryGraphIndex(0, 1)
+ for term, posting_lists in self.terms.iteritems():
+ posting_list = set()
+ for component, posting_line in posting_lists:
+ elements = posting_line.split(' ')
+ _, term_id, posting_start, posting_length = elements
+ posting_start = int(posting_start)
+ posting_length = int(posting_length)
+ posting_stop = posting_start + posting_length
+ post_name = "term_list." + term_id
+ filemap = {post_name:(posting_start, posting_stop)}
+ view = FileView(component.transport,
+ component.name + '.pack', filemap)
+ post_index = GraphIndex(view, post_name, posting_length)
+ doc_mapping = self.component_docids[component]
+ for node in post_index.iter_all_entries():
+ posting_list.add(doc_mapping[node[1]])
+ post_index = InMemoryGraphIndex(0, 1)
+ for doc_id in posting_list:
+ post_index.add_node((doc_id,), '', ())
+ term_id = str(term_index.key_count())
+ start, length = self._add_index_to_pack(
+ post_index, "term_list." + term_id, self.writer)
+ # How many document ids, and the range for the file view when we
+ # read the pack later.
+ term_value = "%s %d %d %d" % (term_id, len(posting_list), start,
+ length)
+ term_index.add_node((term,), term_value, ())
+ self.term_index = term_index
+ # Clear out used objects
+ del self.terms
+ del self.component_docids
+
+ def _copy_revisions(self):
+ """Copy the revisions from components to a new component.
+
+ This also creates the writer.
+ """
+ # Merge revisions:
+ revisions = set()
+ for component in self.components:
+ for node in component.revision_index.iter_all_entries():
+ revisions.add(node[1])
+ revision_index = InMemoryGraphIndex(0, 1)
+ for revision in revisions:
+ revision_index.add_node(revision, '', ())
+ index_bytes = revision_index.finish().read()
+ self.index_name = md5.new(index_bytes).hexdigest()
+ self.write_stream = self.transport.open_write_stream(
+ self.index_name + ".pack")
+ self.writer = ContainerWriter(self.write_stream.write)
+ self.writer.begin()
+ self.rev_start, self.rev_length = self._add_index_to_pack(
+ revision_index, "revisions", self.writer, index_bytes)
+
+ def combine(self):
+ """Combine the components."""
+ # Note on memory pressure: deleting the source index caches
+ # as soon as they are copied would reduce memory pressure.
+ self._copy_revisions()
+ self._copy_documents()
+ self._scan_terms()
+ self._copy_posting_lists()
+ self.term_start, self.term_length = self._add_index_to_pack(
+ self.term_index, "terms", self.writer)
+ self.writer.end()
+ self.write_stream.close()
+ index_value = "%d %d %d %d %d %d" % (self.rev_start, self.rev_length,
+ self.term_start, self.term_length, self.doc_start, self.doc_length)
+ elements = [self.index_name + ".pack"]
+ return self.index_name, index_value, elements
+
+ def _scan_terms(self):
+ """Scan the terms in all components to prepare to copy posting lists."""
+ self.terms = {}
+ for component in self.components:
+ for node in component.term_index.iter_all_entries():
+ term = node[1][0]
+ posting_info = node[2]
+ term_set = self.terms.setdefault(term, set())
+ term_set.add((component, posting_info))
=== modified file 'tests/test_index.py'
--- a/tests/test_index.py 2008-06-11 08:29:50 +0000
+++ b/tests/test_index.py 2008-06-12 13:46:30 +0000
@@ -228,3 +228,38 @@
self.assertEqual({}, builder.terms)
nodes = sorted(list(builder.revision_index.iter_all_entries()))
self.assertEqual([(builder.revision_index, ("foo",), "")], nodes)
+
+
+class TestComponentCombiner(TestCaseWithTransport):
+
+ def test_combine_two_components_overlapping_data(self):
+ # create one component:
+ transport = self.get_transport()
+ components = []
+ builder = index.ComponentIndexBuilder()
+ builder.add_revision('rev1')
+ builder.add_revision('rev-common')
+ builder.add_term("term1", [('r', '', 'rev1'), ('r', '', 'rev-common')])
+ builder.add_term("term-common", [('r', '', 'rev1'), ('r', '', 'rev-common')])
+ name, value, elements = builder.upload_index(transport)
+ component1 = index.ComponentIndex(name, value, transport)
+ components.append(component1)
+ builder = index.ComponentIndexBuilder()
+ builder.add_revision('rev-common')
+ builder.add_revision('rev2')
+ builder.add_term("term-common", [('r', '', 'rev2'), ('r', '', 'rev-common')])
+ builder.add_term("term2", [('r', '', 'rev2'), ('r', '', 'other-revid')])
+ name, value, elements = builder.upload_index(transport)
+ component2 = index.ComponentIndex(name, value, transport)
+ components.append(component2)
+ combiner = index.ComponentCombiner(components, transport)
+ name, value, elements = combiner.combine()
+ combined = index.ComponentIndex(name, value, transport)
+ terms = {}
+ terms['term-common'] = set([('r', '', 'rev-common'), ('r', '', 'rev1'),
+ ('r', '', 'rev2')])
+ terms['term1'] = set([('r', '', 'rev-common'), ('r', '', 'rev1')])
+ terms['term2'] = set([('r', '', 'other-revid'), ('r', '', 'rev2')])
+ self.assertEqual(terms, combined.all_terms())
+ self.assertEqual(set([('rev1',), ('rev2',), ('rev-common',)]),
+ set(combined.indexed_revisions()))
More information about the bazaar-commits
mailing list