Rev 4: Move 4K chunk sizing logic to chunk_writer.py in http://people.ubuntu.com/~robertc/baz2.0/plugins/index2/trunk

Robert Collins robertc at robertcollins.net
Tue Jul 1 00:45:38 BST 2008


At http://people.ubuntu.com/~robertc/baz2.0/plugins/index2/trunk

------------------------------------------------------------
revno: 4
revision-id: robertc at robertcollins.net-20080630234537-5o71tqw764pyu2pc
parent: robertc at robertcollins.net-20080630221008-yvsw4wwkkwulerrk
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Tue 2008-07-01 09:45:37 +1000
message:
  Move 4K chunk sizing logic to chunk_writer.py
added:
  chunk_writer.py                chunk_writer.py-20080630234519-6ggn4id17nipovny-1
  tests/test_chunk_writer.py     test_chunk_writer.py-20080630234519-6ggn4id17nipovny-2
modified:
  btree_index.py                 index.py-20080624222253-p0x5f92uyh5hw734-7
  tests/__init__.py              __init__.py-20080624222253-p0x5f92uyh5hw734-10
=== modified file 'btree_index.py'
--- a/btree_index.py	2008-06-30 22:10:08 +0000
+++ b/btree_index.py	2008-06-30 23:45:37 +0000
@@ -22,7 +22,7 @@
 
 from bzrlib import index
 from bzrlib.index import _OPTION_NODE_REFS, _OPTION_KEY_ELEMENTS, _OPTION_LEN
-from bzrlib.plugins.index2 import errors
+from bzrlib.plugins.index2 import errors, chunk_writer
 
 
 _BTSIGNATURE = "B+Tree Graph Index 1\n"
@@ -61,7 +61,6 @@
 
     def finish(self):
         working = tempfile.TemporaryFile()
-        self.position = 100 # reserved for index header
         # forward sorted by key. In future we may consider topological sorting,
         # at the cost of table scans for direct lookup, or a second index for
         # direct lookup
@@ -75,37 +74,35 @@
         if len(nodes):
             self.rows = 1
             # write a leaf node, padded to 4096 (4K) bytes
-            if self.position:
-                # reserved space in first node
-                out_lines = ["\x00" * 100] # reserved
-            out_lines = []
             self.page_count = 0
             def finish_node():
-                # trailing data:
-                out_lines.append(compressor.flush())
-                working.writelines(out_lines)
-                self.position += sum(map(len, out_lines))
-                nulls_needed = 4096 - self.position % 4096
-                if nulls_needed:
-                    working.write("\x00" * nulls_needed)
-                del out_lines[:]
-                self.position = 0
+                byte_lines, next_node_line = writer.finish()
+                if self.row_lengths[-1] == 0:
+                    # padded note:
+                    working.write("\x00" * 100)
+                working.writelines(byte_lines)
                 self.page_count += 1
                 self.row_lengths[-1] += 1
 
             for key, (absent, references, value) in nodes:
                 if absent:
                     continue
-                if not out_lines:
+                if len(self.row_nodes) != self.rows:
+                    # seed some nodes
+                    for depth in range(self.rows - len(self.row_nodes) - 1):
+                        # add internal node
+                        assert False
                     if len(self.row_lengths) != self.rows:
                         # we have bumped up a row
-                        for depth in range(self.rows - len(self.row_nodes) - 1):
-                            # add internal node
-                            assert False
                         self.row_lengths.insert(0, 0)
                     # Time for a new node
-                    compressor = zlib.compressobj()
-                    out_lines.append(compressor.compress(_LEAF_FLAG))
+                    if self.row_lengths[0] == 0:
+                        # Pad the node, it might end up being the root
+                        writer = chunk_writer.ChunkWriter(3996)
+                    else:
+                        writer = chunk_writer.ChunkWriter(4096)
+                    self.row_nodes.append(writer)
+                    writer.write(_LEAF_FLAG)
 
                 key_count += 1
                 flattened_references = []
@@ -117,10 +114,9 @@
                 string_key = '\x00'.join(key)
                 line = ("%s\x00%s\x00%s\n" % (string_key,
                     '\t'.join(flattened_references), value))
-                out_lines.append(compressor.compress(line))
-                if False:
+                if writer.write(line):
                     # new node time
-                    pass
+                    assert False
             finish_node()
         working.flush()
         result = tempfile.TemporaryFile()
@@ -129,18 +125,19 @@
         lines.append(_OPTION_KEY_ELEMENTS + str(self._key_length) + '\n')
         lines.append(_OPTION_LEN + str(key_count) + '\n')
         lines.append(_OPTION_ROW_LENGTHS + ','.join(map(str, self.row_lengths)) + '\n')
+        # write the rows out:
         result.writelines(lines)
         reserved = 100 # reserved space
-        self.position = sum(map(len, lines))
+        position = sum(map(len, lines))
         working.seek(0)
         # copy nodes to the finalised file.
         node = working.read(4096)
         while node:
-            node = node[:4096-self.position]
+            node = node[reserved:]
             result.write(node)
-            result.write("\x00" * (reserved - self.position))
+            result.write("\x00" * (reserved - position))
             node = working.read(4096)
-            self.position = 0
+            position = 0
             reserved = 0
         result.flush()
         result.seek(0)

