Rev 31: Implement spooling to disk for indices with more than 100000 nodes (which is configurable via the constructor.) in http://people.ubuntu.com/~robertc/baz2.0/plugins/index2/trunk

Robert Collins robertc at robertcollins.net
Sun Jul 13 18:39:11 BST 2008


At http://people.ubuntu.com/~robertc/baz2.0/plugins/index2/trunk

------------------------------------------------------------
revno: 31
revision-id: robertc at robertcollins.net-20080713173905-dxz6v9ke8iynar4l
parent: robertc at robertcollins.net-20080713115155-k1nu89eo932mpp1l
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Mon 2008-07-14 03:39:05 +1000
message:
  Implement spooling to disk for indices with more than 100000 nodes (which is configurable via the constructor.)
modified:
  btree_index.py                 index.py-20080624222253-p0x5f92uyh5hw734-7
  tests/test_btree_index.py      test_index.py-20080624222253-p0x5f92uyh5hw734-13
=== modified file 'btree_index.py'
--- a/btree_index.py	2008-07-13 11:51:55 +0000
+++ b/btree_index.py	2008-07-13 17:39:05 +0000
@@ -30,8 +30,10 @@
 from bzrlib import debug, index, lru_cache, osutils, trace
 from bzrlib import errors as bzrerrors
 from bzrlib.index import _OPTION_NODE_REFS, _OPTION_KEY_ELEMENTS, _OPTION_LEN
+from bzrlib.osutils import basename, dirname
 from bzrlib.plugins.index2 import errors, chunk_writer
 from bzrlib.plugins.pybloom.pybloom import BloomSHA1
+from bzrlib.transport import get_transport
 
 
 _BTSIGNATURE = "B+Tree Graph Index 1\n"
@@ -178,6 +180,58 @@
     VALUE          := no-newline-no-null-bytes
     """
 
+    def __init__(self, reference_lists=0, key_elements=1, spill_at=100000):
+        """See GraphIndexBuilder.__init__.
+
+        :param spill_at: Optional parameter controlling the maximum number
+            of nodes that BTreeBuilder will hold in memory.
+        """
+        index.GraphIndexBuilder.__init__(self, reference_lists=reference_lists,
+            key_elements=key_elements)
+        self._spill_at = spill_at
+        self._backing_indices = []
+
+    def add_node(self, key, value, references=()):
+        """Add a node to the index.
+
+        If adding the node causes the builder to reach its spill_at threshold,
+        disk spilling will be triggered.
+
+        :param key: The key. keys are non-empty tuples containing
+            as many whitespace-free utf8 bytestrings as the key length
+            defined for this index.
+        :param references: An iterable of iterables of keys. Each is a
+            reference to another key.
+        :param value: The value to associate with the key. It may be any
+            bytes as long as it does not contain \0 or \n.
+        """
+        index.GraphIndexBuilder.add_node(self, key, value, references=references)
+        if len(self._keys) < self._spill_at:
+            return
+        iterators_to_combine = [iter(sorted(self._iter_mem_nodes()))]
+        pos = -1
+        for pos, backing in enumerate(self._backing_indices):
+            if backing is None:
+                pos -= 1
+                break
+            iterators_to_combine.append(backing.iter_all_entries())
+        backing_pos = pos + 1
+        new_backing_file, size = \
+            self._write_nodes(self._iter_smallest(iterators_to_combine))
+        new_backing = BTreeGraphIndex(
+            get_transport(dirname(new_backing_file.name)),
+            basename(new_backing_file.name), size)
+        # GC will clean up the file
+        new_backing._file = new_backing_file
+        if len(self._backing_indices) == backing_pos:
+            self._backing_indices.append(None)
+        self._backing_indices[backing_pos] = new_backing
+        for pos in range(backing_pos):
+            self._backing_indices[pos] = None
+        self._keys = set()
+        self._nodes = {}
+        self._nodes_by_key = {}
+
     def add_nodes(self, nodes):
         """Add nodes to the index.
 
