Rev 9: Use a compressobj again. in http://bzr.arbash-meinel.com/plugins/index2

John Arbash Meinel john at arbash-meinel.com
Tue Jul 1 19:39:32 BST 2008


At http://bzr.arbash-meinel.com/plugins/index2

------------------------------------------------------------
revno: 9
revision-id: john at arbash-meinel.com-20080701183855-ynb3lm34yrq6sp3w
parent: john at arbash-meinel.com-20080701175844-z5jnbgf6c1qvyuux
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: index2
timestamp: Tue 2008-07-01 13:38:55 -0500
message:
  Use a compressobj again. 
  
  We can do the standard compression with Z_SYNC_FLUSH until we overflow,
  and then we just try to repack. If that fails, then we know we are
  done, we just need to use a new compressobj.
  Timing tests show that it has minimal impact, but results in significantly
  smaller indices. Specifically, 3.3MB versus 4.7MB or ~ 40% savings.
-------------- next part --------------
=== modified file 'chunk_writer.py'
--- a/chunk_writer.py	2008-07-01 17:58:44 +0000
+++ b/chunk_writer.py	2008-07-01 18:38:55 +0000
@@ -36,12 +36,12 @@
     def __init__(self, chunk_size):
         """Create a ChunkWriter to write chunk_size chunks."""
         self.chunk_size = chunk_size
-        # self.compressor = zlib.compressobj()
-        self.position = 0
-        self.in_bytes_list = []
+        self.compressor = zlib.compressobj()
+        self.bytes_in = []
+        self.bytes_list = []
+        self.compressed = None
         self.seen_bytes = 0
         self.unused_bytes = None
-        self.compressed = None
 
     def finish(self):
         """Finish the chunk.
@@ -49,13 +49,29 @@
         This returns the final compressed chunk, and either None, or the
         bytes that did not fit in the chunk.
         """
-        if self.compressed is None:
-            self.compressed = zlib.compress(''.join(self.in_bytes_list))
-        self.in_bytes_list = None
+        self.bytes_in = None # Free the data cached so far, we don't need it
+        self.bytes_list.append(self.compressor.flush(Z_FINISH))
+        total_len = sum(len(b) for b in self.bytes_list)
+        nulls_needed = self.chunk_size - total_len % self.chunk_size
+        if nulls_needed:
+            self.bytes_list.append("\x00" * nulls_needed)
+        return self.bytes_list, self.unused_bytes
 
-        nulls_needed = self.chunk_size - len(self.compressed)
-        nulls = '\x00' * nulls_needed
-        return [self.compressed, nulls], self.unused_bytes
+    def _recompress_all_bytes_in(self, extra_bytes=None):
+        compressor = zlib.compressobj()
+        bytes_out = []
+        for accepted_bytes in self.bytes_in:
+            out = compressor.compress(accepted_bytes)
+            if out:
+                bytes_out.append(out)
+        if extra_bytes:
+            out = compressor.compress(extra_bytes)
+            if out:
+                bytes_out.append(out)
+            out = compressor.flush(Z_SYNC_FLUSH)
+            if out:
+                bytes_out.append(out)
+        return bytes_out, compressor
 
     def write(self, bytes):
         """Write some bytes to the chunk.
@@ -63,38 +79,47 @@
         If the bytes fit, False is returned. Otherwise True is returned
         and the bytes have not been added to the chunk.
         """
-        if (self.seen_bytes < 1.8 * self.chunk_size):
-            # Just track the data
-            self.in_bytes_list.append(bytes)
-            self.seen_bytes += len(bytes)
+        # Check quickly to see if this is likely to put us outside of our
+        # budget:
+        next_seen_size = self.seen_bytes + len(bytes)
+        if (next_seen_size < self.chunk_size):
+            # No need, we assume this will "just fit"
+            out = self.compressor.compress(bytes)
+            self.bytes_in.append(bytes)
+            self.seen_bytes = next_seen_size
+            if out:
+                self.bytes_list.append(out)
         else:
