Rev 9: Bring in some limitations on the repacking, which shows decent performance wins (in http://bzr.arbash-meinel.com/plugins/index2)
John Arbash Meinel
john at arbash-meinel.com
Wed Jul 2 06:30:20 BST 2008
At http://bzr.arbash-meinel.com/plugins/index2
------------------------------------------------------------
revno: 9
revision-id: john at arbash-meinel.com-20080702052941-xfx5k9vhlph5seut
parent: robertc at robertcollins.net-20080701224410-2lbqoqc2dc5v3iey
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: index2
timestamp: Wed 2008-07-02 00:29:41 -0500
message:
Bring in some limitations on the repacking, which shows decent performance wins.
-------------- next part --------------
=== modified file 'chunk_writer.py'
--- a/chunk_writer.py 2008-07-01 18:42:28 +0000
+++ b/chunk_writer.py 2008-07-02 05:29:41 +0000
@@ -33,6 +33,8 @@
- if the total seen bytes so far exceeds the chunk size, flush.
"""
+ _max_repack = 1
+
def __init__(self, chunk_size):
"""Create a ChunkWriter to write chunk_size chunks."""
self.chunk_size = chunk_size
@@ -40,6 +42,8 @@
self.bytes_in = []
self.bytes_list = []
self.compressed = None
+ self.seen_bytes = 0
+ self.num_repack = 0
self.unused_bytes = None
def finish(self):
@@ -51,6 +55,10 @@
self.bytes_in = None # Free the data cached so far, we don't need it
self.bytes_list.append(self.compressor.flush(Z_FINISH))
total_len = sum(len(b) for b in self.bytes_list)
+ if total_len > self.chunk_size:
+ raise AssertionError('Somehow we ended up with too much'
+ ' compressed data, %d > %d'
+ % (total_len, self.chunk_size))
nulls_needed = self.chunk_size - total_len % self.chunk_size
if nulls_needed:
self.bytes_list.append("\x00" * nulls_needed)
@@ -78,37 +86,51 @@
If the bytes fit, False is returned. Otherwise True is returned
and the bytes have not been added to the chunk.
"""
- # Add these bytes using Z_SYNC_FLUSH, if it puts us over budget, we
- # will try packing everything tighter, if that still fails, then we
- # will reject this request.
- out = self.compressor.compress(bytes)
- if out:
- self.bytes_list.append(out)
- out = self.compressor.flush(Z_SYNC_FLUSH)
- if out:
- self.bytes_list.append(out)
- total_len = sum(len(b) for b in self.bytes_list)
- # Give us some extra room for a final Z_FINISH call.
- if total_len + 10 > self.chunk_size:
- # We are over budget, try to squeeze this in without any
- # Z_SYNC_FLUSH calls
- bytes_out, compressor = self._recompress_all_bytes_in(bytes)
- this_len = sum(len(b) for b in bytes_out)
- if this_len + 10 > self.chunk_size:
- # No way we can add anymore, we need to re-pack because our
- # compressor is now out of sync
- bytes_out, compressor = self._recompress_all_bytes_in()
- self.compressor = compressor
- self.bytes_list = bytes_out
- self.unused_bytes = bytes
+ # Check quickly to see if this is likely to put us outside of our
+ # budget:
+ next_seen_size = self.seen_bytes + len(bytes)
+ if (next_seen_size < 2 * self.chunk_size):
+ # No need, we assume this will "just fit"
+ out = self.compressor.compress(bytes)
+ self.bytes_in.append(bytes)
+ self.seen_bytes = next_seen_size
+ if out:
+ self.bytes_list.append(out)
+ else:
+ if self.num_repack > self._max_repack:
+ # We have packed too many times already.
return True
+ # This may or may not fit, try to add it with Z_SYNC_FLUSH
+ out = self.compressor.compress(bytes)
+ if out:
+ self.bytes_list.append(out)
+ out = self.compressor.flush(Z_SYNC_FLUSH)
+ if out:
+ self.bytes_list.append(out)
+ total_len = sum(len(b) for b in self.bytes_list)
+ # Give us some extra room for a final Z_FINISH call.
+ if total_len + 10 > self.chunk_size:
+ # We are over budget, try to squeeze this in without any
+ # Z_SYNC_FLUSH calls
+ self.num_repack += 1
+ bytes_out, compressor = self._recompress_all_bytes_in(bytes)
+ this_len = sum(len(b) for b in bytes_out)
+ if this_len + 10 > self.chunk_size:
+ # No way we can add anymore, we need to re-pack because our
+ # compressor is now out of sync
+ bytes_out, compressor = self._recompress_all_bytes_in()
+ self.compressor = compressor
+ self.bytes_list = bytes_out
+ self.unused_bytes = bytes
+ return True
+ else:
+ # This fits when we pack it tighter, so use the new packing
+ self.compressor = compressor
+ self.bytes_in.append(bytes)
+ self.bytes_list = bytes_out
else:
- # This fits when we pack it tighter, so use the new packing
- self.compressor = compressor
+ # It fit, so mark it added
self.bytes_in.append(bytes)
- self.bytes_list = bytes_out
- else:
- # It fit, so mark it added
- self.bytes_in.append(bytes)
+ self.seen_bytes = next_seen_size
return False
=== modified file 'tests/test_btree_index.py'
--- a/tests/test_btree_index.py 2008-07-01 22:44:10 +0000
+++ b/tests/test_btree_index.py 2008-07-02 05:29:41 +0000
@@ -134,35 +134,35 @@
def test_2_leaves_1_0(self):
builder = btree_index.BTreeBuilder(key_elements=1, reference_lists=0)
- nodes = self.make_nodes(1200, 1, 0)
+ nodes = self.make_nodes(800, 1, 0)
for node in nodes:
builder.add_node(*node)
content = builder.finish().read()
self.assertEqual(4096*3, len(content))
self.assertEqual(
- "B+Tree Graph Index 1\nnode_ref_lists=0\nkey_elements=1\nlen=1200\n"
+ "B+Tree Graph Index 1\nnode_ref_lists=0\nkey_elements=1\nlen=800\n"
"row_lengths=1,2\n",
- content[:78])
- root = content[78:4096]
+ content[:77])
+ root = content[77:4096]
leaf1 = content[4096:8192]
leaf2 = content[8192:]
root_bytes = zlib.decompress(root)
expected_root = (
"type=internal\n"
"offset=0\n"
- "485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485\n"
+ "505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505\n"
)
self.assertEqual(expected_root, root_bytes)
# We already know serialisation works for leaves, check key selection:
leaf1_bytes = zlib.decompress(leaf1)
sorted_node_keys = sorted(node[0] for node in nodes)
node = btree_index._LeafNode(leaf1_bytes, 1, 0)
- self.assertEqual(629, len(node.keys))
- self.assertEqual(sorted_node_keys[:629], sorted(node.keys))
+ self.assertEqual(451, len(node.keys))
+ self.assertEqual(sorted_node_keys[:451], sorted(node.keys))
leaf2_bytes = zlib.decompress(leaf2)
node = btree_index._LeafNode(leaf2_bytes, 1, 0)
- self.assertEqual(1200 - 629, len(node.keys))
- self.assertEqual(sorted_node_keys[629:], sorted(node.keys))
+ self.assertEqual(800 - 451, len(node.keys))
+ self.assertEqual(sorted_node_keys[451:], sorted(node.keys))
def test_second_internal_node_pointer(self):
# The left most pointer in the second internal node in a row should
@@ -196,13 +196,13 @@
def test_2_leaves_2_2(self):
builder = btree_index.BTreeBuilder(key_elements=2, reference_lists=2)
- nodes = self.make_nodes(250, 2, 2)
+ nodes = self.make_nodes(200, 2, 2)
for node in nodes:
builder.add_node(*node)
content = builder.finish().read()
self.assertEqual(4096*3, len(content))
self.assertEqual(
- "B+Tree Graph Index 1\nnode_ref_lists=2\nkey_elements=2\nlen=500\n"
+ "B+Tree Graph Index 1\nnode_ref_lists=2\nkey_elements=2\nlen=400\n"
"row_lengths=1,2\n",
content[:77])
root = content[77:4096]
@@ -213,8 +213,8 @@
"type=internal\n"
"offset=0\n"
"1111111111111111111111111111111111111111\x00"
- "136136136136136136136136136136136136136136136136136136136136136"
- "136136136136136136136136136136136136136136136136136136136\n"
+ "127127127127127127127127127127127127127127127127127127127"
+ "127127127127127127127127127127127127127127127127127127127127127\n"
)
self.assertEqual(expected_root, root_bytes)
# We assume the other leaf nodes have been written correctly - layering FTW.
@@ -392,9 +392,9 @@
# Should have done linear scan IO up the index, ignoring
# the internal nodes:
# The entire index should have been read
- self.assertEqual(3223552, size)
+ self.assertEqual(4055040, size)
readv_request = []
- for offset in range(12288, 3223552, 4096):
+ for offset in range(16384, 4055040, 4096):
readv_request.append((offset, 4096))
self.assertEqual([('readv', 'index', [(0, 4096)], False, None),
('readv', 'index', readv_request, False, None)],
More information about the bazaar-commits mailing list