@@ -201,6 +255,33 @@
                 if not absent:
                     yield self, key, value
 
+    def _iter_smallest(self, iterators_to_combine):
+        current_values = []
+        for iterator in iterators_to_combine:
+            try:
+                current_values.append(iterator.next())
+            except StopIteration:
+                current_values.append(None)
+        def getter(item):
+            return item[1][1]
+        last = None
+        while True:
+            candidates = [item for item
+                in enumerate(current_values) if item[1] is not None]
+            if not len(candidates):
+                return
+            selected = min(candidates, key=getter)
+            if last == selected[1][1]:
+                raise bzrerrors.BadIndexDuplicateKey(last, self)
+            last = selected[1][1]
+            # Yield, with self as the index
+            yield (self,) + selected[1][1:]
+            pos = selected[0]
+            try:
+                current_values[pos] = iterators_to_combine[pos].next()
+            except StopIteration:
+                current_values[pos] = None
+
     def _write_nodes(self, node_iterator):
         """Write node_iterator out as a B+Tree.
 
@@ -336,7 +417,7 @@
             add_key(string_key, line)
         for row in reversed(rows):
             row.finish_node()
-        result = tempfile.TemporaryFile()
+        result = tempfile.NamedTemporaryFile()
         lines = [_BTSIGNATURE]
         lines.append(_OPTION_NODE_REFS + str(self.reference_lists) + '\n')
         lines.append(_OPTION_KEY_ELEMENTS + str(self._key_length) + '\n')
@@ -385,8 +466,9 @@
                         num_bloom_pages * _PAGE_SIZE, len(bloom_str))
                 result.write(bloom_str)
         result.flush()
+        size = result.tell()
         result.seek(0)
-        return result
+        return result, size
 
     def finish(self):
         """Finalise the index.
@@ -394,8 +476,7 @@
         :return: A file handle for a temporary file containing the nodes added
             to the index.
         """
-        nodes = sorted(self._iter_mem_nodes())
-        return self._write_nodes(nodes)
+        return self._write_nodes(self.iter_all_entries())[0]
 
     def iter_all_entries(self):
         """Iterate over all keys within the index
@@ -407,7 +488,13 @@
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(3,
                 "iter_all_entries scales with size of history.")
-        return self._iter_mem_nodes()
+        # Doing serial rather than ordered would be faster; but this shouldn't
+        # be getting called routinely anyway.
+        iterators = [iter(sorted(self._iter_mem_nodes()))]
+        for backing in self._backing_indices:
+            if backing is not None:
+                iterators.append(backing.iter_all_entries())
+        return self._iter_smallest(iterators)
 
     def iter_entries(self, keys):
         """Iterate over keys within the index.
@@ -428,6 +515,15 @@
                 node = self._nodes[key]
                 if not node[0]:
                     yield self, key, node[2]
+        keys.difference_update(self._keys)
+        for backing in self._backing_indices:
+            if backing is None:
+                continue
+            if not keys:
+                return
+            for node in backing.iter_entries(keys):
+                keys.remove(node[1])
+                yield (self,) + node[1:]
 
     def iter_entries_prefix(self, keys):
         """Iterate over keys within the index using prefix matching.
@@ -451,6 +547,11 @@
         keys = set(keys)
         if not keys:
             return
+        for backing in self._backing_indices:
+            if backing is None:
+                continue
+            for node in backing.iter_entries_prefix(keys):
+                yield (self,) + node[1:]
         if self._key_length == 1:
             for key in keys:
                 # sanity check
@@ -458,7 +559,10 @@
                     raise errors.BadIndexKey(key)
                 if len(key) != self._key_length:
                     raise errors.BadIndexKey(key)
-                node = self._nodes[key]
+                try:
+                    node = self._nodes[key]
+                except KeyError:
+                    continue
                 if node[0]:
                     continue 
                 if self.reference_lists:
