Rev 5440: Change GroupCompressBlock to work in terms of self._z_content_chunks in http://bazaar.launchpad.net/~jameinel/bzr/2.3-gcb-peak-mem
John Arbash Meinel
john at arbash-meinel.com
Tue Sep 21 20:30:39 BST 2010
At http://bazaar.launchpad.net/~jameinel/bzr/2.3-gcb-peak-mem
------------------------------------------------------------
revno: 5440
revision-id: john at arbash-meinel.com-20100921193033-9ftw56og72mhlwo4
parent: pqm at pqm.ubuntu.com-20100921104823-0jks3g5o1bahesyq
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.3-gcb-peak-mem
timestamp: Tue 2010-09-21 14:30:33 -0500
message:
Change GroupCompressBlock to work in terms of self._z_content_chunks
This pushes down one of the peak-memory locations. Commit as a whole still
peaks at 1 fulltext + 2 compressed texts, but this code path now only holds
1 fulltext and 1 compressed text. We need to push this into more APIs to get
a bigger benefit.
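To illustrate the idea outside bzrlib, here is a minimal sketch of the two
behaviours (compress_eager and compress_chunked are invented names for
illustration; only zlib is real). Joining the compressed output into a single
string while the fulltext is still alive holds roughly 1 fulltext + 2
compressed copies at the peak; keeping the output as a list of chunks holds
only 1 fulltext + 1 compressed copy until somebody actually needs one string:

    import zlib

    def compress_eager(fulltext_chunks):
        # Old behaviour: compress, then immediately join into one big string.
        # While the join runs, the chunk list *and* the joined copy are both
        # alive alongside the fulltext: ~1 fulltext + 2 compressed copies.
        c = zlib.compressobj()
        compressed = [c.compress(chunk) for chunk in fulltext_chunks]
        compressed.append(c.flush())
        return b''.join(compressed)

    def compress_chunked(fulltext_chunks):
        # New behaviour: keep the compressed data as chunks and let the caller
        # decide when (or whether) to join: ~1 fulltext + 1 compressed copy.
        c = zlib.compressobj()
        compressed = [c.compress(chunk) for chunk in fulltext_chunks]
        compressed.append(c.flush())
        return [chunk for chunk in compressed if chunk]  # drop empty chunks

    if __name__ == '__main__':
        text = [b'some committed file content\n'] * 1000
        assert b''.join(compress_chunked(text)) == compress_eager(text)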
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py 2010-08-06 18:14:22 +0000
+++ b/bzrlib/groupcompress.py 2010-09-21 19:30:33 +0000
@@ -101,7 +101,7 @@
def __init__(self):
# map by key? or just order in file?
self._compressor_name = None
- self._z_content = None
+ self._z_content_chunks = None
self._z_content_decompressor = None
self._z_content_length = None
self._content_length = None
@@ -135,26 +135,30 @@
self._content = ''.join(self._content_chunks)
self._content_chunks = None
if self._content is None:
- if self._z_content is None:
+ # We join self._z_content_chunks here, because if we are
+ # decompressing, then it is *very* likely that we have a single
+ # chunk
+ if self._z_content_chunks is None:
raise AssertionError('No content to decompress')
- if self._z_content == '':
+ z_content = ''.join(self._z_content_chunks)
+ if z_content == '':
self._content = ''
elif self._compressor_name == 'lzma':
# We don't do partial lzma decomp yet
- self._content = pylzma.decompress(self._z_content)
+ self._content = pylzma.decompress(z_content)
elif self._compressor_name == 'zlib':
# Start a zlib decompressor
if num_bytes * 4 > self._content_length * 3:
# If we are requesting more that 3/4ths of the content,
# just extract the whole thing in a single pass
num_bytes = self._content_length
- self._content = zlib.decompress(self._z_content)
+ self._content = zlib.decompress(z_content)
else:
self._z_content_decompressor = zlib.decompressobj()
# Seed the decompressor with the uncompressed bytes, so
# that the rest of the code is simplified
self._content = self._z_content_decompressor.decompress(
- self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+ z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
if not self._z_content_decompressor.unconsumed_tail:
self._z_content_decompressor = None
else:
@@ -207,7 +211,13 @@
# XXX: Define some GCCorrupt error ?
raise AssertionError('Invalid bytes: (%d) != %d + %d' %
(len(bytes), pos, self._z_content_length))
- self._z_content = bytes[pos:]
+ self._z_content_chunks = (bytes[pos:],)
+
+ @property
+ def _z_content(self):
+ if self._z_content_chunks is not None:
+ return ''.join(self._z_content_chunks)
+ return None
@classmethod
def from_bytes(cls, bytes):
@@ -269,13 +279,13 @@
self._content_length = length
self._content_chunks = content_chunks
self._content = None
- self._z_content = None
+ self._z_content_chunks = None
def set_content(self, content):
"""Set the content of this block."""
self._content_length = len(content)
self._content = content
- self._z_content = None
+ self._z_content_chunks = None
def _create_z_content_using_lzma(self):
if self._content_chunks is not None:
@@ -283,39 +293,49 @@
self._content_chunks = None
if self._content is None:
raise AssertionError('Nothing to compress')
- self._z_content = pylzma.compress(self._content)
- self._z_content_length = len(self._z_content)
+ z_content = pylzma.compress(self._content)
+ self._z_content_chunks = (z_content,)
+ self._z_content_length = len(z_content)
- def _create_z_content_from_chunks(self):
+ def _create_z_content_from_chunks(self, chunks):
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
- compressed_chunks = map(compressor.compress, self._content_chunks)
+ # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
+ # (measured peak is maybe 30MB over the above...)
+ compressed_chunks = map(compressor.compress, chunks)
compressed_chunks.append(compressor.flush())
- self._z_content = ''.join(compressed_chunks)
- self._z_content_length = len(self._z_content)
+ # Ignore empty chunks
+ self._z_content_chunks = [c for c in compressed_chunks if c]
+ self._z_content_length = sum(map(len, self._z_content_chunks))
def _create_z_content(self):
- if self._z_content is not None:
+ if self._z_content_chunks is not None:
return
if _USE_LZMA:
self._create_z_content_using_lzma()
return
if self._content_chunks is not None:
- self._create_z_content_from_chunks()
- return
- self._z_content = zlib.compress(self._content)
- self._z_content_length = len(self._z_content)
+ chunks = self._content_chunks
+ else:
+ chunks = (self._content,)
+ self._create_z_content_from_chunks(chunks)
- def to_bytes(self):
- """Encode the information into a byte stream."""
+ def to_chunks(self):
+ """Create the byte stream as a series of 'chunks'"""
self._create_z_content()
if _USE_LZMA:
header = self.GCB_LZ_HEADER
else:
header = self.GCB_HEADER
- chunks = [header,
- '%d\n%d\n' % (self._z_content_length, self._content_length),
- self._z_content,
+ chunks = ['%s%d\n%d\n'
+ % (header, self._z_content_length, self._content_length),
]
+ chunks.extend(self._z_content_chunks)
+ total_len = sum(map(len, chunks))
+ return total_len, chunks
+
+ def to_bytes(self):
+ """Encode the information into a byte stream."""
+ total_len, chunks = self.to_chunks()
return ''.join(chunks)
def _dump(self, include_text=False):
@@ -679,18 +699,21 @@
z_header_bytes = zlib.compress(header_bytes)
del header_bytes
z_header_bytes_len = len(z_header_bytes)
- block_bytes = self._block.to_bytes()
+ block_bytes_len, block_chunks = self._block.to_chunks()
lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
- len(block_bytes)))
+ block_bytes_len))
lines.append(z_header_bytes)
- lines.append(block_bytes)
- del z_header_bytes, block_bytes
+ lines.extend(block_chunks)
+ del z_header_bytes, block_chunks
+ # TODO: This is a point where we will double the memory consumption. To
+ # avoid this, we probably have to switch to a 'chunked' api
return ''.join(lines)
@classmethod
def from_bytes(cls, bytes):
# TODO: This does extra string copying, probably better to do it a
- # different way
+ # different way. At a minimum this creates 2 copies of the
+ # compressed content
(storage_kind, z_header_len, header_len,
block_len, rest) = bytes.split('\n', 4)
del bytes
@@ -854,14 +877,6 @@
After calling this, the compressor should no longer be used
"""
- # TODO: this causes us to 'bloat' to 2x the size of content in the
- # group. This has an impact for 'commit' of large objects.
- # One possibility is to use self._content_chunks, and be lazy and
- # only fill out self._content as a full string when we actually
- # need it. That would at least drop the peak memory consumption
- # for 'commit' down to ~1x the size of the largest file, at a
- # cost of increased complexity within this code. 2x is still <<
- # 3x the size of the largest file, so we are doing ok.
self._block.set_chunked_content(self.chunks, self.endpoint)
self.chunks = None
self._delta_index = None
@@ -1630,8 +1645,19 @@
self._unadded_refs = {}
keys_to_add = []
def flush():
- bytes = self._compressor.flush().to_bytes()
+ bytes_len, chunks = self._compressor.flush().to_chunks()
self._compressor = GroupCompressor()
+ # Note: At this point we still have 1 copy of the fulltext (in
+ # record and the var 'bytes'), and this generates 2 copies of
+ # the compressed text (one for bytes, one in chunks)
+ # TODO: Push 'chunks' down into the _access api, so that we don't
+ # have to double compressed memory here
+ # TODO: Figure out how to indicate that we would be happy to free
+ # the fulltext content at this point. Note that sometimes we
+ # will want it later (streaming CHK pages), but most of the
+ # time we won't (everything else)
+ bytes = ''.join(chunks)
+ del chunks
index, start, length = self._access.add_raw_records(
[(None, len(bytes))], bytes)[0]
nodes = []
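The _z_content property shim in the hunk above keeps old readers of
block._z_content working even though the block itself now only stores chunks.
As a standalone, simplified sketch of that pattern (ChunkedBlock is an
invented name, not bzrlib's class; only zlib is real):

    import zlib

    class ChunkedBlock(object):
        """Illustrative stand-in for a block storing compressed data as chunks."""

        def __init__(self):
            self._z_content_chunks = None

        def set_content(self, content):
            c = zlib.compressobj()
            chunks = [c.compress(content), c.flush()]
            # Only the chunk list is stored; no joined copy hangs around.
            self._z_content_chunks = [chunk for chunk in chunks if chunk]

        @property
        def _z_content(self):
            # Old callers and tests that read ._z_content still work; the join
            # happens lazily, only when one string is actually requested.
            if self._z_content_chunks is not None:
                return b''.join(self._z_content_chunks)
            return None

    block = ChunkedBlock()
    block.set_content(b'this content will be compressed\n' * 100)
    assert zlib.decompress(block._z_content) == b'this content will be compressed\n' * 100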
=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py 2010-08-05 16:27:35 +0000
+++ b/bzrlib/tests/test_groupcompress.py 2010-09-21 19:30:33 +0000
@@ -347,6 +347,30 @@
self.assertEqual(z_content, block._z_content)
self.assertEqual(content, block._content)
+ def test_to_chunks(self):
+ content_chunks = ['this is some content\n',
+ 'this content will be compressed\n']
+ content_len = sum(map(len, content_chunks))
+ content = ''.join(content_chunks)
+ gcb = groupcompress.GroupCompressBlock()
+ gcb.set_chunked_content(content_chunks, content_len)
+ total_len, block_chunks = gcb.to_chunks()
+ block_bytes = ''.join(block_chunks)
+ self.assertEqual(gcb._z_content_length, len(gcb._z_content))
+ self.assertEqual(total_len, len(block_bytes))
+ self.assertEqual(gcb._content_length, content_len)
+ expected_header =('gcb1z\n' # group compress block v1 zlib
+ '%d\n' # Length of compressed content
+ '%d\n' # Length of uncompressed content
+ ) % (gcb._z_content_length, gcb._content_length)
+ # The first chunk should be the header chunk. It is small, fixed size,
+ # and there is no compelling reason to split it up
+ self.assertEqual(expected_header, block_chunks[0])
+ self.assertStartsWith(block_bytes, expected_header)
+ remaining_bytes = block_bytes[len(expected_header):]
+ raw_bytes = zlib.decompress(remaining_bytes)
+ self.assertEqual(content, raw_bytes)
+
def test_to_bytes(self):
content = ('this is some content\n'
'this content will be compressed\n')
@@ -389,7 +413,7 @@
z_content = zlib.compress(content)
self.assertEqual(57182, len(z_content))
block = groupcompress.GroupCompressBlock()
- block._z_content = z_content
+ block._z_content_chunks = (z_content,)
block._z_content_length = len(z_content)
block._compressor_name = 'zlib'
block._content_length = 158634
@@ -434,7 +458,7 @@
z_content = zlib.compress(content)
self.assertEqual(57182, len(z_content))
block = groupcompress.GroupCompressBlock()
- block._z_content = z_content
+ block._z_content_chunks = (z_content,)
block._z_content_length = len(z_content)
block._compressor_name = 'zlib'
block._content_length = 158634
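As the TODOs in flush() point out, the remaining win is for consumers to
accept the chunk list directly instead of a joined string. A rough sketch of
what a chunk-aware consumer could look like (write_block and FakeBlock are
invented names for illustration; only io and writelines are standard library):

    import io

    class FakeBlock(object):
        # Minimal stand-in for anything exposing to_chunks(), as
        # GroupCompressBlock now does: (total_length, list_of_byte_chunks).
        def to_chunks(self):
            chunks = [b'header\n', b'compressed-bytes']
            return sum(len(c) for c in chunks), chunks

    def write_block(block, out_file):
        # Stream the chunks straight to the file object instead of first
        # joining them into one big byte string the way to_bytes() does.
        total_len, chunks = block.to_chunks()
        out_file.writelines(chunks)
        return total_len

    out = io.BytesIO()
    assert write_block(FakeBlock(), out) == len(out.getvalue())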