Rev 9: Bring in some limitations on the repacking, which shows decent performance wins. in http://bzr.arbash-meinel.com/plugins/index2

John Arbash Meinel john at arbash-meinel.com
Wed Jul 2 06:30:20 BST 2008


At http://bzr.arbash-meinel.com/plugins/index2

------------------------------------------------------------
revno: 9
revision-id: john at arbash-meinel.com-20080702052941-xfx5k9vhlph5seut
parent: robertc at robertcollins.net-20080701224410-2lbqoqc2dc5v3iey
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: index2
timestamp: Wed 2008-07-02 00:29:41 -0500
message:
  Bring in some limitations on the repacking, which shows decent performance wins.
-------------- next part --------------
=== modified file 'chunk_writer.py'
--- a/chunk_writer.py	2008-07-01 18:42:28 +0000
+++ b/chunk_writer.py	2008-07-02 05:29:41 +0000
@@ -33,6 +33,8 @@
      - if the total seen bytes so far exceeds the chunk size, flush.
     """
 
+    _max_repack = 1
+
     def __init__(self, chunk_size):
         """Create a ChunkWriter to write chunk_size chunks."""
         self.chunk_size = chunk_size
@@ -40,6 +42,8 @@
         self.bytes_in = []
         self.bytes_list = []
         self.compressed = None
+        self.seen_bytes = 0
+        self.num_repack = 0
         self.unused_bytes = None
 
     def finish(self):
@@ -51,6 +55,10 @@
         self.bytes_in = None # Free the data cached so far, we don't need it
         self.bytes_list.append(self.compressor.flush(Z_FINISH))
         total_len = sum(len(b) for b in self.bytes_list)
+        if total_len > self.chunk_size:
+            raise AssertionError('Somehow we ended up with too much'
+                                 ' compressed data, %d > %d'
+                                 % (total_len, self.chunk_size))
         nulls_needed = self.chunk_size - total_len % self.chunk_size
         if nulls_needed:
             self.bytes_list.append("\x00" * nulls_needed)
@@ -78,37 +86,51 @@
         If the bytes fit, False is returned. Otherwise True is returned
         and the bytes have not been added to the chunk.
         """
-        # Add these bytes using Z_SYNC_FLUSH, if it puts us over budget, we
-        # will try packing everything tighter, if that still fails, then we
-        # will reject this request.
-        out = self.compressor.compress(bytes)
-        if out:
-            self.bytes_list.append(out)
-        out = self.compressor.flush(Z_SYNC_FLUSH)
-        if out:
-            self.bytes_list.append(out)
-        total_len = sum(len(b) for b in self.bytes_list)
-        # Give us some extra room for a final Z_FINISH call.
-        if total_len + 10 > self.chunk_size:
-            # We are over budget, try to squeeze this in without any
-            # Z_SYNC_FLUSH calls
-            bytes_out, compressor = self._recompress_all_bytes_in(bytes)
-            this_len = sum(len(b) for b in bytes_out)
-            if this_len + 10 > self.chunk_size:
-                # No way we can add anymore, we need to re-pack because our
-                # compressor is now out of sync
-                bytes_out, compressor = self._recompress_all_bytes_in()
-                self.compressor = compressor
-                self.bytes_list = bytes_out
-                self.unused_bytes = bytes
+        # Check quickly to see if this is likely to put us outside of our
+        # budget:
+        next_seen_size = self.seen_bytes + len(bytes)
+        if (next_seen_size < 2 * self.chunk_size):
+            # No need, we assume this will "just fit"
+            out = self.compressor.compress(bytes)
+            self.bytes_in.append(bytes)
+            self.seen_bytes = next_seen_size
+            if out:
+                self.bytes_list.append(out)
+        else:
+            if self.num_repack > self._max_repack:
+                # We have packed too many times already.
                 return True
+            # This may or may not fit, try to add it with Z_SYNC_FLUSH
+            out = self.compressor.compress(bytes)
+            if out:
+                self.bytes_list.append(out)
+            out = self.compressor.flush(Z_SYNC_FLUSH)
+            if out:
+                self.bytes_list.append(out)
+            total_len = sum(len(b) for b in self.bytes_list)
+            # Give us some extra room for a final Z_FINISH call.
+            if total_len + 10 > self.chunk_size:
+                # We are over budget, try to squeeze this in without any
+                # Z_SYNC_FLUSH calls
+                self.num_repack += 1
+                bytes_out, compressor = self._recompress_all_bytes_in(bytes)
+                this_len = sum(len(b) for b in bytes_out)
+                if this_len + 10 > self.chunk_size:
+                    # No way we can add anymore, we need to re-pack because our
+                    # compressor is now out of sync
+                    bytes_out, compressor = self._recompress_all_bytes_in()
+                    self.compressor = compressor
+                    self.bytes_list = bytes_out
+                    self.unused_bytes = bytes
+                    return True
+                else:
+                    # This fits when we pack it tighter, so use the new packing
+                    self.compressor = compressor
+                    self.bytes_in.append(bytes)
+                    self.bytes_list = bytes_out
             else:
