Rev 4: Move 4K chunk sizing logic to chunk_writer.py in http://people.ubuntu.com/~robertc/baz2.0/plugins/index2/trunk
Robert Collins
robertc at robertcollins.net
Tue Jul 1 00:45:38 BST 2008
At http://people.ubuntu.com/~robertc/baz2.0/plugins/index2/trunk
------------------------------------------------------------
revno: 4
revision-id: robertc at robertcollins.net-20080630234537-5o71tqw764pyu2pc
parent: robertc at robertcollins.net-20080630221008-yvsw4wwkkwulerrk
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Tue 2008-07-01 09:45:37 +1000
message:
  Move 4K chunk sizing logic to chunk_writer.py
added:
  chunk_writer.py                chunk_writer.py-20080630234519-6ggn4id17nipovny-1
  tests/test_chunk_writer.py     test_chunk_writer.py-20080630234519-6ggn4id17nipovny-2
modified:
  btree_index.py                 index.py-20080624222253-p0x5f92uyh5hw734-7
  tests/__init__.py              __init__.py-20080624222253-p0x5f92uyh5hw734-10
=== modified file 'btree_index.py'
--- a/btree_index.py 2008-06-30 22:10:08 +0000
+++ b/btree_index.py 2008-06-30 23:45:37 +0000
@@ -22,7 +22,7 @@
 
 from bzrlib import index
 from bzrlib.index import _OPTION_NODE_REFS, _OPTION_KEY_ELEMENTS, _OPTION_LEN
-from bzrlib.plugins.index2 import errors
+from bzrlib.plugins.index2 import errors, chunk_writer
 
 
 _BTSIGNATURE = "B+Tree Graph Index 1\n"
@@ -61,7 +61,6 @@
 
     def finish(self):
        working = tempfile.TemporaryFile()
-        self.position = 100 # reserved for index header
        # forward sorted by key. In future we may consider topological sorting,
        # at the cost of table scans for direct lookup, or a second index for
        # direct lookup
@@ -75,37 +74,35 @@
         if len(nodes):
             self.rows = 1
         # write a leaf node, padded to 4096 (4K) bytes
-        if self.position:
-            # reserved space in first node
-            out_lines = ["\x00" * 100] # reserved
-        out_lines = []
         self.page_count = 0
 
         def finish_node():
-            # trailing data:
-            out_lines.append(compressor.flush())
-            working.writelines(out_lines)
-            self.position += sum(map(len, out_lines))
-            nulls_needed = 4096 - self.position % 4096
-            if nulls_needed:
-                working.write("\x00" * nulls_needed)
-            del out_lines[:]
-            self.position = 0
+            byte_lines, next_node_line = writer.finish()
+            if self.row_lengths[-1] == 0:
+                # padded node:
+                working.write("\x00" * 100)
+            working.writelines(byte_lines)
             self.page_count += 1
             self.row_lengths[-1] += 1
 
         for key, (absent, references, value) in nodes:
             if absent:
                 continue
-            if not out_lines:
+            if len(self.row_nodes) != self.rows:
+                # seed some nodes
+                for depth in range(self.rows - len(self.row_nodes) - 1):
+                    # add internal node
+                    assert False
                 if len(self.row_lengths) != self.rows:
                     # we have bumped up a row
-                    for depth in range(self.rows - len(self.row_nodes) - 1):
-                        # add internal node
-                        assert False
                     self.row_lengths.insert(0, 0)
                 # Time for a new node
-                compressor = zlib.compressobj()
-                out_lines.append(compressor.compress(_LEAF_FLAG))
+                if self.row_lengths[0] == 0:
+                    # Pad the node, it might end up being the root
+                    writer = chunk_writer.ChunkWriter(3996)
+                else:
+                    writer = chunk_writer.ChunkWriter(4096)
+                self.row_nodes.append(writer)
+                writer.write(_LEAF_FLAG)
             key_count += 1
             flattened_references = []
@@ -117,10 +114,9 @@
             string_key = '\x00'.join(key)
             line = ("%s\x00%s\x00%s\n" % (string_key,
                 '\t'.join(flattened_references), value))
-            out_lines.append(compressor.compress(line))
-            if False:
+            if writer.write(line):
                 # new node time
-                pass
+                assert False
         finish_node()
         working.flush()
         result = tempfile.TemporaryFile()
@@ -129,18 +125,19 @@
         lines.append(_OPTION_KEY_ELEMENTS + str(self._key_length) + '\n')
         lines.append(_OPTION_LEN + str(key_count) + '\n')
         lines.append(_OPTION_ROW_LENGTHS + ','.join(map(str, self.row_lengths)) + '\n')
+        # write the rows out:
         result.writelines(lines)
         reserved = 100 # reserved space
-        self.position = sum(map(len, lines))
+        position = sum(map(len, lines))
         working.seek(0)
         # copy nodes to the finalised file.
         node = working.read(4096)
         while node:
-            node = node[:4096-self.position]
+            node = node[reserved:]
             result.write(node)
-            result.write("\x00" * (reserved - self.position))
+            result.write("\x00" * (reserved - position))
             node = working.read(4096)
-            self.position = 0
+            position = 0
             reserved = 0
         result.flush()
         result.seek(0)
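
To make the header splice above easier to follow: the first node in `working` starts with 100 reserved NUL bytes, and finish() writes the real header lines first, then drops the reservation from that node and pads the block back out to the 100-byte boundary. A minimal standalone sketch of the same copy loop; `splice_header` and its parameters are hypothetical stand-ins, not part of the plugin:

    def splice_header(header_lines, working, result, node_size=4096, reserved=100):
        # The header must fit inside the space reserved at the front of
        # the first node.
        header = ''.join(header_lines)
        assert len(header) <= reserved
        result.write(header)
        working.seek(0)
        node = working.read(node_size)
        while node:
            # Drop the reserved NULs from the front of this node and pad
            # the output block back out to node_size bytes. After the
            # first block, reserved is 0 and nodes are copied verbatim.
            result.write(node[reserved:])
            result.write("\x00" * (reserved - len(header)))
            header = ''
            reserved = 0
            node = working.read(node_size)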
=== added file 'chunk_writer.py'
--- a/chunk_writer.py 1970-01-01 00:00:00 +0000
+++ b/chunk_writer.py 2008-06-30 23:45:37 +0000
@@ -0,0 +1,81 @@
+# index2, a bzr plugin providing experimental index types.
+# Copyright (C) 2008 Canonical Limited.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as published
+# by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+#
+
+"""ChunkWriter: write compressed data out with a fixed upper bound."""
+
+import zlib
+from zlib import Z_FINISH, Z_SYNC_FLUSH
+
+
+class ChunkWriter(object):
+    """ChunkWriter allows writing of compressed data with a fixed size.
+
+    If less data is supplied than fills a chunk, the chunk is padded with
+    NULL bytes. If more data is supplied, then the writer packs as much
+    in as it can, but never splits any item it was given.
+
+    The algorithm for packing is open to improvement! Currently it is:
+     - write the bytes given
+     - if the total seen bytes so far exceeds the chunk size, flush.
+    """
+
+    def __init__(self, chunk_size):
+        """Create a ChunkWriter to write chunk_size chunks."""
+        self.chunk_size = chunk_size
+        self.compressor = zlib.compressobj()
+        self.bytes_list = []
+        self.position = 0
+        self.seen_bytes = 0
+        self.unused_bytes = None
+
+    def finish(self):
+        """Finish the chunk.
+
+        This returns the final compressed chunk, and either None, or the
+        bytes that did not fit in the chunk.
+        """
+        self.bytes_list.append(self.compressor.flush(Z_FINISH))
+        self.position += len(self.bytes_list[-1])
+        nulls_needed = self.chunk_size - self.position % self.chunk_size
+        if nulls_needed:
+            self.bytes_list.append("\x00" * nulls_needed)
+        return self.bytes_list, self.unused_bytes
+
+    def write(self, bytes):
+        """Write some bytes to the chunk.
+
+        If the bytes fit, False is returned. Otherwise True is returned
+        and the bytes have not been added to the chunk.
+        """
+        # Reject content if it's likely to fail to fit. The 10 constant is to
+        # allow room for the zlib END_STREAM record in the Z_FINISH flush call.
+        if (self.seen_bytes > self.chunk_size and
+            self.position + 10 + len(bytes) > self.chunk_size):
+            self.unused_bytes = bytes
+            return True
+        self.bytes_list.append(self.compressor.compress(bytes))
+        self.position += len(self.bytes_list[-1])
+        self.seen_bytes += len(bytes)
+        # If we are at the end of what we know will fit, flush.
+        if self.seen_bytes > self.chunk_size:
+            # Note: we could strip the \x00\x00\xff\xff and reinsert it in the
+            # reader - see RFC 1979. Syncing on every call imposes an increase
+            # in compressed size, e.g. 3661 vs 4050 bytes for 40 200-byte rows.
+            self.bytes_list.append(self.compressor.flush(Z_SYNC_FLUSH))
+            self.position += len(self.bytes_list[-1])
+        return False
+
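
For readers skimming the API above: write() returns False while the data still fits and True, without consuming the bytes, once the chunk is effectively full; finish() then returns the padded byte list together with any rejected bytes. A short usage sketch (the row data here is made up purely for illustration):

    from bzrlib.plugins.index2 import chunk_writer

    writer = chunk_writer.ChunkWriter(4096)
    rejected = None
    for i in range(200):
        line = "%d: some moderately compressible row data\n" % i
        if writer.write(line):
            # Chunk full: `line` was not consumed, so the caller keeps
            # it to start the next chunk.
            rejected = line
            break
    bytes_list, unused = writer.finish()
    chunk = ''.join(bytes_list)
    # finish() pads with NULs to an exact multiple of the chunk size.
    assert len(chunk) % 4096 == 0
    # `unused` is the same object write() refused (None if everything fit).
    assert unused is rejected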
=== modified file 'tests/__init__.py'
--- a/tests/__init__.py 2008-06-30 22:10:08 +0000
+++ b/tests/__init__.py 2008-06-30 23:45:37 +0000
@@ -23,6 +23,7 @@
     test_modules = [
         'errors',
         'btree_index',
+        'chunk_writer',
         ]
     standard_tests.addTests(loader.loadTestsFromModuleNames(
         ['bzrlib.plugins.index2.tests.test_' + name for
=== added file 'tests/test_chunk_writer.py'
--- a/tests/test_chunk_writer.py 1970-01-01 00:00:00 +0000
+++ b/tests/test_chunk_writer.py 2008-06-30 23:45:37 +0000
@@ -0,0 +1,66 @@
+# index2, a bzr plugin providing experimental index types.
+# Copyright (C) 2008 Canonical Limited.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as published
+# by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+#
+
+"""Tests for writing fixed size chunks with compression."""
+
+import zlib
+
+from bzrlib.plugins.index2 import chunk_writer
+from bzrlib.tests import TestCaseWithTransport
+
+
+class TestWriter(TestCaseWithTransport):
+
+    def check_chunk(self, bytes_list, size):
+        bytes = ''.join(bytes_list)
+        self.assertEqual(size, len(bytes))
+        return zlib.decompress(bytes)
+
+    def test_chunk_writer_empty(self):
+        writer = chunk_writer.ChunkWriter(4096)
+        bytes_list, unused = writer.finish()
+        node_bytes = self.check_chunk(bytes_list, 4096)
+        self.assertEqual("", node_bytes)
+        self.assertEqual(None, unused)
+
+    def test_some_data(self):
+        writer = chunk_writer.ChunkWriter(4096)
+        writer.write("foo bar baz quux\n")
+        bytes_list, unused = writer.finish()
+        node_bytes = self.check_chunk(bytes_list, 4096)
+        self.assertEqual("foo bar baz quux\n", node_bytes)
+        self.assertEqual(None, unused)
+
+    def test_too_much_data_does_not_exceed_size(self):
+        # Generate enough data to exceed 4K
+        lines = []
+        for group in range(44):
+            offset = group * 50
+            numbers = range(offset, offset + 50)
+            # Create a line with this group
+            lines.append(''.join(map(str, numbers)) + '\n')
+        writer = chunk_writer.ChunkWriter(4096)
+        for line in lines:
+            if writer.write(line):
+                break
+        bytes_list, unused = writer.finish()
+        node_bytes = self.check_chunk(bytes_list, 4096)
+        # the first 42 lines should have been added
+        expected_bytes = ''.join(lines[:42])
+        self.assertEqual(expected_bytes, node_bytes)
+        # And the line that failed should have been saved for us
+        self.assertEqual(lines[42], unused)
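
Note that check_chunk() can call zlib.decompress() directly on the padded chunk because finish() closes the stream with Z_FINISH, and zlib ignores data after the end of the stream. A reader that wants to account for the padding explicitly can use a decompressobj instead; a small sketch of that alternative (not part of this commit):

    import zlib

    def read_chunk(chunk):
        """Decompress one fixed-size chunk, separating data from padding."""
        decomp = zlib.decompressobj()
        data = decomp.decompress(chunk)
        # Everything after the end of the zlib stream is the NUL padding
        # appended by ChunkWriter.finish().
        assert decomp.unused_data == "\x00" * len(decomp.unused_data)
        return data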