Rev 5451: (jam) Reduce the peak memory in GroupCompressBlock in file:///home/pqm/archives/thelove/bzr/%2Btrunk/
Canonical.com Patch Queue Manager
pqm at pqm.ubuntu.com
Wed Sep 29 21:43:20 BST 2010
At file:///home/pqm/archives/thelove/bzr/%2Btrunk/
------------------------------------------------------------
revno: 5451 [merge]
revision-id: pqm at pqm.ubuntu.com-20100929204319-s4pxrbqu9osgas9f
parent: pqm at pqm.ubuntu.com-20100929095623-hbeqc58mblx1or7v
parent: john at arbash-meinel.com-20100929200127-9ay2lurvs66mvie4
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Wed 2010-09-29 21:43:19 +0100
message:
(jam) Reduce the peak memory in GroupCompressBlock
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/groupcompress.py groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
bzrlib/tests/per_lock/test_lock.py test_lock.py-20070313190612-mfpoa7t8kvrgrhj2-1
bzrlib/tests/test_groupcompress.py test_groupcompress.p-20080705181503-ccbxd6xuy1bdnrpu-13
=== modified file 'NEWS'
--- a/NEWS 2010-09-29 09:56:23 +0000
+++ b/NEWS 2010-09-29 20:01:27 +0000
@@ -48,6 +48,9 @@
Internals
*********
+* Small change to GroupCompressBlock to work more in terms of 'chunks'
+ rather than 'content' for its compressed storage. (John Arbash Meinel)
+
* When running ``bzr selftest --subunit`` the subunit stream will no
longer include the "log" information for tests which are considered to
be 'successes' (success, xfail, skip, etc) (John Arbash Meinel)
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py 2010-08-06 18:14:22 +0000
+++ b/bzrlib/groupcompress.py 2010-09-26 20:45:28 +0000
@@ -101,7 +101,7 @@
def __init__(self):
# map by key? or just order in file?
self._compressor_name = None
- self._z_content = None
+ self._z_content_chunks = None
self._z_content_decompressor = None
self._z_content_length = None
self._content_length = None
@@ -135,26 +135,30 @@
self._content = ''.join(self._content_chunks)
self._content_chunks = None
if self._content is None:
- if self._z_content is None:
+ # We join self._z_content_chunks here, because if we are
+ # decompressing, then it is *very* likely that we have a single
+ # chunk
+ if self._z_content_chunks is None:
raise AssertionError('No content to decompress')
- if self._z_content == '':
+ z_content = ''.join(self._z_content_chunks)
+ if z_content == '':
self._content = ''
elif self._compressor_name == 'lzma':
# We don't do partial lzma decomp yet
- self._content = pylzma.decompress(self._z_content)
+ self._content = pylzma.decompress(z_content)
elif self._compressor_name == 'zlib':
# Start a zlib decompressor
if num_bytes * 4 > self._content_length * 3:
# If we are requesting more than 3/4ths of the content,
# just extract the whole thing in a single pass
num_bytes = self._content_length
- self._content = zlib.decompress(self._z_content)
+ self._content = zlib.decompress(z_content)
else:
self._z_content_decompressor = zlib.decompressobj()
# Seed the decompressor with the uncompressed bytes, so
# that the rest of the code is simplified
self._content = self._z_content_decompressor.decompress(
- self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+ z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
if not self._z_content_decompressor.unconsumed_tail:
self._z_content_decompressor = None
else:
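The hunk above only joins self._z_content_chunks at decompression time, and for zlib it can stop early by seeding a decompressor instead of inflating everything. The same pattern in isolation, assuming nothing from bzrlib (the window constant below stands in for _ZLIB_DECOMP_WINDOW; its value is assumed):

import zlib

_WINDOW = 4 * 1024  # stand-in for _ZLIB_DECOMP_WINDOW, value assumed

def partial_decompress(z_content, num_bytes):
    # Inflate only roughly the prefix we need; keep the decompressor
    # around if there is unconsumed input left for a later request.
    decompressor = zlib.decompressobj()
    content = decompressor.decompress(z_content, num_bytes + _WINDOW)
    if not decompressor.unconsumed_tail:
        decompressor = None
    return content, decompressor

# data = zlib.compress('x' * 200000)
# head, rest = partial_decompress(data, 1000)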
@@ -207,7 +211,17 @@
# XXX: Define some GCCorrupt error ?
raise AssertionError('Invalid bytes: (%d) != %d + %d' %
(len(bytes), pos, self._z_content_length))
- self._z_content = bytes[pos:]
+ self._z_content_chunks = (bytes[pos:],)
+
+ @property
+ def _z_content(self):
+ """Return z_content_chunks as a simple string.
+
+ Meant only to be used by the test suite.
+ """
+ if self._z_content_chunks is not None:
+ return ''.join(self._z_content_chunks)
+ return None
@classmethod
def from_bytes(cls, bytes):
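The _z_content property above is the compatibility shim for this change: callers that still expect a single string (essentially the test suite) get the join computed on demand, while the production path never pays for it. The same pattern, written against a hypothetical class rather than GroupCompressBlock:

class ChunkedPayload(object):
    # Hypothetical example of the compatibility-property pattern.

    def __init__(self, z_chunks=None):
        self._z_content_chunks = z_chunks

    @property
    def _z_content(self):
        # Old attribute name preserved; the join happens only when read.
        if self._z_content_chunks is not None:
            return ''.join(self._z_content_chunks)
        return None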
@@ -269,13 +283,13 @@
self._content_length = length
self._content_chunks = content_chunks
self._content = None
- self._z_content = None
+ self._z_content_chunks = None
def set_content(self, content):
"""Set the content of this block."""
self._content_length = len(content)
self._content = content
- self._z_content = None
+ self._z_content_chunks = None
def _create_z_content_using_lzma(self):
if self._content_chunks is not None:
@@ -283,39 +297,49 @@
self._content_chunks = None
if self._content is None:
raise AssertionError('Nothing to compress')
- self._z_content = pylzma.compress(self._content)
- self._z_content_length = len(self._z_content)
+ z_content = pylzma.compress(self._content)
+ self._z_content_chunks = (z_content,)
+ self._z_content_length = len(z_content)
- def _create_z_content_from_chunks(self):
+ def _create_z_content_from_chunks(self, chunks):
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
- compressed_chunks = map(compressor.compress, self._content_chunks)
+ # Peak at this point is 1 fulltext, 1 compressed text, + zlib overhead
+ # (measured peak is maybe 30MB over the above...)
+ compressed_chunks = map(compressor.compress, chunks)
compressed_chunks.append(compressor.flush())
- self._z_content = ''.join(compressed_chunks)
- self._z_content_length = len(self._z_content)
+ # Ignore empty chunks
+ self._z_content_chunks = [c for c in compressed_chunks if c]
+ self._z_content_length = sum(map(len, self._z_content_chunks))
def _create_z_content(self):
- if self._z_content is not None:
+ if self._z_content_chunks is not None:
return
if _USE_LZMA:
self._create_z_content_using_lzma()
return
if self._content_chunks is not None:
- self._create_z_content_from_chunks()
- return
- self._z_content = zlib.compress(self._content)
- self._z_content_length = len(self._z_content)
+ chunks = self._content_chunks
+ else:
+ chunks = (self._content,)
+ self._create_z_content_from_chunks(chunks)
- def to_bytes(self):
- """Encode the information into a byte stream."""
+ def to_chunks(self):
+ """Create the byte stream as a series of 'chunks'"""
self._create_z_content()
if _USE_LZMA:
header = self.GCB_LZ_HEADER
else:
header = self.GCB_HEADER
- chunks = [header,
- '%d\n%d\n' % (self._z_content_length, self._content_length),
- self._z_content,
+ chunks = ['%s%d\n%d\n'
+ % (header, self._z_content_length, self._content_length),
]
+ chunks.extend(self._z_content_chunks)
+ total_len = sum(map(len, chunks))
+ return total_len, chunks
+
+ def to_bytes(self):
+ """Encode the information into a byte stream."""
+ total_len, chunks = self.to_chunks()
return ''.join(chunks)
def _dump(self, include_text=False):
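The payoff of returning (total_len, chunks) from to_chunks() is that callers which only need to write the block out never have to join it; to_bytes() remains the one place that does. A hedged usage sketch (write_block and out_file are illustrative, not bzrlib API):

def write_block(out_file, block):
    # 'block' is assumed to provide the new to_chunks() method.
    total_len, chunks = block.to_chunks()
    # The total length is known up front, so a length prefix can be
    # emitted before the payload without ''.join(chunks).
    out_file.write('%d\n' % total_len)
    out_file.writelines(chunks)
    return total_len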
@@ -679,18 +703,21 @@
z_header_bytes = zlib.compress(header_bytes)
del header_bytes
z_header_bytes_len = len(z_header_bytes)
- block_bytes = self._block.to_bytes()
+ block_bytes_len, block_chunks = self._block.to_chunks()
lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
- len(block_bytes)))
+ block_bytes_len))
lines.append(z_header_bytes)
- lines.append(block_bytes)
- del z_header_bytes, block_bytes
+ lines.extend(block_chunks)
+ del z_header_bytes, block_chunks
+ # TODO: This is a point where we will double the memory consumption. To
+ # avoid this, we probably have to switch to a 'chunked' api
return ''.join(lines)
@classmethod
def from_bytes(cls, bytes):
# TODO: This does extra string copying, probably better to do it a
- # different way
+ # different way. At a minimum this creates 2 copies of the
+ # compressed content
(storage_kind, z_header_len, header_len,
block_len, rest) = bytes.split('\n', 4)
del bytes
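The TODO added above notes that this writer still joins everything at the end, and that from_bytes() makes at least two copies of the compressed content. The 'chunked' api it points at would mean handing back the accumulated pieces instead of the joined string; an illustrative sketch, not the bzrlib interface:

def to_chunked_record(z_header_bytes, header_bytes_len,
                      block_bytes_len, block_chunks):
    # Hypothetical chunked counterpart of the ''.join(lines) above.
    chunks = ['%d\n%d\n%d\n' % (len(z_header_bytes), header_bytes_len,
                                block_bytes_len),
              z_header_bytes]
    chunks.extend(block_chunks)
    return chunks   # the caller can writelines() these without a join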
@@ -854,14 +881,6 @@
After calling this, the compressor should no longer be used
"""
- # TODO: this causes us to 'bloat' to 2x the size of content in the
- # group. This has an impact for 'commit' of large objects.
- # One possibility is to use self._content_chunks, and be lazy and
- # only fill out self._content as a full string when we actually
- # need it. That would at least drop the peak memory consumption
- # for 'commit' down to ~1x the size of the largest file, at a
- # cost of increased complexity within this code. 2x is still <<
- # 3x the size of the largest file, so we are doing ok.
self._block.set_chunked_content(self.chunks, self.endpoint)
self.chunks = None
self._delta_index = None
@@ -1630,8 +1649,19 @@
self._unadded_refs = {}
keys_to_add = []
def flush():
- bytes = self._compressor.flush().to_bytes()
+ bytes_len, chunks = self._compressor.flush().to_chunks()
self._compressor = GroupCompressor()
+ # Note: At this point we still have 1 copy of the fulltext (in
+ # record and the var 'bytes'), and this generates 2 copies of
+ # the compressed text (one for bytes, one in chunks)
+ # TODO: Push 'chunks' down into the _access api, so that we don't
+ # have to double compressed memory here
+ # TODO: Figure out how to indicate that we would be happy to free
+ # the fulltext content at this point. Note that sometimes we
+ # will want it later (streaming CHK pages), but most of the
+ # time we won't (everything else)
+ bytes = ''.join(chunks)
+ del chunks
index, start, length = self._access.add_raw_records(
[(None, len(bytes))], bytes)[0]
nodes = []
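The TODOs in flush() above describe the remaining copy: chunks are joined into 'bytes' only because add_raw_records() wants a single string. Pushing chunks down into the access layer would look roughly like the following; this is a hypothetical helper, not the bzrlib _access API:

def add_raw_record_chunks(out_file, chunks):
    # Hypothetical chunk-aware writer: no joined copy is ever built.
    start = out_file.tell()
    out_file.writelines(chunks)
    length = sum(map(len, chunks))
    return start, length

# flush() could then pass self._compressor.flush().to_chunks()[1]
# straight through instead of ''.join(chunks).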
=== modified file 'bzrlib/tests/per_lock/test_lock.py'
--- a/bzrlib/tests/per_lock/test_lock.py 2010-09-23 16:37:27 +0000
+++ b/bzrlib/tests/per_lock/test_lock.py 2010-09-29 20:01:27 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2007 Canonical Ltd
+# Copyright (C) 2007, 2009, 2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py 2010-08-05 16:27:35 +0000
+++ b/bzrlib/tests/test_groupcompress.py 2010-09-21 19:30:33 +0000
@@ -347,6 +347,30 @@
self.assertEqual(z_content, block._z_content)
self.assertEqual(content, block._content)
+ def test_to_chunks(self):
+ content_chunks = ['this is some content\n',
+ 'this content will be compressed\n']
+ content_len = sum(map(len, content_chunks))
+ content = ''.join(content_chunks)
+ gcb = groupcompress.GroupCompressBlock()
+ gcb.set_chunked_content(content_chunks, content_len)
+ total_len, block_chunks = gcb.to_chunks()
+ block_bytes = ''.join(block_chunks)
+ self.assertEqual(gcb._z_content_length, len(gcb._z_content))
+ self.assertEqual(total_len, len(block_bytes))
+ self.assertEqual(gcb._content_length, content_len)
+ expected_header =('gcb1z\n' # group compress block v1 zlib
+ '%d\n' # Length of compressed content
+ '%d\n' # Length of uncompressed content
+ ) % (gcb._z_content_length, gcb._content_length)
+ # The first chunk should be the header chunk. It is small, fixed size,
+ # and there is no compelling reason to split it up
+ self.assertEqual(expected_header, block_chunks[0])
+ self.assertStartsWith(block_bytes, expected_header)
+ remaining_bytes = block_bytes[len(expected_header):]
+ raw_bytes = zlib.decompress(remaining_bytes)
+ self.assertEqual(content, raw_bytes)
+
def test_to_bytes(self):
content = ('this is some content\n'
'this content will be compressed\n')
@@ -389,7 +413,7 @@
z_content = zlib.compress(content)
self.assertEqual(57182, len(z_content))
block = groupcompress.GroupCompressBlock()
- block._z_content = z_content
+ block._z_content_chunks = (z_content,)
block._z_content_length = len(z_content)
block._compressor_name = 'zlib'
block._content_length = 158634
@@ -434,7 +458,7 @@
z_content = zlib.compress(content)
self.assertEqual(57182, len(z_content))
block = groupcompress.GroupCompressBlock()
- block._z_content = z_content
+ block._z_content_chunks = (z_content,)
block._z_content_length = len(z_content)
block._compressor_name = 'zlib'
block._content_length = 158634