@@ -504,7 +608,8 @@
         
         For InMemoryGraphIndex the estimate is exact.
         """
-        return len(self._keys)
+        return len(self._keys) + sum(backing.key_count() for backing in
+            self._backing_indices if backing is not None)
 
     def validate(self):
         """In memory index's have no known corruption at the moment."""
@@ -703,15 +808,15 @@
         start_of_leaves = self._row_offsets[-2]
         end_of_leaves = self._row_offsets[-1]
         needed_nodes = range(start_of_leaves, end_of_leaves)
-        # TODO: we *might* want to check in the cache for nodes which we
-        #       already have parsed
+        # We iterate strictly in-order so that we can use this function
+        # for spilling index builds to disk.
         if self.node_ref_lists:
             for _, node in self._read_nodes(needed_nodes):
-                for key, (value, refs) in node.keys.items():
+                for key, (value, refs) in sorted(node.keys.items()):
                     yield (self, key, value, refs)
         else:
             for _, node in self._read_nodes(needed_nodes):
-                for key, (value, refs) in node.keys.items():
+                for key, (value, refs) in sorted(node.keys.items()):
                     yield (self, key, value)
 
     @staticmethod
@@ -1019,11 +1124,14 @@
                     raise bzrerrors.BadIndexKey(key)
                 if len(key) != self._key_length:
                     raise bzrerrors.BadIndexKey(key)
-                if self.node_ref_lists:
-                    value, node_refs = nodes[key]
-                    yield self, key, value, node_refs
-                else:
-                    yield self, key, nodes[key]
+                try:
+                    if self.node_ref_lists:
+                        value, node_refs = nodes[key]
+                        yield self, key, value, node_refs
+                    else:
+                        yield self, key, nodes[key]
+                except KeyError:
+                    pass
             return
         for key in keys:
             # sanity check

=== modified file 'tests/test_btree_index.py'
--- a/tests/test_btree_index.py	2008-07-13 07:28:15 +0000
+++ b/tests/test_btree_index.py	2008-07-13 17:39:05 +0000
@@ -108,7 +108,10 @@
 
     def test_empty_1_0(self):
         builder = btree_index.BTreeBuilder(key_elements=1, reference_lists=0)