=== added file 'chunk_writer.py'
--- a/chunk_writer.py	1970-01-01 00:00:00 +0000
+++ b/chunk_writer.py	2008-06-30 23:45:37 +0000
@@ -0,0 +1,81 @@
+# index2, a bzr plugin providing experimental index types.
+# Copyright (C) 2008 Canonical Limited.
+# 
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as published
+# by the Free Software Foundation.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+# 
+
+"""ChunkWriter: write compressed data out with a fixed upper bound."""
+
+import zlib
+from zlib import Z_FINISH, Z_SYNC_FLUSH
+
+
+class ChunkWriter(object):
+    """ChunkWriter allows writing of compressed data with a fixed size.
+
+    If less data is supplied than fills a chunk, the chunk is padded with
+    NULL bytes. If more data is supplied, then the writer packs as much
+    in as it can, but never splits any item it was given.
+
+    The algorithm for packing is open to improvement! Current it is:
+     - write the bytes given
+     - if the total seen bytes so far exceeds the chunk size, flush.
+    """
+
+    def __init__(self, chunk_size):
+        """Create a ChunkWriter to write chunk_size chunks."""
+        self.chunk_size = chunk_size
+        self.compressor = zlib.compressobj()
+        self.bytes_list = []
+        self.position = 0
+        self.seen_bytes = 0
+        self.unused_bytes = None
+
+    def finish(self):
+        """Finish the chunk.
+
+        This returns the final compressed chunk, and either None, or the
+        bytes that did not fit in the chunk.
+        """
+        self.bytes_list.append(self.compressor.flush(Z_FINISH))
+        self.position += len(self.bytes_list[-1])
+        nulls_needed = self.chunk_size - self.position % self.chunk_size
+        if nulls_needed:
+            self.bytes_list.append("\x00" * nulls_needed)
+        return self.bytes_list, self.unused_bytes
+
+    def write(self, bytes):
+        """Write some bytes to the chunk.
+
+        If the bytes fit, False is returned. Otherwise True is returned
+        and the bytes have not been added to the chunk.
+        """
+        # Reject content if its likely to fail to fit. The 10 constant is to
+        # allow room for the zlib END_STREAM record in the Z_FINISH flush call.
+        if (self.seen_bytes > self.chunk_size and
+            self.position + 10 + len(bytes) > self.chunk_size):
+            self.unused_bytes = bytes
+            return True
+        self.bytes_list.append(self.compressor.compress(bytes))
+        self.position += len(self.bytes_list[-1])
+        self.seen_bytes += len(bytes)
+        # If we are at the end of what we know will fit, flush.
+        if self.seen_bytes > self.chunk_size:
+            # Note: we could strip the \x00\x00\xff\xff and reinsert it in the
+            # reader - see rfc1979. syncing on every call imposes a increase in
+            # compressed size. e.g. 3661 vs 4050 bytes for 40 200 byte rows.
+            self.bytes_list.append(self.compressor.flush(Z_SYNC_FLUSH))
+            self.position += len(self.bytes_list[-1])
+        return False
+

=== modified file 'tests/__init__.py'
--- a/tests/__init__.py	2008-06-30 22:10:08 +0000
+++ b/tests/__init__.py	2008-06-30 23:45:37 +0000
@@ -23,6 +23,7 @@
     test_modules = [
         'errors',
         'btree_index',
+        'chunk_writer',
         ]
     standard_tests.addTests(loader.loadTestsFromModuleNames(
         ['bzrlib.plugins.index2.tests.test_' + name for 

=== added file 'tests/test_chunk_writer.py'
--- a/tests/test_chunk_writer.py	1970-01-01 00:00:00 +0000
+++ b/tests/test_chunk_writer.py	2008-06-30 23:45:37 +0000
@@ -0,0 +1,66 @@
+# index2, a bzr plugin providing experimental index types.
+# Copyright (C) 2008 Canonical Limited.
+# 
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as published
+# by the Free Software Foundation.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+# 
+
+"""Tests for writing fixed size chunks with compression."""
+
+import zlib
+
+from bzrlib.plugins.index2 import chunk_writer
+from bzrlib.tests import TestCaseWithTransport
+
+
+class TestWriter(TestCaseWithTransport):
+
+    def check_chunk(self, bytes_list, size):
+        bytes = ''.join(bytes_list)
+        self.assertEqual(size, len(bytes))
+        return zlib.decompress(bytes)
+
+    def test_chunk_writer_empty(self):
+        writer = chunk_writer.ChunkWriter(4096)
+        bytes_list, unused = writer.finish()
+        node_bytes = self.check_chunk(bytes_list, 4096)
+        self.assertEqual("", node_bytes)
+        self.assertEqual(None, unused)
+
+    def test_some_data(self):
+        writer = chunk_writer.ChunkWriter(4096)
+        writer.write("foo bar baz quux\n")
+        bytes_list, unused = writer.finish()
+        node_bytes = self.check_chunk(bytes_list, 4096)
+        self.assertEqual("foo bar baz quux\n", node_bytes)
+        self.assertEqual(None, unused)
+
+    def test_too_much_data_does_not_exceed_size(self):
+        # Generate enough data to exceed 4K
+        lines = []
+        for group in range(44):
+            offset = group * 50
+            numbers = range(offset, offset + 50)
+            # Create a line with this group
+            lines.append(''.join(map(str, numbers)) + '\n')
+        writer = chunk_writer.ChunkWriter(4096)
+        for line in lines:
+            if writer.write(line):
+                break
+        bytes_list, unused = writer.finish()
+        node_bytes = self.check_chunk(bytes_list, 4096)
+        # the first 42 lines should have been added
+        expected_bytes = ''.join(lines[:42])
+        self.assertEqual(expected_bytes, node_bytes)
+        # And the line that failed should have been saved for us
+        self.assertEqual(lines[42], unused)




More information about the bazaar-commits mailing list