Rev 5451: (jam) Reduce the peak memory in GroupCompressBlock in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Wed Sep 29 21:43:20 BST 2010


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 5451 [merge]
revision-id: pqm at pqm.ubuntu.com-20100929204319-s4pxrbqu9osgas9f
parent: pqm at pqm.ubuntu.com-20100929095623-hbeqc58mblx1or7v
parent: john at arbash-meinel.com-20100929200127-9ay2lurvs66mvie4
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Wed 2010-09-29 21:43:19 +0100
message:
  (jam) Reduce the peak memory in GroupCompressBlock
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
  bzrlib/tests/per_lock/test_lock.py test_lock.py-20070313190612-mfpoa7t8kvrgrhj2-1
  bzrlib/tests/test_groupcompress.py test_groupcompress.p-20080705181503-ccbxd6xuy1bdnrpu-13
=== modified file 'NEWS'
--- a/NEWS	2010-09-29 09:56:23 +0000
+++ b/NEWS	2010-09-29 20:01:27 +0000
@@ -48,6 +48,9 @@
 Internals
 *********
 
+* Small change to GroupCompressBlock to work more in terms of 'chunks'
+  rather than 'content' for its compressed storage. (John Arbash Meinel)
+
 * When running ``bzr selftest --subunit`` the subunit stream will no
   longer include the "log" information for tests which are considered to
   be 'successes' (success, xfail, skip, etc) (John Arbash Meinel)

=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2010-08-06 18:14:22 +0000
+++ b/bzrlib/groupcompress.py	2010-09-26 20:45:28 +0000
@@ -101,7 +101,7 @@
     def __init__(self):
         # map by key? or just order in file?
         self._compressor_name = None
-        self._z_content = None
+        self._z_content_chunks = None
         self._z_content_decompressor = None
         self._z_content_length = None
         self._content_length = None
@@ -135,26 +135,30 @@
                 self._content = ''.join(self._content_chunks)
                 self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            # chunk
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
+            if z_content == '':
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more that 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                 else:
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None
             else:
@@ -207,7 +211,17 @@
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                  (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)
+
+    @property
+    def _z_content(self):
+        """Return z_content_chunks as a simple string.
+
+        Meant only to be used by the test suite.
+        """
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)
+        return None
 
     @classmethod
     def from_bytes(cls, bytes):
@@ -269,13 +283,13 @@
         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None
 
     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None
 
     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
@@ -283,39 +297,49 @@
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)
 
-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))
 
     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             return
         if _USE_LZMA:
             self._create_z_content_using_lzma()
             return
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
-            return
-        self._z_content = zlib.compress(self._content)
-        self._z_content_length = len(self._z_content)
+            chunks = self._content_chunks
+        else:
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)
 
-    def to_bytes(self):
-        """Encode the information into a byte stream."""
+    def to_chunks(self):
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
         if _USE_LZMA:
             header = self.GCB_LZ_HEADER
         else:
             header = self.GCB_HEADER
-        chunks = [header,
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
-                  self._z_content,
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
                  ]
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks
+
+    def to_bytes(self):
+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)
 
     def _dump(self, include_text=False):
@@ -679,18 +703,21 @@
         z_header_bytes = zlib.compress(header_bytes)
         del header_bytes
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
-                                       len(block_bytes)))
+                                       block_bytes_len))
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        #       avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)
 
     @classmethod
     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
-        #       different way
+        #       different way. At a minimum this creates 2 copies of the
+        #       compressed content
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)
         del bytes
@@ -854,14 +881,6 @@
 
         After calling this, the compressor should no longer be used
         """
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        #       group. This has an impact for 'commit' of large objects.
-        #       One possibility is to use self._content_chunks, and be lazy and
-        #       only fill out self._content as a full string when we actually
-        #       need it. That would at least drop the peak memory consumption
-        #       for 'commit' down to ~1x the size of the largest file, at a
-        #       cost of increased complexity within this code. 2x is still <<
-        #       3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None
@@ -1630,8 +1649,19 @@
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
-            bytes = self._compressor.flush().to_bytes()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
             self._compressor = GroupCompressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            #       record and the var 'bytes'), and this generates 2 copies of
+            #       the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            #       have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            #       the fulltext content at this point. Note that sometimes we
+            #       will want it later (streaming CHK pages), but most of the
+            #       time we won't (everything else)
+            bytes = ''.join(chunks)
+            del chunks
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
             nodes = []

=== modified file 'bzrlib/tests/per_lock/test_lock.py'
--- a/bzrlib/tests/per_lock/test_lock.py	2010-09-23 16:37:27 +0000
+++ b/bzrlib/tests/per_lock/test_lock.py	2010-09-29 20:01:27 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2007 Canonical Ltd
+# Copyright (C) 2007, 2009, 2010 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2010-08-05 16:27:35 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2010-09-21 19:30:33 +0000
@@ -347,6 +347,30 @@
         self.assertEqual(z_content, block._z_content)
         self.assertEqual(content, block._content)
 
+    def test_to_chunks(self):
+        content_chunks = ['this is some content\n',
+                          'this content will be compressed\n']
+        content_len = sum(map(len, content_chunks))
+        content = ''.join(content_chunks)
+        gcb = groupcompress.GroupCompressBlock()
+        gcb.set_chunked_content(content_chunks, content_len)
+        total_len, block_chunks = gcb.to_chunks()
+        block_bytes = ''.join(block_chunks)
+        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
+        self.assertEqual(total_len, len(block_bytes))
+        self.assertEqual(gcb._content_length, content_len)
+        expected_header =('gcb1z\n' # group compress block v1 zlib
+                          '%d\n' # Length of compressed content
+                          '%d\n' # Length of uncompressed content
+                         ) % (gcb._z_content_length, gcb._content_length)
+        # The first chunk should be the header chunk. It is small, fixed size,
+        # and there is no compelling reason to split it up
+        self.assertEqual(expected_header, block_chunks[0])
+        self.assertStartsWith(block_bytes, expected_header)
+        remaining_bytes = block_bytes[len(expected_header):]
+        raw_bytes = zlib.decompress(remaining_bytes)
+        self.assertEqual(content, raw_bytes)
+
     def test_to_bytes(self):
         content = ('this is some content\n'
                    'this content will be compressed\n')
@@ -389,7 +413,7 @@
         z_content = zlib.compress(content)
         self.assertEqual(57182, len(z_content))
         block = groupcompress.GroupCompressBlock()
-        block._z_content = z_content
+        block._z_content_chunks = (z_content,)
         block._z_content_length = len(z_content)
         block._compressor_name = 'zlib'
         block._content_length = 158634
@@ -434,7 +458,7 @@
         z_content = zlib.compress(content)
         self.assertEqual(57182, len(z_content))
         block = groupcompress.GroupCompressBlock()
-        block._z_content = z_content
+        block._z_content_chunks = (z_content,)
         block._z_content_length = len(z_content)
         block._compressor_name = 'zlib'
         block._content_length = 158634




More information about the bazaar-commits mailing list