Rev 4747: change the GroupcompressBlock code a bit. in http://bazaar.launchpad.net/~jameinel/bzr/2.1-peak-mem-tweak

John Arbash Meinel john at arbash-meinel.com
Sat Oct 17 05:30:09 BST 2009


At http://bazaar.launchpad.net/~jameinel/bzr/2.1-peak-mem-tweak

------------------------------------------------------------
revno: 4747
revision-id: john at arbash-meinel.com-20091017042950-bazb5obtctyq8333
parent: john at arbash-meinel.com-20091015204421-mstpm5cfeldojxoq
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.1-peak-mem-tweak
timestamp: Fri 2009-10-16 23:29:50 -0500
message:
  change the GroupcompressBlock code a bit.
  If the first decompress request is big enough, just decompress everything.
  And when we do that, let go of the decompressobj.
  
  After digging through the zlib code, it looks like a single zlib stream
  object holds about 5kB of internal state plus four 64kB buffers (about
  260kB of total state).
  That turns out to be quite a lot if you think about it.
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-09-15 02:57:23 +0000
+++ b/bzrlib/groupcompress.py	2009-10-17 04:29:50 +0000
@@ -119,13 +119,8 @@
         :param num_bytes: Ensure that we have extracted at least num_bytes of
             content. If None, consume everything
         """
-        # TODO: If we re-use the same content block at different times during
-        #       get_record_stream(), it is possible that the first pass will
-        #       get inserted, triggering an extract/_ensure_content() which
-        #       will get rid of _z_content. And then the next use of the block
-        #       will try to access _z_content (to send it over the wire), and
-        #       fail because it is already extracted. Consider never releasing
-        #       _z_content because of this.
+        if self._content_length is None:
+            raise AssertionError('self._content_length should never be None')
         if num_bytes is None:
             num_bytes = self._content_length
         elif (self._content_length is not None
@@ -148,7 +143,10 @@
                 self._content = pylzma.decompress(self._z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
-                if num_bytes is None:
+                if num_bytes * 4 > self._content_length * 3:
+                    # If we are requesting more than 3/4ths of the content,
+                    # just extract the whole thing in a single pass
+                    num_bytes = self._content_length
                     self._content = zlib.decompress(self._z_content)
                 else:
                     self._z_content_decompressor = zlib.decompressobj()
@@ -156,6 +154,8 @@
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
                         self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                    if not self._z_content_decompressor.unconsumed_tail:
+                        self._z_content_decompressor = None
             else:
                 raise AssertionError('Unknown compressor: %r'
                                      % self._compressor_name)
@@ -163,45 +163,28 @@
         # 'unconsumed_tail'
 
         # Do we have enough bytes already?
-        if num_bytes is not None and len(self._content) >= num_bytes:
-            return
-        if num_bytes is None and self._z_content_decompressor is None:
-            # We must have already decompressed everything
+        if len(self._content) >= num_bytes:
             return
         # If we got this far, and don't have a decompressor, something is wrong
         if self._z_content_decompressor is None:
             raise AssertionError(
                 'No decompressor to decompress %d bytes' % num_bytes)
         remaining_decomp = self._z_content_decompressor.unconsumed_tail
-        if num_bytes is None:
-            if remaining_decomp:
-                # We don't know how much is left, but we'll decompress it all
-                self._content += self._z_content_decompressor.decompress(
-                    remaining_decomp)
-                # Note: There's what I consider a bug in zlib.decompressobj
-                #       If you pass back in the entire unconsumed_tail, only
-                #       this time you don't pass a max-size, it doesn't
-                #       change the unconsumed_tail back to None/''.
-                #       However, we know we are done with the whole stream
-                self._z_content_decompressor = None
-            # XXX: Why is this the only place in this routine we set this?
-            self._content_length = len(self._content)
-        else:
-            if not remaining_decomp:
-                raise AssertionError('Nothing left to decompress')
-            needed_bytes = num_bytes - len(self._content)
-            # We always set max_size to 32kB over the minimum needed, so that
-            # zlib will give us as much as we really want.
-            # TODO: If this isn't good enough, we could make a loop here,
-            #       that keeps expanding the request until we get enough
-            self._content += self._z_content_decompressor.decompress(
-                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
-            if len(self._content) < num_bytes:
-                raise AssertionError('%d bytes wanted, only %d available'
-                                     % (num_bytes, len(self._content)))
-            if not self._z_content_decompressor.unconsumed_tail:
-                # The stream is finished
-                self._z_content_decompressor = None
+        if not remaining_decomp:
+            raise AssertionError('Nothing left to decompress')
+        needed_bytes = num_bytes - len(self._content)
+        # We always set max_size to 32kB over the minimum needed, so that
+        # zlib will give us as much as we really want.
+        # TODO: If this isn't good enough, we could make a loop here,
+        #       that keeps expanding the request until we get enough
+        self._content += self._z_content_decompressor.decompress(
+            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
+        if len(self._content) < num_bytes:
+            raise AssertionError('%d bytes wanted, only %d available'
+                                 % (num_bytes, len(self._content)))
+        if not self._z_content_decompressor.unconsumed_tail:
+            # The stream is finished
+            self._z_content_decompressor = None
 
     def _parse_bytes(self, bytes, pos):
         """Read the various lengths from the header.

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2009-09-07 03:35:06 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2009-10-17 04:29:50 +0000
@@ -418,8 +418,12 @@
         # And the decompressor is finalized
         self.assertIs(None, block._z_content_decompressor)
 
-    def test_partial_decomp_no_known_length(self):
+    def test__ensure_all_content(self):
         content_chunks = []
+        # We need a sufficient amount of data so that zlib.decompress has
+        # partial decompression to work with. Most auto-generated data
+        # compresses a bit too well; we want a mix, so we combine a sha
+        # hash with compressible data.
         for i in xrange(2048):
             next_content = '%d\nThis is a bit of duplicate text\n' % (i,)
             content_chunks.append(next_content)
@@ -433,30 +437,13 @@
         block._z_content = z_content
         block._z_content_length = len(z_content)
         block._compressor_name = 'zlib'
-        block._content_length = None # Don't tell the decompressed length
+        block._content_length = 158634
         self.assertIs(None, block._content)
-        block._ensure_content(100)
-        self.assertIsNot(None, block._content)
-        # We have decompressed at least 100 bytes
-        self.assertTrue(len(block._content) >= 100)
-        # We have not decompressed the whole content
-        self.assertTrue(len(block._content) < 158634)
-        self.assertEqualDiff(content[:len(block._content)], block._content)
-        # ensuring content that we already have shouldn't cause any more data
-        # to be extracted
-        cur_len = len(block._content)
-        block._ensure_content(cur_len - 10)
-        self.assertEqual(cur_len, len(block._content))
-        # Now we want a bit more content
-        cur_len += 10
-        block._ensure_content(cur_len)
-        self.assertTrue(len(block._content) >= cur_len)
-        self.assertTrue(len(block._content) < 158634)
-        self.assertEqualDiff(content[:len(block._content)], block._content)
-        # And now lets finish
-        block._ensure_content()
+        # The first _ensure_content got all of the required data
+        block._ensure_content(158634)
         self.assertEqualDiff(content, block._content)
-        # And the decompressor is finalized
+        # And we should have released the _z_content_decompressor since it was
+        # fully consumed
         self.assertIs(None, block._z_content_decompressor)
 
     def test__dump(self):



More information about the bazaar-commits mailing list