Rev 11: Create bloom hashes for internal nodes. in http://people.ubuntu.com/~robertc/baz2.0/plugins/index2/trunk
Robert Collins
robertc at robertcollins.net
Wed Jul 2 07:08:16 BST 2008
At http://people.ubuntu.com/~robertc/baz2.0/plugins/index2/trunk
------------------------------------------------------------
revno: 11
revision-id: robertc at robertcollins.net-20080702060810-2zn70b8n5dbc1jf8
parent: robertc at robertcollins.net-20080702043345-w3m0o6tubke13swa
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Wed 2008-07-02 16:08:10 +1000
message:
Create bloom hashes for internal nodes.
modified:
btree_index.py index.py-20080624222253-p0x5f92uyh5hw734-7
chunk_writer.py chunk_writer.py-20080630234519-6ggn4id17nipovny-1
tests/test_btree_index.py test_index.py-20080624222253-p0x5f92uyh5hw734-13
tests/test_chunk_writer.py test_chunk_writer.py-20080630234519-6ggn4id17nipovny-2
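In short: every internal node now carries a bloom filter that covers (at
least) every key reachable beneath it, serialised into space reserved at
the tail of the node's 4096-byte page. A reader that misses in the filter
can skip the whole subtree without touching its child pages. A rough
sketch of the idea, using only the BloomSHA1 calls that appear in the
diff below (the read-side plumbing is assumed, not part of this commit,
and BloomSHA1's import is not shown in this diff):

    # Build side: every key under the node gets inserted.
    bloom = BloomSHA1(256 * 8)    # 2048 bits, 256 bytes on disk
    bloom.insert('\x00'.join(('file-id', 'rev-id')))

    # Read side: a miss is definitive, a hit may be a false positive.
    if '\x00'.join(('other-id', 'rev-id')) not in bloom:
        pass  # key cannot be under this node; skip its children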
=== modified file 'btree_index.py'
--- a/btree_index.py 2008-07-02 04:27:29 +0000
+++ b/btree_index.py 2008-07-02 06:08:10 +0000
@@ -19,6 +19,7 @@
import array
from bisect import bisect_right
+from copy import deepcopy
import tempfile
import zlib
@@ -67,6 +68,36 @@
self.spool = tempfile.TemporaryFile()
self.writer = None
+ def finish_node(self):
+ byte_lines, _ = self.writer.finish()
+ if self.nodes == 0:
+ # padded node:
+ self.spool.write("\x00" * 100)
+ self.spool.writelines(byte_lines)
+ if self.spool.tell() % _PAGE_SIZE != 0:
+ raise AssertionError("incorrect node length")
+ self.nodes += 1
+ self.writer = None
+
+
+class _InternalBuilderRow(_BuilderRow):
+ """The stored state accumulated while writing out internal rows."""
+
+ def __init__(self, current_global_bloom):
+ """Create a _BuilderRow."""
+ _BuilderRow.__init__(self)
+ self.bloom = deepcopy(current_global_bloom)
+
+ def finish_node(self):
+ bloom_bytes = ":bloom:\n" + self.bloom._array.tostring()
+ if self.writer.write_reserved(bloom_bytes):
+ raise AssertionError("Not enough space for bloom filter.")
+ _BuilderRow.finish_node(self)
+
+
+class _LeafBuilderRow(_BuilderRow):
+ """The stored state accumulated while writing out a leaf rows."""
+
class BTreeBuilder(index.GraphIndexBuilder):
"""A Builder for B+Tree based Graph indices.
@@ -115,19 +146,8 @@
# (self.rows[-1]). When we finish a chunk in a row,
# propagate the key that didn't fit (comes after the chunk) to the
# row above, transitively.
- self.rows.append(_BuilderRow())
- def finish_node(row):
- byte_lines, next_node_line = row.writer.finish()
- if row.nodes == 0:
- # padded note:
- row.spool.write("\x00" * 100)
- row.spool.writelines(byte_lines)
- if row.spool.tell() % _PAGE_SIZE != 0:
- import pdb; pdb.set_trace()
- raise AssertionError("incorrect node length")
- row.nodes += 1
- row.writer = None
-
+ global_bloom = BloomSHA1(256 * 8)
+ self.rows.append(_LeafBuilderRow())
def add_key(string_key, key_line):
"""Add a key to the current chunk.
@@ -143,8 +163,9 @@
length = _PAGE_SIZE
if internal_row.nodes == 0:
length -= 100 # padded
+ # reserve 256 for the bloom + 10 for the ':bloom:\n' marker and slack
internal_row.writer = chunk_writer.ChunkWriter(
- length)
+ length, 266)
internal_row.writer.write(_INTERNAL_FLAG)
internal_row.writer.write(_INTERNAL_OFFSET +
str(self.rows[pos + 1].nodes) + "\n")
@@ -156,7 +177,7 @@
self.rows[-1].writer.write(_LEAF_FLAG)
if self.rows[-1].writer.write(line):
# this key did not fit in the node:
- finish_node(self.rows[-1])
+ self.rows[-1].finish_node()
key_line = string_key + "\n"
new_row = True
for row in reversed(self.rows[:-1]):
@@ -164,7 +185,7 @@
# doesn't fit then propagate upwards until we find one that
# it does fit into.
if row.writer.write(key_line):
- finish_node(row)
+ row.finish_node()
else:
# We've found a node that can handle the pointer.
new_row = False
@@ -173,15 +194,21 @@
# division point, then we need a new root:
if new_row:
# We need a new row
- new_row = _BuilderRow()
+ new_row = _InternalBuilderRow(global_bloom)
self.rows.insert(0, new_row)
# This will be padded, hence the -100
- new_row.writer = chunk_writer.ChunkWriter(_PAGE_SIZE - 100)
+ # reserve 256 for the bloom + 10 for the ':bloom:\n' marker and slack
+ new_row.writer = chunk_writer.ChunkWriter(
+ _PAGE_SIZE - 100, 266)
new_row.writer.write(_INTERNAL_FLAG)
new_row.writer.write(_INTERNAL_OFFSET +
str(self.rows[1].nodes - 1) + "\n")
new_row.writer.write(key_line)
add_key(string_key, key_line)
+ else:
+ for row in self.rows[:-1]:
+ row.bloom.insert(string_key)
+ global_bloom.insert(string_key)
for key, (absent, references, value) in nodes:
if absent:
@@ -198,7 +225,7 @@
'\t'.join(flattened_references), value))
add_key(string_key, line)
for row in reversed(self.rows):
- finish_node(row)
+ row.finish_node()
result = tempfile.TemporaryFile()
lines = [_BTSIGNATURE]
lines.append(_OPTION_NODE_REFS + str(self.reference_lists) + '\n')
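For reference, an internal node page now decompresses to the usual header
and keys followed by the filter: type=internal\n, offset=N\n, the keys,
then ':bloom:\n' and the raw 256-byte bitmap. A minimal reader-side split
(hypothetical helper; this commit only writes the bytes, it does not add
the reader):

    def split_bloom(page_bytes):
        # write_reserved() emits ':bloom:\n' plus the bitmap last, so
        # everything after the marker's final occurrence is the filter.
        marker = ':bloom:\n'
        pos = page_bytes.rindex(marker)
        return page_bytes[:pos], page_bytes[pos + len(marker):]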
=== modified file 'chunk_writer.py'
--- a/chunk_writer.py 2008-07-01 18:42:28 +0000
+++ b/chunk_writer.py 2008-07-02 06:08:10 +0000
@@ -33,14 +33,21 @@
- if the total seen bytes so far exceeds the chunk size, flush.
"""
- def __init__(self, chunk_size):
- """Create a ChunkWriter to write chunk_size chunks."""
+ def __init__(self, chunk_size, reserved=0):
+ """Create a ChunkWriter to write chunk_size chunks.
+
+ :param chunk_size: The total byte count to emit at the end of the
+ chunk.
+ :param reserved: How many bytes to allow for reserved data. Reserved
+ data space can only be written to via the write_reserved method.
+ """
self.chunk_size = chunk_size
self.compressor = zlib.compressobj()
self.bytes_in = []
self.bytes_list = []
self.compressed = None
self.unused_bytes = None
+ self.reserved_size = reserved
def finish(self):
"""Finish the chunk.
@@ -78,6 +85,17 @@
If the bytes fit, False is returned. Otherwise True is returned
and the bytes have not been added to the chunk.
"""
+ return self._write(bytes, False)
+
+ def write_reserved(self, bytes):
+ """Write some bytes to the chunk bypassing the reserved check.
+
+ If the bytes fit, False is returned. Otherwise True is returned
+ and the bytes have not been added to the chunk.
+ """
+ return self._write(bytes, True)
+
+ def _write(self, bytes, reserved):
# Add these bytes using Z_SYNC_FLUSH, if it puts us over budget, we
# will try packing everything tighter, if that still fails, then we
# will reject this request.
@@ -89,12 +107,16 @@
self.bytes_list.append(out)
total_len = sum(len(b) for b in self.bytes_list)
# Give us some extra room for a final Z_FINISH call.
- if total_len + 10 > self.chunk_size:
+ if reserved:
+ capacity = self.chunk_size
+ else:
+ capacity = self.chunk_size - self.reserved_size
+ if total_len + 10 > capacity:
# We are over budget, try to squeeze this in without any
# Z_SYNC_FLUSH calls
bytes_out, compressor = self._recompress_all_bytes_in(bytes)
this_len = sum(len(b) for b in bytes_out)
- if this_len + 10 > self.chunk_size:
+ if this_len + 10 > capacity:
# No way we can add anymore, we need to re-pack because our
# compressor is now out of sync
bytes_out, compressor = self._recompress_all_bytes_in()
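A usage sketch for the reserved-space API (data values made up; it
mirrors the new test below): ordinary write() calls may only consume
chunk_size - reserved bytes, while write_reserved() may also spend the
withheld tail, so the bloom payload still fits once the node fills up:

    import chunk_writer

    writer = chunk_writer.ChunkWriter(4096, reserved=266)
    for line in ['row %d\n' % i for i in range(2000)]:
        if writer.write(line):
            break                     # over the reduced budget
    # The reserved tail is still free for the bloom payload.
    if writer.write_reserved(':bloom:\n' + '\x00' * 256):
        raise AssertionError('reserved data did not fit')
    byte_lines, unused_line = writer.finish()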
=== modified file 'tests/test_btree_index.py'
--- a/tests/test_btree_index.py 2008-07-02 04:33:45 +0000
+++ b/tests/test_btree_index.py 2008-07-02 06:08:10 +0000
@@ -148,10 +148,19 @@
leaf1 = content[4096:8192]
leaf2 = content[8192:]
root_bytes = zlib.decompress(root)
+ # Create a little bloom by hand
+ bloom = BloomSHA1(256 * 8)
+ # insert each key to set its bits
+ for node in nodes:
+ bloom.insert('\x00'.join(node[0]))
+ # get bytes
+ bloom_bytes = bloom._array.tostring()
expected_root = (
"type=internal\n"
"offset=0\n"
"485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485\n"
+ ":bloom:\n"
+ + bloom_bytes
)
self.assertEqual(expected_root, root_bytes)
# We already know serialisation works for leaves, check key selection:
@@ -165,7 +174,7 @@
self.assertEqual(1200 - 629, len(node.keys))
self.assertEqual(sorted_node_keys[629:], sorted(node.keys))
- def test_second_internal_node_pointer(self):
+ def test_three_level_tree_details(self):
# The left most pointer in the second internal node in a row should
# point to the second node that the internal node is for, _not_
# the first, otherwise the first node overlaps with the last node of
@@ -184,6 +193,8 @@
self.assertEqual(3, len(index._row_lengths),
"Not enough rows: %r" % index._row_lengths)
internal_node1 = index._get_node(1)
+ # Must have a bloom on the first node.
+ self.assertNotEqual(None, internal_node1.bloom)
internal_node2 = index._get_node(2)
# The left most node node2 points at should be one after the right most node pointed at by
# node1.
@@ -194,6 +205,17 @@
pos = index._row_lengths[0] + index._row_lengths[1] + internal_node2.offset + 1
leaf = index._get_node(pos)
self.assertTrue(internal_node2.keys[0] in leaf.keys)
+ # Check the bloom filter for internal_node2: all the keys in the leaf
+ # should appear to be present
+ for key in leaf.keys:
+ self.assertTrue('\x00'.join(key) in internal_node2.bloom)
+ # Check the bloom filter for internal_node1 with its first two nodes in
+ # the same fashion.
+ for offset in [0, 1]:
+ pos = index._row_lengths[0] + index._row_lengths[1] + offset
+ leaf = index._get_node(pos)
+ for key in leaf.keys:
+ self.assertTrue('\x00'.join(key) in internal_node1.bloom)
def test_2_leaves_2_2(self):
builder = btree_index.BTreeBuilder(key_elements=2, reference_lists=2)
@@ -210,12 +232,21 @@
leaf1 = content[4096:8192]
leaf2 = content[8192:]
root_bytes = zlib.decompress(root)
+ # Create a little bloom by hand
+ bloom = BloomSHA1(256 * 8)
+ # insert each key to set its bits
+ for node in nodes:
+ bloom.insert('\x00'.join(node[0]))
+ # get bytes
+ bloom_bytes = bloom._array.tostring()
expected_root = (
"type=internal\n"
"offset=0\n"
"1111111111111111111111111111111111111111\x00"
"136136136136136136136136136136136136136136136136136136136136136"
"136136136136136136136136136136136136136136136136136136136\n"
+ ":bloom:\n"
+ + bloom_bytes
)
self.assertEqual(expected_root, root_bytes)
# We assume the other leaf nodes have been written correctly - layering FTW.
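The hand-built blooms above also pin down the contract the index relies
on: a bloom can return false positives but never a false negative for an
inserted key, and 256 * 8 bits serialise to exactly the 256 bytes the
builder reserves (that size is inferred from the reserved-space comment
in btree_index.py). A quick sanity check in the same style as the tests:

    bloom = BloomSHA1(256 * 8)
    for key in [('one',), ('two',)]:
        bloom.insert('\x00'.join(key))
    # Inserted keys always test positive; absent keys usually do not.
    assert '\x00'.join(('one',)) in bloom
    assert len(bloom._array.tostring()) == 256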
=== modified file 'tests/test_chunk_writer.py'
--- a/tests/test_chunk_writer.py 2008-07-01 18:38:55 +0000
+++ b/tests/test_chunk_writer.py 2008-07-02 06:08:10 +0000
@@ -64,3 +64,24 @@
self.assertEqualDiff(expected_bytes, node_bytes)
# And the line that failed should have been saved for us
self.assertEqual(lines[46], unused)
+
+ def test_too_much_data_preserves_reserve_space(self):
+ # Generate enough data to exceed 4K
+ lines = []
+ for group in range(48):
+ offset = group * 50
+ numbers = range(offset, offset + 50)
+ # Create a line with this group
+ lines.append(''.join(map(str, numbers)) + '\n')
+ writer = chunk_writer.ChunkWriter(4096, 256)
+ for line in lines:
+ if writer.write(line):
+ break
+ self.assertFalse(writer.write_reserved("A"*256))
+ bytes_list, unused = writer.finish()
+ node_bytes = self.check_chunk(bytes_list, 4096)
+ # the first 44 lines should have been added
+ expected_bytes = ''.join(lines[:44]) + "A"*256
+ self.assertEqualDiff(expected_bytes, node_bytes)
+ # And the line that failed should have been saved for us
+ self.assertEqual(lines[44], unused)
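Note the interplay with the unmodified test above: without reserved space
the generated data fits lines[:46] (lines[46] is the first rejected
line), while reserving 256 bytes shrinks that to lines[:44], and the
reserved write still succeeds. This confirms that write_reserved() draws
on the withheld budget rather than competing with ordinary writes.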