-            # Try to compress all seen chunks
-            next = self.in_bytes_list + [bytes]
-            compressed = zlib.compress(''.join(next))
-            if len(compressed) > self.chunk_size:
-                self.unused_bytes = bytes
-                return True
-            # The compression succeeded, so stick with it for now
-            self.in_bytes_list = next
-            self.compressed = compressed
-            self.seen_bytes += len(bytes)
+            # This may or may not fit, try to add it with Z_SYNC_FLUSH
+            out = self.compressor.compress(bytes)
+            if out:
+                self.bytes_list.append(out)
+            out = self.compressor.flush(Z_SYNC_FLUSH)
+            if out:
+                self.bytes_list.append(out)
+            total_len = sum(len(b) for b in self.bytes_list)
+            # Give us some extra room for a final Z_FINISH call.
+            if total_len + 10 > self.chunk_size:
+                # We are over budget, try to squeeze this in without any
+                # Z_SYNC_FLUSH calls
+                bytes_out, compressor = self._recompress_all_bytes_in(bytes)
+                this_len = sum(len(b) for b in bytes_out)
+                if this_len + 10 > self.chunk_size:
+                    # No way we can add anymore, we need to re-pack because our
+                    # compressor is now out of sync
+                    bytes_out, compressor = self._recompress_all_bytes_in()
+                    self.compressor = compressor
+                    self.bytes_list = bytes_out
+                    self.unused_bytes = bytes
+                    return True
+                else:
+                    # This fits when we pack it tighter, so use the new packing
+                    self.compressor = compressor
+                    self.bytes_in.append(bytes)
+                    self.bytes_list = bytes_out
+            else:
+                # It fit, so mark it added
+                self.bytes_in.append(bytes)
+                self.seen_bytes = next_seen_size
         return False
 
-        # Reject content if its likely to fail to fit. The 10 constant is to
-        # allow room for the zlib END_STREAM record in the Z_FINISH flush call.
-        # if (self.seen_bytes > self.chunk_size and
-        #     self.position + 10 + len(bytes) > self.chunk_size):
-        #     self.unused_bytes = bytes
-        #     return True
-        # self.bytes_list.append(self.compressor.compress(bytes))
-        # self.position += len(self.bytes_list[-1])
-        # self.seen_bytes += len(bytes)
-        # # If we are at the end of what we know will fit, flush.
-        # if self.seen_bytes > self.chunk_size:
-        #     # Note: we could strip the \x00\x00\xff\xff and reinsert it in the
-        #     # reader - see rfc1979. syncing on every call imposes a increase in
-        #     # compressed size. e.g. 3661 vs 4050 bytes for 40 200 byte rows.
-        #     self.bytes_list.append(self.compressor.flush(Z_SYNC_FLUSH))
-        #     self.position += len(self.bytes_list[-1])
-        # return False
-

=== modified file 'tests/test_chunk_writer.py'
--- a/tests/test_chunk_writer.py	2008-06-30 23:45:37 +0000
+++ b/tests/test_chunk_writer.py	2008-07-01 18:38:55 +0000
@@ -48,7 +48,7 @@
     def test_too_much_data_does_not_exceed_size(self):
         # Generate enough data to exceed 4K
         lines = []
-        for group in range(44):
+        for group in range(48):
             offset = group * 50
             numbers = range(offset, offset + 50)
             # Create a line with this group
@@ -59,8 +59,8 @@
                 break
         bytes_list, unused = writer.finish()
         node_bytes = self.check_chunk(bytes_list, 4096)
-        # the first 42 lines should have been added
-        expected_bytes = ''.join(lines[:42])
-        self.assertEqual(expected_bytes, node_bytes)
+        # the first 46 lines should have been added
+        expected_bytes = ''.join(lines[:46])
+        self.assertEqualDiff(expected_bytes, node_bytes)
         # And the line that failed should have been saved for us
-        self.assertEqual(lines[42], unused)
+        self.assertEqual(lines[46], unused)



More information about the bazaar-commits mailing list