Rev 3674: (broken, but hopeful) Change the compact logic. in http://bzr.arbash-meinel.com/branches/bzr/1.7-dev/btree

John Arbash Meinel john at arbash-meinel.com
Fri Aug 22 06:40:16 BST 2008


At http://bzr.arbash-meinel.com/branches/bzr/1.7-dev/btree

------------------------------------------------------------
revno: 3674
revision-id: john at arbash-meinel.com-20080822054012-ikrwmq9nm2q4h6q8
parent: john at arbash-meinel.com-20080822035819-yx19e7qxdvjgaeql
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: btree
timestamp: Fri 2008-08-22 00:40:12 -0500
message:
  (broken, but hopeful) Change the compact logic.
  Instead of only paying attention to the total bytes read,
  use the fact that we *know* some of that is already compressed well,
  and pay attention only to the bytes that have been added since
  the last sync. This means we Z_SYNC_FLUSH much less often.
  (After syncing, we end up with more room to add without syncing.)
  This improves both the time to compress and the final compressed
  size. Need to update tests with the new offsets.
  Also, we seem to have found a case where using Z_SYNC_FLUSH
  in the middle of a stream actually generates *better*
  compression than compressing the whole stream in one pass.
  test_too_much_data_does_not_exceed_size triggers this:
  the data *can* be packed into over 100 bytes fewer than the
  size produced by a full compress.
modified:
  bzrlib/chunk_writer.py         chunk_writer.py-20080630234519-6ggn4id17nipovny-1
  bzrlib/tests/test_chunk_writer.py test_chunk_writer.py-20080630234519-6ggn4id17nipovny-2
-------------- next part --------------
=== modified file 'bzrlib/chunk_writer.py'
--- a/bzrlib/chunk_writer.py	2008-08-22 03:58:19 +0000
+++ b/bzrlib/chunk_writer.py	2008-08-22 05:40:12 +0000
@@ -20,7 +20,9 @@
 import zlib
 from zlib import Z_FINISH, Z_SYNC_FLUSH
 
-_stats = [0, 0, 0]
+# [max_repack, buffer_full, repacks_with_space, min_compression,
+#  total_bytes_in, total_bytes_out, avg_comp]
+_stats = [0, 0, 0, 999, 0, 0, 0]
 
 class ChunkWriter(object):
     """ChunkWriter allows writing of compressed data with a fixed size.
@@ -54,17 +56,9 @@
              3      60.3  13.5  3407            197
              4      66.7  13.4  3203            2154
             20      69.3  13.4  0               3380
-
-    :cvar _default_min_compression_size: The expected minimum compression.
-        While packing nodes into the page, we won't Z_SYNC_FLUSH until we have
-        received this much input data. This saves time, because we don't bloat
-        the result with SYNC entries (and then need to repack), but if it is
-        set too high we will accept data that will never fit and trigger a
-        fault later.
     """
 
     _max_repack = 2
-    _default_min_compression_size = 1.8
 
     def __init__(self, chunk_size, reserved=0):
         """Create a ChunkWriter to write chunk_size chunks.
@@ -81,10 +75,12 @@
         self.bytes_out_len = 0
         self.compressed = None
         self.seen_bytes = 0
+        # bytes that have been seen, but not included in a flush to out yet
+        self.unflushed_in_bytes = 0
         self.num_repack = 0
+        self.done = False # We will accept no more bytes
         self.unused_bytes = None
         self.reserved_size = reserved
-        self.min_compress_size = self._default_min_compression_size
 
     def finish(self):
         """Finish the chunk.
@@ -96,6 +92,14 @@
         out = self.compressor.flush(Z_FINISH)
         self.bytes_list.append(out)
         self.bytes_out_len += len(out)
