Rev 11: Create bloom hashes for internal nodes. in

Robert Collins robertc at
Wed Jul 2 07:08:16 BST 2008


revno: 11
revision-id: robertc at
parent: robertc at
committer: Robert Collins <robertc at>
branch nick: trunk
timestamp: Wed 2008-07-02 16:08:10 +1000
  Create bloom hashes for internal nodes.
=== modified file ''
--- a/	2008-07-02 04:27:29 +0000
+++ b/	2008-07-02 06:08:10 +0000
@@ -19,6 +19,7 @@
 import array
 from bisect import bisect_right
+from copy import deepcopy
 import tempfile
 import zlib
@@ -67,6 +68,36 @@
         self.spool = tempfile.TemporaryFile()
         self.writer = None
+    def finish_node(self):
+        byte_lines, _ = self.writer.finish()
+        if self.nodes == 0:
+            # padded node: the first node in a row gets 100 NUL bytes
+            self.spool.write("\x00" * 100)
+        self.spool.writelines(byte_lines)
+        if self.spool.tell() % _PAGE_SIZE != 0:
+            raise AssertionError("incorrect node length")
+        self.nodes += 1
+        self.writer = None
+class _InternalBuilderRow(_BuilderRow):
+    """The stored state accumulated while writing out internal rows."""
+    def __init__(self, current_global_bloom):
+        """Create an _InternalBuilderRow."""
+        _BuilderRow.__init__(self)
+        self.bloom = deepcopy(current_global_bloom)
+    def finish_node(self):
+        bloom_bytes = ":bloom:\n" + self.bloom._array.tostring()
+        if self.writer.write_reserved(bloom_bytes):
+            raise AssertionError("Not enough space for bloom filter.")
+        _BuilderRow.finish_node(self)
+class _LeafBuilderRow(_BuilderRow):
+    """The stored state accumulated while writing out leaf rows."""
 class BTreeBuilder(index.GraphIndexBuilder):
     """A Builder for B+Tree based Graph indices.
@@ -115,19 +146,8 @@
             # (self.rows[-1]). When we finish a chunk in a row,
             # propogate the key that didn't fit (comes after the chunk) to the
             # row above, transitively.
-            self.rows.append(_BuilderRow())
-            def finish_node(row):
-                byte_lines, next_node_line = row.writer.finish()
-                if row.nodes == 0:
-                    # padded note:
-                    row.spool.write("\x00" * 100)
-                row.spool.writelines(byte_lines)
-                if row.spool.tell() % _PAGE_SIZE != 0:
-                    import pdb; pdb.set_trace()
-                    raise AssertionError("incorrect node length")
-                row.nodes += 1
-                row.writer = None
+            global_bloom = BloomSHA1(256 * 8)
+            self.rows.append(_LeafBuilderRow())
             def add_key(string_key, key_line):
                 """Add a key to the current chunk.
@@ -143,8 +163,9 @@
                             length = _PAGE_SIZE
                             if internal_row.nodes == 0:
                                 length -= 100 # padded
+                            # reserve 256 for the bloom + 10 for ':bloom:\n'
                             internal_row.writer = chunk_writer.ChunkWriter(
-                                length)
+                                length, 266)
                             internal_row.writer.write(_INTERNAL_OFFSET +
                                 str(self.rows[pos + 1].nodes) + "\n")
@@ -156,7 +177,7 @@
                 if self.rows[-1].writer.write(line):
                     # this key did not fit in the node:
-                    finish_node(self.rows[-1])
+                    self.rows[-1].finish_node()
                     key_line = string_key + "\n"
                     new_row = True
                     for row in reversed(self.rows[:-1]):
@@ -164,7 +185,7 @@
                         # doesn't fit then propogate upwards until we find one that
                         # it does fit into.
                         if row.writer.write(key_line):
-                            finish_node(row)
+                            row.finish_node()
                             # We've found a node that can handle the pointer.
                             new_row = False
@@ -173,15 +194,21 @@
                     # division point, then we need a new root:
                     if new_row:
                         # We need a new row