-        content = builder.finish().read()
+        # NamedTemporaryFile dies on builder.finish().read(). weird.
+        temp_file = builder.finish()
+        content = temp_file.read()
+        del temp_file
         self.assertEqual(
             "B+Tree Graph Index 1\nnode_ref_lists=0\nkey_elements=1\nlen=0\n"
             "row_lengths=\nbloom_pages=0\n",
@@ -116,7 +119,10 @@
 
     def test_empty_2_1(self):
         builder = btree_index.BTreeBuilder(key_elements=2, reference_lists=1)
-        content = builder.finish().read()
+        # NamedTemporaryFile dies on builder.finish().read(). weird.
+        temp_file = builder.finish()
+        content = temp_file.read()
+        del temp_file
         self.assertEqual(
             "B+Tree Graph Index 1\nnode_ref_lists=1\nkey_elements=2\nlen=0\n"
             "row_lengths=\nbloom_pages=0\n",
@@ -127,7 +133,10 @@
         nodes = self.make_nodes(5, 1, 0)
         for node in nodes:
             builder.add_node(*node)
-        content = builder.finish().read()
+        # NamedTemporaryFile dies on builder.finish().read(). weird.
+        temp_file = builder.finish()
+        content = temp_file.read()
+        del temp_file
         self.assertEqual(4096, len(content))
         self.assertEqual(
             "B+Tree Graph Index 1\nnode_ref_lists=0\nkey_elements=1\nlen=5\n"
@@ -148,7 +157,10 @@
         nodes = self.make_nodes(5, 2, 2)
         for node in nodes:
             builder.add_node(*node)
-        content = builder.finish().read()
+        # NamedTemporaryFile dies on builder.finish().read(). weird.
+        temp_file = builder.finish()
+        content = temp_file.read()
+        del temp_file
         self.assertEqual(4096, len(content))
         self.assertEqual(
             "B+Tree Graph Index 1\nnode_ref_lists=2\nkey_elements=2\nlen=10\n"
@@ -177,7 +189,10 @@
         nodes = self.make_nodes(800, 1, 0)
         for node in nodes:
             builder.add_node(*node)
-        content = builder.finish().read()
+        # NamedTemporaryFile dies on builder.finish().read(). weird.
+        temp_file = builder.finish()
+        content = temp_file.read()
+        del temp_file
         self.assertEqual(4096*4, len(content))
         self.assertEqual(
             "B+Tree Graph Index 1\nnode_ref_lists=0\nkey_elements=1\nlen=800\n"
@@ -275,7 +290,10 @@
         nodes = self.make_nodes(200, 2, 2)
         for node in nodes:
             builder.add_node(*node)
-        content = builder.finish().read()
+        # NamedTemporaryFile dies on builder.finish().read(). weird.
+        temp_file = builder.finish()
+        content = temp_file.read()
+        del temp_file
         self.assertEqual(4096*4, len(content))
         self.assertEqual(
             "B+Tree Graph Index 1\nnode_ref_lists=2\nkey_elements=2\nlen=400\n"
@@ -308,6 +326,192 @@
         self.assertEqual(global_bloom._array.tostring(), global_bloom_bytes)
         # We assume the other leaf nodes have been written correctly - layering FTW.
 
+    def test_spill_index_stress_1_1(self):
+        builder = btree_index.BTreeBuilder(key_elements=1, spill_at=2)
+        nodes = [node[0:2] for node in self.make_nodes(16, 1, 0)]
+        builder.add_node(*nodes[0])
+        # Test the parts of the index that take up memory are doing so
+        # predictably.
+        self.assertEqual(1, len(builder._nodes))
+        self.assertEqual(1, len(builder._keys))
+        self.assertEqual({}, builder._nodes_by_key)
+        builder.add_node(*nodes[1])
+        self.assertEqual(0, len(builder._nodes))
+        self.assertEqual(0, len(builder._keys))
+        self.assertEqual({}, builder._nodes_by_key)
+        self.assertEqual(1, len(builder._backing_indices))
+        self.assertEqual(2, builder._backing_indices[0].key_count())
+        # now back to memory
+        builder.add_node(*nodes[2])
+        self.assertEqual(1, len(builder._nodes))
+        self.assertEqual(1, len(builder._keys))
+        self.assertEqual({}, builder._nodes_by_key)
+        # And spills to a second backing index combing all
+        builder.add_node(*nodes[3])
+        self.assertEqual(0, len(builder._nodes))
+        self.assertEqual(0, len(builder._keys))
+        self.assertEqual({}, builder._nodes_by_key)
+        self.assertEqual(2, len(builder._backing_indices))
+        self.assertEqual(None, builder._backing_indices[0])
+        self.assertEqual(4, builder._backing_indices[1].key_count())
+        # The next spills to the 2-len slot
+        builder.add_node(*nodes[4])
+        builder.add_node(*nodes[5])
+        self.assertEqual(0, len(builder._nodes))
+        self.assertEqual(0, len(builder._keys))
+        self.assertEqual({}, builder._nodes_by_key)
+        self.assertEqual(2, len(builder._backing_indices))
+        self.assertEqual(2, builder._backing_indices[0].key_count())
+        self.assertEqual(4, builder._backing_indices[1].key_count())
+        # Next spill combines
+        builder.add_node(*nodes[6])
+        builder.add_node(*nodes[7])
+        self.assertEqual(3, len(builder._backing_indices))
+        self.assertEqual(None, builder._backing_indices[0])
+        self.assertEqual(None, builder._backing_indices[1])
+        self.assertEqual(8, builder._backing_indices[2].key_count())
+        # And so forth - counting up in binary.
+        builder.add_node(*nodes[8])
+        builder.add_node(*nodes[9])
+        self.assertEqual(3, len(builder._backing_indices))
+        self.assertEqual(2, builder._backing_indices[0].key_count())
+        self.assertEqual(None, builder._backing_indices[1])
+        self.assertEqual(8, builder._backing_indices[2].key_count())
+        builder.add_node(*nodes[10])
+        builder.add_node(*nodes[11])
+        self.assertEqual(3, len(builder._backing_indices))
+        self.assertEqual(None, builder._backing_indices[0])
+        self.assertEqual(4, builder._backing_indices[1].key_count())
+        self.assertEqual(8, builder._backing_indices[2].key_count())
+        builder.add_node(*nodes[12])
+        # Test that memory and disk are both used for query methods; and that
+        # None is skipped over happily.
+        self.assertEqual([(builder,) + node for node in sorted(nodes[:13])],
+            list(builder.iter_all_entries()))
+        # Two nodes - one memory one disk
+        self.assertEqual(set([(builder,) + node for node in nodes[11:13]]),
+            set(builder.iter_entries([nodes[12][0], nodes[11][0]])))
+        self.assertEqual(13, builder.key_count())
+        self.assertEqual(set([(builder,) + node for node in nodes[11:13]]),
+            set(builder.iter_entries_prefix([nodes[12][0], nodes[11][0]])))
+        builder.add_node(*nodes[13])
+        self.assertEqual(3, len(builder._backing_indices))
+        self.assertEqual(2, builder._backing_indices[0].key_count())
+        self.assertEqual(4, builder._backing_indices[1].key_count())
+        self.assertEqual(8, builder._backing_indices[2].key_count())
+        builder.add_node(*nodes[14])
+        builder.add_node(*nodes[15])
+        self.assertEqual(4, len(builder._backing_indices))
+        self.assertEqual(None, builder._backing_indices[0])
+        self.assertEqual(None, builder._backing_indices[1])
+        self.assertEqual(None, builder._backing_indices[2])
+        self.assertEqual(16, builder._backing_indices[3].key_count())
+        # Now finish, and check we got a correctly ordered tree
+        transport = self.get_transport('')
+        size = transport.put_file('index', builder.finish())
+        index = btree_index.BTreeGraphIndex(transport, 'index', size)
+        nodes = list(index.iter_all_entries())
+        self.assertEqual(sorted(nodes), nodes)
+        self.assertEqual(16, len(nodes))
+
+    def test_spill_index_stress_2_2(self):
+        # test that references and longer keys don't confuse things.
+        builder = btree_index.BTreeBuilder(key_elements=2, reference_lists=2,
+            spill_at=2)
+        nodes = self.make_nodes(16, 2, 2)
+        builder.add_node(*nodes[0])
+        # Test the parts of the index that take up memory are doing so
+        # predictably.
+        self.assertEqual(1, len(builder._keys))
+        self.assertEqual(2, len(builder._nodes))
+        self.assertNotEqual({}, builder._nodes_by_key)
+        builder.add_node(*nodes[1])
+        self.assertEqual(0, len(builder._keys))
+        self.assertEqual(0, len(builder._nodes))
+        self.assertEqual({}, builder._nodes_by_key)
+        self.assertEqual(1, len(builder._backing_indices))
+        self.assertEqual(2, builder._backing_indices[0].key_count())
+        # now back to memory
+        builder.add_node(*nodes[2])
+        self.assertEqual(2, len(builder._nodes))
+        self.assertEqual(1, len(builder._keys))
+        self.assertNotEqual({}, builder._nodes_by_key)
+        # And spills to a second backing index combing all
+        builder.add_node(*nodes[3])
+        self.assertEqual(0, len(builder._nodes))
+        self.assertEqual(0, len(builder._keys))
+        self.assertEqual({}, builder._nodes_by_key)
+        self.assertEqual(2, len(builder._backing_indices))
+        self.assertEqual(None, builder._backing_indices[0])
+        self.assertEqual(4, builder._backing_indices[1].key_count())
+        # The next spills to the 2-len slot
+        builder.add_node(*nodes[4])
+        builder.add_node(*nodes[5])
+        self.assertEqual(0, len(builder._nodes))
+        self.assertEqual(0, len(builder._keys))
+        self.assertEqual({}, builder._nodes_by_key)
+        self.assertEqual(2, len(builder._backing_indices))
+        self.assertEqual(2, builder._backing_indices[0].key_count())
+        self.assertEqual(4, builder._backing_indices[1].key_count())
+        # Next spill combines
+        builder.add_node(*nodes[6])
+        builder.add_node(*nodes[7])
+        self.assertEqual(3, len(builder._backing_indices))
+        self.assertEqual(None, builder._backing_indices[0])
+        self.assertEqual(None, builder._backing_indices[1])
+        self.assertEqual(8, builder._backing_indices[2].key_count())
+        # And so forth - counting up in binary.
+        builder.add_node(*nodes[8])
+        builder.add_node(*nodes[9])
+        self.assertEqual(3, len(builder._backing_indices))
+        self.assertEqual(2, builder._backing_indices[0].key_count())
+        self.assertEqual(None, builder._backing_indices[1])
+        self.assertEqual(8, builder._backing_indices[2].key_count())
+        builder.add_node(*nodes[10])
+        builder.add_node(*nodes[11])
+        self.assertEqual(3, len(builder._backing_indices))
+        self.assertEqual(None, builder._backing_indices[0])
+        self.assertEqual(4, builder._backing_indices[1].key_count())
+        self.assertEqual(8, builder._backing_indices[2].key_count())
+        builder.add_node(*nodes[12])
+        # Test that memory and disk are both used for query methods; and that
+        # None is skipped over happily.
+        self.assertEqual([(builder,) + node for node in sorted(nodes[:13])],
+            list(builder.iter_all_entries()))
+        # Two nodes - one memory one disk
+        self.assertEqual(set([(builder,) + node for node in nodes[11:13]]),
+            set(builder.iter_entries([nodes[12][0], nodes[11][0]])))
+        self.assertEqual(13, builder.key_count())
+        self.assertEqual(set([(builder,) + node for node in nodes[11:13]]),
+            set(builder.iter_entries_prefix([nodes[12][0], nodes[11][0]])))
+        builder.add_node(*nodes[13])
+        self.assertEqual(3, len(builder._backing_indices))
+        self.assertEqual(2, builder._backing_indices[0].key_count())
+        self.assertEqual(4, builder._backing_indices[1].key_count())
+        self.assertEqual(8, builder._backing_indices[2].key_count())
+        builder.add_node(*nodes[14])
+        builder.add_node(*nodes[15])
+        self.assertEqual(4, len(builder._backing_indices))
+        self.assertEqual(None, builder._backing_indices[0])
+        self.assertEqual(None, builder._backing_indices[1])
+        self.assertEqual(None, builder._backing_indices[2])
+        self.assertEqual(16, builder._backing_indices[3].key_count())
+        # Now finish, and check we got a correctly ordered tree
+        transport = self.get_transport('')
+        size = transport.put_file('index', builder.finish())
+        index = btree_index.BTreeGraphIndex(transport, 'index', size)
+        nodes = list(index.iter_all_entries())
+        self.assertEqual(sorted(nodes), nodes)
+        self.assertEqual(16, len(nodes))
+
+    def test_spill_index_duplicate_key_caught_on_finish(self):
+        builder = btree_index.BTreeBuilder(key_elements=1, spill_at=2)
+        nodes = [node[0:2] for node in self.make_nodes(16, 1, 0)]
+        builder.add_node(*nodes[0])
+        builder.add_node(*nodes[1])
+        builder.add_node(*nodes[0])
+        self.assertRaises(bzrerrors.BadIndexDuplicateKey, builder.finish)
+
 
 class TestBTreeIndex(BTreeTestCase):
 




More information about the bazaar-commits mailing list