+        if self.num_repack > 0 and self.bytes_out_len > 0:
+            comp = float(self.seen_bytes) / self.bytes_out_len
+            if comp < _stats[3]:
+                _stats[3] = comp
+        _stats[4] += self.seen_bytes
+        _stats[5] += self.bytes_out_len
+        _stats[6] = float(_stats[4]) / _stats[5]
+
         if self.bytes_out_len > self.chunk_size:
             raise AssertionError('Somehow we ended up with too much'
                                  ' compressed data, %d > %d'
@@ -132,9 +136,9 @@
                 append(out)
         if extra_bytes:
             out = compress(extra_bytes)
-            out += compressor.flush(Z_SYNC_FLUSH)
             if out:
                 append(out)
+        append(compressor.flush(Z_SYNC_FLUSH))
         bytes_out_len = sum(map(len, bytes_out))
         return bytes_out, bytes_out_len, compressor
 
@@ -144,36 +148,39 @@
         If the bytes fit, False is returned. Otherwise True is returned
         and the bytes have not been added to the chunk.
         """
+        if self.num_repack > self._max_repack and not reserved:
+            self.unused_bytes = bytes
+            return True
         if reserved:
             capacity = self.chunk_size
         else:
             capacity = self.chunk_size - self.reserved_size
-        # Check quickly to see if this is likely to put us outside of our
-        # budget:
-        next_seen_size = self.seen_bytes + len(bytes)
         comp = self.compressor
-        if (next_seen_size < self.min_compress_size * capacity):
-            # No need, we assume this will "just fit"
+        # Check to see if the currently unflushed bytes would fit with a bit of
+        # room to spare, assuming no compression.
+        next_unflushed = self.unflushed_in_bytes + len(bytes)
+        remaining_capacity = capacity - self.bytes_out_len - 10
+        if (next_unflushed < remaining_capacity):
+            # Yes, just push it in, assuming it will fit
             out = comp.compress(bytes)
             if out:
                 self.bytes_list.append(out)
                 self.bytes_out_len += len(out)
             self.bytes_in.append(bytes)
-            self.seen_bytes = next_seen_size
+            self.seen_bytes += len(bytes)
+            self.unflushed_in_bytes += len(bytes)
         else:
-            if self.num_repack > self._max_repack and not reserved:
-                self.unused_bytes = bytes
-                return True
             # This may or may not fit, try to add it with Z_SYNC_FLUSH
             out = comp.compress(bytes)
             out += comp.flush(Z_SYNC_FLUSH)
+            self.unflushed_in_bytes = 0
             if out:
                 self.bytes_list.append(out)
                 self.bytes_out_len += len(out)
             if self.bytes_out_len + 10 <= capacity:
                 # It fit, so mark it added
                 self.bytes_in.append(bytes)
-                self.seen_bytes = next_seen_size
+                self.seen_bytes += len(bytes)
             else:
                 # We are over budget, try to squeeze this in without any
                 # Z_SYNC_FLUSH calls
@@ -186,12 +193,12 @@
                     self.num_repack += 1
                     _stats[0] += 1
                 if this_len + 10 > capacity:
-                    # In real-world testing, this only happens when _max_repack
-                    # is set >2, and even then rarely (46 out of 1022)
                     (bytes_out, this_len,
                      compressor) = self._recompress_all_bytes_in()
                     _stats[1] += 1
                     self.compressor = compressor
+                    # Force us to not allow more data
+                    self.num_repack = self._max_repack + 1
                     self.bytes_list = bytes_out
                     self.bytes_out_len = this_len
                     self.unused_bytes = bytes

=== modified file 'bzrlib/tests/test_chunk_writer.py'
--- a/bzrlib/tests/test_chunk_writer.py	2008-08-22 03:58:19 +0000
+++ b/bzrlib/tests/test_chunk_writer.py	2008-08-22 05:40:12 +0000
@@ -58,8 +58,11 @@
             # Create a line with this group
             lines.append(''.join(map(str, numbers)) + '\n')
         writer = chunk_writer.ChunkWriter(4096)
-        for line in lines:
+        for idx, line in enumerate(lines):
+            if idx >= 45:
+                import pdb; pdb.set_trace()
             if writer.write(line):
+                self.assertEqual(47, idx)
                 break
         bytes_list, unused, _ = writer.finish()
         node_bytes = self.check_chunk(bytes_list, 4096)



More information about the bazaar-commits mailing list