Rev 9: Use a compressobj again. in http://bzr.arbash-meinel.com/plugins/index2
John Arbash Meinel
john at arbash-meinel.com
Tue Jul 1 19:39:32 BST 2008
At http://bzr.arbash-meinel.com/plugins/index2
------------------------------------------------------------
revno: 9
revision-id: john at arbash-meinel.com-20080701183855-ynb3lm34yrq6sp3w
parent: john at arbash-meinel.com-20080701175844-z5jnbgf6c1qvyuux
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: index2
timestamp: Tue 2008-07-01 13:38:55 -0500
message:
Use a compressobj again.
We can do the standard compression with Z_SYNC_FLUSH until we overflow,
and then we just try to repack. If that fails, then we know we are
done, we just need to use a new compressobj.
Timing tests show that it has minimal impact, but results in significantly
smaller indices. Specifically, 3.3MB versus 4.7MB, or ~30% savings.
-------------- next part --------------
=== modified file 'chunk_writer.py'
--- a/chunk_writer.py 2008-07-01 17:58:44 +0000
+++ b/chunk_writer.py 2008-07-01 18:38:55 +0000
@@ -36,12 +36,12 @@
def __init__(self, chunk_size):
"""Create a ChunkWriter to write chunk_size chunks."""
self.chunk_size = chunk_size
- # self.compressor = zlib.compressobj()
- self.position = 0
- self.in_bytes_list = []
+ self.compressor = zlib.compressobj()
+ self.bytes_in = []
+ self.bytes_list = []
+ self.compressed = None
self.seen_bytes = 0
self.unused_bytes = None
- self.compressed = None
def finish(self):
"""Finish the chunk.
@@ -49,13 +49,29 @@
This returns the final compressed chunk, and either None, or the
bytes that did not fit in the chunk.
"""
- if self.compressed is None:
- self.compressed = zlib.compress(''.join(self.in_bytes_list))
- self.in_bytes_list = None
+ self.bytes_in = None # Free the data cached so far, we don't need it
+ self.bytes_list.append(self.compressor.flush(Z_FINISH))
+ total_len = sum(len(b) for b in self.bytes_list)
+ nulls_needed = self.chunk_size - total_len % self.chunk_size
+ if nulls_needed:
+ self.bytes_list.append("\x00" * nulls_needed)
+ return self.bytes_list, self.unused_bytes
- nulls_needed = self.chunk_size - len(self.compressed)
- nulls = '\x00' * nulls_needed
- return [self.compressed, nulls], self.unused_bytes
+ def _recompress_all_bytes_in(self, extra_bytes=None):
+ compressor = zlib.compressobj()
+ bytes_out = []
+ for accepted_bytes in self.bytes_in:
+ out = compressor.compress(accepted_bytes)
+ if out:
+ bytes_out.append(out)
+ if extra_bytes:
+ out = compressor.compress(extra_bytes)
+ if out:
+ bytes_out.append(out)
+ out = compressor.flush(Z_SYNC_FLUSH)
+ if out:
+ bytes_out.append(out)
+ return bytes_out, compressor
def write(self, bytes):
"""Write some bytes to the chunk.
@@ -63,38 +79,47 @@
If the bytes fit, False is returned. Otherwise True is returned
and the bytes have not been added to the chunk.
"""
- if (self.seen_bytes < 1.8 * self.chunk_size):
- # Just track the data
- self.in_bytes_list.append(bytes)
- self.seen_bytes += len(bytes)
+ # Check quickly to see if this is likely to put us outside of our
+ # budget:
+ next_seen_size = self.seen_bytes + len(bytes)
+ if (next_seen_size < self.chunk_size):
+ # No need, we assume this will "just fit"
+ out = self.compressor.compress(bytes)
+ self.bytes_in.append(bytes)
+ self.seen_bytes = next_seen_size
+ if out:
+ self.bytes_list.append(out)
else:
- # Try to compress all seen chunks
- next = self.in_bytes_list + [bytes]
- compressed = zlib.compress(''.join(next))
- if len(compressed) > self.chunk_size:
- self.unused_bytes = bytes
- return True
- # The compression succeeded, so stick with it for now
- self.in_bytes_list = next
- self.compressed = compressed
- self.seen_bytes += len(bytes)
+ # This may or may not fit, try to add it with Z_SYNC_FLUSH
+ out = self.compressor.compress(bytes)
+ if out:
+ self.bytes_list.append(out)
+ out = self.compressor.flush(Z_SYNC_FLUSH)
+ if out:
+ self.bytes_list.append(out)
+ total_len = sum(len(b) for b in self.bytes_list)
+ # Give us some extra room for a final Z_FINISH call.
+ if total_len + 10 > self.chunk_size:
+ # We are over budget, try to squeeze this in without any
+ # Z_SYNC_FLUSH calls
+ bytes_out, compressor = self._recompress_all_bytes_in(bytes)
+ this_len = sum(len(b) for b in bytes_out)
+ if this_len + 10 > self.chunk_size:
+ # No way we can add anymore, we need to re-pack because our
+ # compressor is now out of sync
+ bytes_out, compressor = self._recompress_all_bytes_in()
+ self.compressor = compressor
+ self.bytes_list = bytes_out
+ self.unused_bytes = bytes
+ return True
+ else:
+ # This fits when we pack it tighter, so use the new packing
+ self.compressor = compressor
+ self.bytes_in.append(bytes)
+ self.bytes_list = bytes_out
+ else:
+ # It fit, so mark it added
+ self.bytes_in.append(bytes)
+ self.seen_bytes = next_seen_size
return False
- # Reject content if its likely to fail to fit. The 10 constant is to
- # allow room for the zlib END_STREAM record in the Z_FINISH flush call.
- # if (self.seen_bytes > self.chunk_size and
- # self.position + 10 + len(bytes) > self.chunk_size):
- # self.unused_bytes = bytes
- # return True
- # self.bytes_list.append(self.compressor.compress(bytes))
- # self.position += len(self.bytes_list[-1])
- # self.seen_bytes += len(bytes)
- # # If we are at the end of what we know will fit, flush.
- # if self.seen_bytes > self.chunk_size:
- # # Note: we could strip the \x00\x00\xff\xff and reinsert it in the
- # # reader - see rfc1979. syncing on every call imposes a increase in
- # # compressed size. e.g. 3661 vs 4050 bytes for 40 200 byte rows.
- # self.bytes_list.append(self.compressor.flush(Z_SYNC_FLUSH))
- # self.position += len(self.bytes_list[-1])
- # return False
-
=== modified file 'tests/test_chunk_writer.py'
--- a/tests/test_chunk_writer.py 2008-06-30 23:45:37 +0000
+++ b/tests/test_chunk_writer.py 2008-07-01 18:38:55 +0000
@@ -48,7 +48,7 @@
def test_too_much_data_does_not_exceed_size(self):
# Generate enough data to exceed 4K
lines = []
- for group in range(44):
+ for group in range(48):
offset = group * 50
numbers = range(offset, offset + 50)
# Create a line with this group
@@ -59,8 +59,8 @@
break
bytes_list, unused = writer.finish()
node_bytes = self.check_chunk(bytes_list, 4096)
- # the first 42 lines should have been added
- expected_bytes = ''.join(lines[:42])
- self.assertEqual(expected_bytes, node_bytes)
+ # the first 46 lines should have been added
+ expected_bytes = ''.join(lines[:46])
+ self.assertEqualDiff(expected_bytes, node_bytes)
# And the line that failed should have been saved for us
- self.assertEqual(lines[42], unused)
+ self.assertEqual(lines[46], unused)
More information about the bazaar-commits
mailing list