-                        new_row = _BuilderRow()
+                        new_row = _InternalBuilderRow(global_bloom)
                         self.rows.insert(0, new_row)
                         # This will be padded, hence the -100
-                        new_row.writer = chunk_writer.ChunkWriter(_PAGE_SIZE - 100)
+                        # reserve 256 for the bloom + 10 for ':bloom:\n'
+                        new_row.writer = chunk_writer.ChunkWriter(
+                            _PAGE_SIZE - 100, 266)
                         new_row.writer.write(_INTERNAL_OFFSET +
                             str(self.rows[1].nodes - 1) + "\n")
                     add_key(string_key, key_line)
+                else:
+                    for row in self.rows[:-1]:
+                        row.bloom.insert(string_key)
+                    global_bloom.insert(string_key)
             for key, (absent, references, value) in nodes:
                 if absent:
@@ -198,7 +225,7 @@
                     '\t'.join(flattened_references), value))
                 add_key(string_key, line)
             for row in reversed(self.rows):
-                finish_node(row)
+                row.finish_node()
         result = tempfile.TemporaryFile()
         lines = [_BTSIGNATURE]
         lines.append(_OPTION_NODE_REFS + str(self.reference_lists) + '\n')

=== modified file ''
--- a/	2008-07-01 18:42:28 +0000
+++ b/	2008-07-02 06:08:10 +0000
@@ -33,14 +33,21 @@
      - if the total seen bytes so far exceeds the chunk size, flush.
-    def __init__(self, chunk_size):
-        """Create a ChunkWriter to write chunk_size chunks."""
+    def __init__(self, chunk_size, reserved=0):
+        """Create a ChunkWriter to write chunk_size chunks.
+        :param chunk_size: The total byte count to emit at the end of the
+            chunk.
+        :param reserved: How many bytes to allow for reserved data. reserved
+            data space can only be written to via the write_reserved method.
+        """
         self.chunk_size = chunk_size
         self.compressor = zlib.compressobj()
         self.bytes_in = []
         self.bytes_list = []
         self.compressed = None
         self.unused_bytes = None
