Rev 5440: Change GroupCompressBlock to work in self._z_content_chunks in http://bazaar.launchpad.net/~jameinel/bzr/2.3-gcb-peak-mem

John Arbash Meinel john at arbash-meinel.com
Tue Sep 21 20:30:39 BST 2010


At http://bazaar.launchpad.net/~jameinel/bzr/2.3-gcb-peak-mem

------------------------------------------------------------
revno: 5440
revision-id: john at arbash-meinel.com-20100921193033-9ftw56og72mhlwo4
parent: pqm at pqm.ubuntu.com-20100921104823-0jks3g5o1bahesyq
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.3-gcb-peak-mem
timestamp: Tue 2010-09-21 14:30:33 -0500
message:
  Change GroupCompressBlock to work in self._z_content_chunks
  
  This lowers one of the peak memory points. During commit we still require
  1 fulltext + 2 compressed texts overall, but at least this code
  path now uses only 1 fulltext and 1 compressed text.
  We need to push this into more APIs to get a bigger benefit.
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2010-08-06 18:14:22 +0000
+++ b/bzrlib/groupcompress.py	2010-09-21 19:30:33 +0000
@@ -101,7 +101,7 @@
     def __init__(self):
         # map by key? or just order in file?
         self._compressor_name = None
-        self._z_content = None
+        self._z_content_chunks = None
         self._z_content_decompressor = None
         self._z_content_length = None
         self._content_length = None
@@ -135,26 +135,30 @@
                 self._content = ''.join(self._content_chunks)
                 self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            # chunk
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
+            if z_content == '':
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more that 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                 else:
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None
             else:
@@ -207,7 +211,13 @@
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                  (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)
+
+    @property
+    def _z_content(self):
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)
+        return None
 
     @classmethod
     def from_bytes(cls, bytes):
@@ -269,13 +279,13 @@
         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None
 
     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None
 
     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
@@ -283,39 +293,49 @@
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)
 
-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak at this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))
 
     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             return
         if _USE_LZMA:
             self._create_z_content_using_lzma()
             return
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
-            return
-        self._z_content = zlib.compress(self._content)
-        self._z_content_length = len(self._z_content)
+            chunks = self._content_chunks
+        else:
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)
 
-    def to_bytes(self):
-        """Encode the information into a byte stream."""
+    def to_chunks(self):
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
         if _USE_LZMA:
             header = self.GCB_LZ_HEADER
         else:
             header = self.GCB_HEADER
-        chunks = [header,
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
-                  self._z_content,
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
                  ]
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks
+
+    def to_bytes(self):
+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)
 
     def _dump(self, include_text=False):
@@ -679,18 +699,21 @@
         z_header_bytes = zlib.compress(header_bytes)
         del header_bytes
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
-                                       len(block_bytes)))
+                                       block_bytes_len))
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        #       avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)
 
     @classmethod
     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
-        #       different way
+        #       different way. At a minimum this creates 2 copies of the
+        #       compressed content
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)
         del bytes
@@ -854,14 +877,6 @@
 
         After calling this, the compressor should no longer be used
         """
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        #       group. This has an impact for 'commit' of large objects.
-        #       One possibility is to use self._content_chunks, and be lazy and
-        #       only fill out self._content as a full string when we actually
-        #       need it. That would at least drop the peak memory consumption
-        #       for 'commit' down to ~1x the size of the largest file, at a
-        #       cost of increased complexity within this code. 2x is still <<
-        #       3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None
@@ -1630,8 +1645,19 @@
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
-            bytes = self._compressor.flush().to_bytes()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
             self._compressor = GroupCompressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            #       record and the var 'bytes'), and this generates 2 copies of
+            #       the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            #       have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            #       the fulltext content at this point. Note that sometimes we
+            #       will want it later (streaming CHK pages), but most of the
+            #       time we won't (everything else)
+            bytes = ''.join(chunks)
+            del chunks
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
             nodes = []

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2010-08-05 16:27:35 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2010-09-21 19:30:33 +0000
@@ -347,6 +347,30 @@
         self.assertEqual(z_content, block._z_content)
         self.assertEqual(content, block._content)
 
+    def test_to_chunks(self):
+        content_chunks = ['this is some content\n',
+                          'this content will be compressed\n']
+        content_len = sum(map(len, content_chunks))
+        content = ''.join(content_chunks)
+        gcb = groupcompress.GroupCompressBlock()
+        gcb.set_chunked_content(content_chunks, content_len)
+        total_len, block_chunks = gcb.to_chunks()
+        block_bytes = ''.join(block_chunks)
+        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
+        self.assertEqual(total_len, len(block_bytes))
+        self.assertEqual(gcb._content_length, content_len)
+        expected_header =('gcb1z\n' # group compress block v1 zlib
+                          '%d\n' # Length of compressed content
+                          '%d\n' # Length of uncompressed content
+                         ) % (gcb._z_content_length, gcb._content_length)
+        # The first chunk should be the header chunk. It is small, fixed size,
+        # and there is no compelling reason to split it up
+        self.assertEqual(expected_header, block_chunks[0])
+        self.assertStartsWith(block_bytes, expected_header)
+        remaining_bytes = block_bytes[len(expected_header):]
+        raw_bytes = zlib.decompress(remaining_bytes)
+        self.assertEqual(content, raw_bytes)
+
     def test_to_bytes(self):
         content = ('this is some content\n'
                    'this content will be compressed\n')
@@ -389,7 +413,7 @@
         z_content = zlib.compress(content)
         self.assertEqual(57182, len(z_content))
         block = groupcompress.GroupCompressBlock()
-        block._z_content = z_content
+        block._z_content_chunks = (z_content,)
         block._z_content_length = len(z_content)
         block._compressor_name = 'zlib'
         block._content_length = 158634
@@ -434,7 +458,7 @@
         z_content = zlib.compress(content)
         self.assertEqual(57182, len(z_content))
         block = groupcompress.GroupCompressBlock()
-        block._z_content = z_content
+        block._z_content_chunks = (z_content,)
         block._z_content_length = len(z_content)
         block._compressor_name = 'zlib'
         block._content_length = 158634



More information about the bazaar-commits mailing list