-                # This fits when we pack it tighter, so use the new packing
-                self.compressor = compressor
+                # It fit, so mark it added
                 self.bytes_in.append(bytes)
-                self.bytes_list = bytes_out
-        else:
-            # It fit, so mark it added
-            self.bytes_in.append(bytes)
+                self.seen_bytes = next_seen_size
         return False
 

=== modified file 'tests/test_btree_index.py'
--- a/tests/test_btree_index.py	2008-07-01 22:44:10 +0000
+++ b/tests/test_btree_index.py	2008-07-02 05:29:41 +0000
@@ -134,35 +134,35 @@
 
     def test_2_leaves_1_0(self):
         builder = btree_index.BTreeBuilder(key_elements=1, reference_lists=0)
-        nodes = self.make_nodes(1200, 1, 0)
+        nodes = self.make_nodes(800, 1, 0)
         for node in nodes:
             builder.add_node(*node)
         content = builder.finish().read()
         self.assertEqual(4096*3, len(content))
         self.assertEqual(
-            "B+Tree Graph Index 1\nnode_ref_lists=0\nkey_elements=1\nlen=1200\n"
+            "B+Tree Graph Index 1\nnode_ref_lists=0\nkey_elements=1\nlen=800\n"
             "row_lengths=1,2\n",
-            content[:78])
-        root = content[78:4096]
+            content[:77])
+        root = content[77:4096]
         leaf1 = content[4096:8192]
         leaf2 = content[8192:]
         root_bytes = zlib.decompress(root)
         expected_root = (
             "type=internal\n"
             "offset=0\n"
-            "485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485485\n"
+            "505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505505\n"
             )
         self.assertEqual(expected_root, root_bytes)
         # We already know serialisation works for leaves, check key selection:
         leaf1_bytes = zlib.decompress(leaf1)
         sorted_node_keys = sorted(node[0] for node in nodes)
         node = btree_index._LeafNode(leaf1_bytes, 1, 0)
-        self.assertEqual(629, len(node.keys))
-        self.assertEqual(sorted_node_keys[:629], sorted(node.keys))
+        self.assertEqual(451, len(node.keys))
+        self.assertEqual(sorted_node_keys[:451], sorted(node.keys))
         leaf2_bytes = zlib.decompress(leaf2)
         node = btree_index._LeafNode(leaf2_bytes, 1, 0)
-        self.assertEqual(1200 - 629, len(node.keys))
-        self.assertEqual(sorted_node_keys[629:], sorted(node.keys))
+        self.assertEqual(800 - 451, len(node.keys))
+        self.assertEqual(sorted_node_keys[451:], sorted(node.keys))
 
     def test_second_internal_node_pointer(self):
         # The left most pointer in the second internal node in a row should
@@ -196,13 +196,13 @@
 
     def test_2_leaves_2_2(self):
         builder = btree_index.BTreeBuilder(key_elements=2, reference_lists=2)
-        nodes = self.make_nodes(250, 2, 2)
+        nodes = self.make_nodes(200, 2, 2)
         for node in nodes:
             builder.add_node(*node)
         content = builder.finish().read()
         self.assertEqual(4096*3, len(content))
         self.assertEqual(
-            "B+Tree Graph Index 1\nnode_ref_lists=2\nkey_elements=2\nlen=500\n"
+            "B+Tree Graph Index 1\nnode_ref_lists=2\nkey_elements=2\nlen=400\n"
             "row_lengths=1,2\n",
             content[:77])
         root = content[77:4096]
@@ -213,8 +213,8 @@
             "type=internal\n"
             "offset=0\n"
             "1111111111111111111111111111111111111111\x00"
-            "136136136136136136136136136136136136136136136136136136136136136"
-            "136136136136136136136136136136136136136136136136136136136\n"
+            "127127127127127127127127127127127127127127127127127127127"
+            "127127127127127127127127127127127127127127127127127127127127127\n"
             )
         self.assertEqual(expected_root, root_bytes)
         # We assume the other leaf nodes have been written correctly - layering FTW.
@@ -392,9 +392,9 @@
         # Should have done linear scan IO up the index, ignoring
         # the internal nodes:
         # The entire index should have been read
-        self.assertEqual(3223552, size)
+        self.assertEqual(4055040, size)
         readv_request = []
-        for offset in range(12288, 3223552, 4096):
+        for offset in range(16384, 4055040, 4096):
             readv_request.append((offset, 4096))
         self.assertEqual([('readv', 'index', [(0, 4096)], False, None),
              ('readv',  'index', readv_request, False, None)],



More information about the bazaar-commits mailing list