+        self.reserved_size = reserved
     def finish(self):
         """Finish the chunk.
@@ -78,6 +85,17 @@
         If the bytes fit, False is returned. Otherwise True is returned
         and the bytes have not been added to the chunk.
+        return self._write(bytes, False)
+    def write_reserved(self, bytes):
+        """Write some bytes to the chunk bypassing the reserved check.
+        If the bytes fit, False is returned. Otherwise True is returned
+        and the bytes have not been added to the chunk.
+        """
+        return self._write(bytes, True)
+    def _write(self, bytes, reserved):
         # Add these bytes using Z_SYNC_FLUSH, if it puts us over budget, we
         # will try packing everything tighter, if that still fails, then we
         # will reject this request.
@@ -89,12 +107,16 @@
         total_len = sum(len(b) for b in self.bytes_list)
         # Give us some extra room for a final Z_FINISH call.
-        if total_len + 10 > self.chunk_size:
+        if reserved:
+            capacity = self.chunk_size
+        else:
+            capacity = self.chunk_size - self.reserved_size
+        if total_len + 10 > capacity:
             # We are over budget, try to squeeze this in without any
             # Z_SYNC_FLUSH calls
             bytes_out, compressor = self._recompress_all_bytes_in(bytes)
             this_len = sum(len(b) for b in bytes_out)
-            if this_len + 10 > self.chunk_size:
+            if this_len + 10 > capacity:
                 # No way we can add anymore, we need to re-pack because our
                 # compressor is now out of sync
                 bytes_out, compressor = self._recompress_all_bytes_in()

=== modified file 'tests/'
--- a/tests/	2008-07-02 04:33:45 +0000
+++ b/tests/	2008-07-02 06:08:10 +0000
@@ -148,10 +148,19 @@
         leaf1 = content[4096:8192]
         leaf2 = content[8192:]
         root_bytes = zlib.decompress(root)
+        # Create a little bloom by hand
+        bloom = BloomSHA1(256 * 8)
+        # insert each node's key into the bloom
+        for node in nodes:
+            bloom.insert('\x00'.join(node[0]))
+        # get bytes
+        bloom_bytes = bloom._array.tostring()
         expected_root = (
+            ":bloom:\n"
+            + bloom_bytes
         self.assertEqual(expected_root, root_bytes)
         # We already know serialisation works for leaves, check key selection:
@@ -165,7 +174,7 @@
         self.assertEqual(1200 - 629, len(node.keys))
         self.assertEqual(sorted_node_keys[629:], sorted(node.keys))
-    def test_second_internal_node_pointer(self):
+    def test_three_level_tree_details(self):
         # The left most pointer in the second internal node in a row should
         # pointer to the second node that the internal node is for, _not_
         # the first, otherwise the first node overlaps with the last node of
@@ -184,6 +193,8 @@
         self.assertEqual(3, len(index._row_lengths),
             "Not enough rows: %r" % index._row_lengths)
         internal_node1 = index._get_node(1)
+        # Must have a bloom on the first node.
+        self.assertNotEqual(None, internal_node1.bloom)
         internal_node2 = index._get_node(2)
         # The left most node node2 points at should be one after the right most node pointed at by
         # node1.
@@ -194,6 +205,17 @@
         pos = index._row_lengths[0] + index._row_lengths[1] + internal_node2.offset + 1
         leaf = index._get_node(pos)
         self.assertTrue(internal_node2.keys[0] in leaf.keys)
+        # Check the bloom filter for internal_node2: all the keys in the leaf
+        # should appear to be present
+        for key in leaf.keys:
+            self.assertTrue('\x00'.join(key) in internal_node2.bloom)
+        # Check the bloom filter for internal_node1 with its first two nodes in
+        # the same fashion.
+        for offset in [0, 1]:
+            pos = index._row_lengths[0] + index._row_lengths[1] + offset
+            leaf = index._get_node(pos)
+            for key in leaf.keys:
+                self.assertTrue('\x00'.join(key) in internal_node1.bloom)
     def test_2_leaves_2_2(self):
         builder = btree_index.BTreeBuilder(key_elements=2, reference_lists=2)
@@ -210,12 +232,21 @@
         leaf1 = content[4096:8192]
         leaf2 = content[8192:]
         root_bytes = zlib.decompress(root)
+        # Create a little bloom by hand
+        bloom = BloomSHA1(256 * 8)
+        # insert each node's key into the bloom
+        for node in nodes:
+            bloom.insert('\x00'.join(node[0]))
+        # get bytes
+        bloom_bytes = bloom._array.tostring()
         expected_root = (
+            ":bloom:\n"
+            + bloom_bytes
         self.assertEqual(expected_root, root_bytes)
         # We assume the other leaf nodes have been written correctly - layering FTW.

=== modified file 'tests/'
--- a/tests/	2008-07-01 18:38:55 +0000
+++ b/tests/	2008-07-02 06:08:10 +0000
@@ -64,3 +64,24 @@
         self.assertEqualDiff(expected_bytes, node_bytes)
         # And the line that failed should have been saved for us
         self.assertEqual(lines[46], unused)
+    def test_too_much_data_preserves_reserve_space(self):
+        # Generate enough data to exceed 4K
+        lines = []
+        for group in range(48):
+            offset = group * 50
+            numbers = range(offset, offset + 50)
+            # Create a line with this group
+            lines.append(''.join(map(str, numbers)) + '\n')
+        writer = chunk_writer.ChunkWriter(4096, 256)
+        for line in lines:
+            if writer.write(line):
+                break
+        self.assertFalse(writer.write_reserved("A"*256))
+        bytes_list, unused = writer.finish()
+        node_bytes = self.check_chunk(bytes_list, 4096)
+        # the first 44 lines should have been added
+        expected_bytes = ''.join(lines[:44]) + "A"*256
+        self.assertEqualDiff(expected_bytes, node_bytes)
+        # And the line that failed should have been saved for us
+        self.assertEqual(lines[44], unused)

