Rev 5870: (jameinel) Add bzr.groupcompress.max_bytes_to_index to limit peak memory in file:///home/pqm/archives/thelove/bzr/%2Btrunk/
Canonical.com Patch Queue Manager
pqm at pqm.ubuntu.com
Mon May 16 17:33:32 UTC 2011
At file:///home/pqm/archives/thelove/bzr/%2Btrunk/
------------------------------------------------------------
revno: 5870 [merge]
revision-id: pqm at pqm.ubuntu.com-20110516173327-5ehst0ttceohsf5w
parent: pqm at pqm.ubuntu.com-20110516164050-frzpzaz8l2v33b6k
parent: john at arbash-meinel.com-20110516142654-4ji514qg2pvakd3n
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Mon 2011-05-16 17:33:27 +0000
message:
(jameinel) Add bzr.groupcompress.max_bytes_to_index to limit peak memory
when delta-compressing large files (bug #602614) (John A Meinel)
modified:
bzrlib/_groupcompress_pyx.pyx _groupcompress_c.pyx-20080724041824-yelg6ii7c7zxt4z0-1
bzrlib/delta.h delta.h-20090227173129-qsu3u43vowf1q3ay-1
bzrlib/diff-delta.c diffdelta.c-20090226042143-l9wzxynyuxnb5hus-1
bzrlib/groupcompress.py groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
bzrlib/tests/test__groupcompress.py test__groupcompress_-20080724145854-koifwb7749cfzrvj-1
bzrlib/tests/test_groupcompress.py test_groupcompress.p-20080705181503-ccbxd6xuy1bdnrpu-13
doc/en/release-notes/bzr-2.4.txt bzr2.4.txt-20110114053217-k7ym9jfz243fddjm-1
=== modified file 'bzrlib/_groupcompress_pyx.pyx'
--- a/bzrlib/_groupcompress_pyx.pyx 2011-03-22 15:58:09 +0000
+++ b/bzrlib/_groupcompress_pyx.pyx 2011-05-16 14:26:54 +0000
@@ -56,7 +56,8 @@
DELTA_SIZE_TOO_BIG
delta_result create_delta_index(source_info *src,
delta_index *old,
- delta_index **fresh) nogil
+ delta_index **fresh,
+ int max_entries) nogil
delta_result create_delta_index_from_delta(source_info *delta,
delta_index *old,
delta_index **fresh) nogil
@@ -70,6 +71,10 @@
unsigned char *top) nogil
unsigned long sizeof_delta_index(delta_index *index)
Py_ssize_t DELTA_SIZE_MIN
+ int get_hash_offset(delta_index *index, int pos, unsigned int *hash_offset)
+ int get_entry_summary(delta_index *index, int pos,
+ unsigned int *global_offset, unsigned int *hash_val)
+ unsigned int rabin_hash (unsigned char *data)
cdef void *safe_malloc(size_t count) except NULL:
@@ -113,6 +118,15 @@
return AssertionError("Unrecognised delta result code: %d" % result)
+def _rabin_hash(content):
+ if not PyString_CheckExact(content):
+ raise ValueError('content must be a string')
+ if len(content) < 16:
+ raise ValueError('content must be at least 16 bytes long')
+ # Try to cast it to an int, if it can fit
+ return int(rabin_hash(<unsigned char*>(PyString_AS_STRING(content))))
+
+
cdef class DeltaIndex:
# We need Pyrex 0.9.8+ to understand a 'list' definition, and this object
@@ -123,14 +137,18 @@
cdef delta_index *_index
cdef public unsigned long _source_offset
cdef readonly unsigned int _max_num_sources
+ cdef public int _max_bytes_to_index
- def __init__(self, source=None):
+ def __init__(self, source=None, max_bytes_to_index=None):
self._sources = []
self._index = NULL
self._max_num_sources = 65000
self._source_infos = <source_info *>safe_malloc(sizeof(source_info)
* self._max_num_sources)
self._source_offset = 0
+ self._max_bytes_to_index = 0
+ if max_bytes_to_index is not None:
+ self._max_bytes_to_index = max_bytes_to_index
if source is not None:
self.add_source(source, 0)
@@ -164,6 +182,44 @@
def _has_index(self):
return (self._index != NULL)
+ def _dump_index(self):
+ """Dump the pointers in the index.
+
+ This is an arbitrary layout, used for testing. It is not meant to be
+ used in production code.
+
+ :return: (hash_list, entry_list)
+ hash_list A list of offsets, so hash[i] points to the 'hash
+ bucket' starting at the given offset and going until
+ hash[i+1]
+ entry_list A list of (text_offset, hash_val). text_offset is the
+ offset in the "source" texts, and hash_val is the RABIN
+ hash for that offset.
+ Note that the entry should be in the hash bucket
+ defined by hash[(hash_val & mask)] and
+ hash[(hash_val & mask) + 1]
+ """
+ cdef int pos
+ cdef unsigned int text_offset
+ cdef unsigned int hash_val
+ cdef unsigned int hash_offset
+ if self._index == NULL:
+ return None
+ hash_list = []
+ pos = 0
+ while get_hash_offset(self._index, pos, &hash_offset):
+ hash_list.append(int(hash_offset))
+ pos += 1
+ entry_list = []
+ pos = 0
+ while get_entry_summary(self._index, pos, &text_offset, &hash_val):
+ # Map back using 'int' so that we don't get Long everywhere, when
+ # almost everything is <2**31.
+ val = tuple(map(int, [text_offset, hash_val]))
+ entry_list.append(val)
+ pos += 1
+ return hash_list, entry_list
+
def add_delta_source(self, delta, unadded_bytes):
"""Add a new delta to the source texts.
@@ -207,6 +263,10 @@
:param source: The text in question, this must be a byte string
:param unadded_bytes: Assume there are this many bytes that didn't get
added between this source and the end of the previous source.
+ Note: the number of entries added is limited by the
+ max_bytes_to_index value given to the constructor. By default, we
+ sample every 16 bytes; if that would require more than
+ max_bytes_to_index / 16 entries, we reduce the sampling rate.
+ A value of 0 means unlimited.
"""
cdef char *c_source
cdef Py_ssize_t c_source_size
@@ -215,6 +275,7 @@
cdef unsigned int source_location
cdef source_info *src
cdef unsigned int num_indexes
+ cdef int max_num_entries
if not PyString_CheckExact(source):
raise TypeError('source is not a str')
@@ -237,7 +298,8 @@
# We delay creating the index on the first insert
if source_location != 0:
with nogil:
- res = create_delta_index(src, self._index, &index)
+ res = create_delta_index(src, self._index, &index,
+ self._max_bytes_to_index)
if res != DELTA_OK:
raise _translate_delta_failure(res)
if index != self._index:
@@ -254,7 +316,8 @@
# We know that self._index is already NULL, so create_delta_index
# will always create a new index unless there's a malloc failure
with nogil:
- res = create_delta_index(&self._source_infos[0], NULL, &index)
+ res = create_delta_index(&self._source_infos[0], NULL, &index,
+ self._max_bytes_to_index)
if res != DELTA_OK:
raise _translate_delta_failure(res)
self._index = index
=== modified file 'bzrlib/delta.h'
--- a/bzrlib/delta.h 2011-03-22 00:33:21 +0000
+++ b/bzrlib/delta.h 2011-05-16 14:26:54 +0000
@@ -21,7 +21,6 @@
aggregate source */
};
-
/* result type for functions that have multiple failure modes */
typedef enum {
DELTA_OK, /* Success */
@@ -42,11 +41,17 @@
* free_delta_index(). Other values are a failure, and *fresh is unset.
* The given buffer must not be freed nor altered before free_delta_index() is
* called. The resultant struct must be freed using free_delta_index().
+ *
+ * :param max_bytes_to_index: Limit the number of regions to sample to this
+ * amount of text. We will store at most max_bytes_to_index / RABIN_WINDOW
+ * pointers into the source text. Useful if src can be unbounded in size,
+ * and you are willing to trade match accuracy for peak memory.
*/
extern delta_result
create_delta_index(const struct source_info *src,
struct delta_index *old,
- struct delta_index **fresh);
+ struct delta_index **fresh,
+ int max_bytes_to_index);
/*
@@ -118,4 +123,32 @@
return size;
}
+/*
+ * Return the basic information about a given delta index.
+ * :param index: The delta_index object
+ * :param pos: The offset in the entry list. Start at 0, and walk until you get
+ * 0 as a return code.
+ * :param text_offset: return value, distance to the beginning of all sources
+ * :param hash_val: return value, the RABIN hash associated with this pointer
+ * :return: 1 if there was an entry at this position (data was produced),
+ * 0 once pos walks past the end of the entry list
+ */
+extern int
+get_entry_summary(const struct delta_index *index, int pos,
+ unsigned int *text_offset, unsigned int *hash_val);
+
+/*
+ * Determine what entry index->hash[X] points to.
+ */
+extern int
+get_hash_offset(const struct delta_index *index, int pos,
+ unsigned int *entry_offset);
+
+/*
+ * Compute the rabin_hash of the given data, it is assumed the data is at least
+ * RABIN_WINDOW wide (16 bytes).
+ */
+extern unsigned int
+rabin_hash(const unsigned char *data);
+
#endif
=== modified file 'bzrlib/diff-delta.c'
--- a/bzrlib/diff-delta.c 2011-03-20 18:58:43 +0000
+++ b/bzrlib/diff-delta.c 2011-05-16 14:26:54 +0000
@@ -376,10 +376,11 @@
delta_result
create_delta_index(const struct source_info *src,
struct delta_index *old,
- struct delta_index **fresh)
+ struct delta_index **fresh,
+ int max_bytes_to_index)
{
unsigned int i, hsize, hmask, num_entries, prev_val, *hash_count;
- unsigned int total_num_entries;
+ unsigned int total_num_entries, stride, max_entries;
const unsigned char *data, *buffer;
struct delta_index *index;
struct unpacked_index_entry *entry, **hash;
@@ -391,9 +392,20 @@
buffer = src->buf;
/* Determine index hash size. Note that indexing skips the
- first byte to allow for optimizing the Rabin's polynomial
- initialization in create_delta(). */
+ first byte so we subtract 1 to get the edge cases right.
+ */
+ stride = RABIN_WINDOW;
num_entries = (src->size - 1) / RABIN_WINDOW;
+ if (max_bytes_to_index > 0) {
+ max_entries = (unsigned int) (max_bytes_to_index / RABIN_WINDOW);
+ if (num_entries > max_entries) {
+ /* Limit the max number of matching entries. This reduces the 'best'
+ * possible match, but means we don't consume all of ram.
+ */
+ num_entries = max_entries;
+ stride = (src->size - 1) / num_entries;
+ }
+ }
if (old != NULL)
total_num_entries = num_entries + old->num_entries;
else
@@ -428,9 +440,9 @@
/* then populate the index for the new data */
prev_val = ~0;
- for (data = buffer + num_entries * RABIN_WINDOW - RABIN_WINDOW;
+ for (data = buffer + num_entries * stride - RABIN_WINDOW;
data >= buffer;
- data -= RABIN_WINDOW) {
+ data -= stride) {
unsigned int val = 0;
for (i = 1; i <= RABIN_WINDOW; i++)
val = ((val << 8) | data[i]) ^ T[val >> RABIN_SHIFT];
@@ -476,7 +488,7 @@
unsigned int hsize)
{
unsigned int hash_offset, hmask, memsize;
- struct index_entry *entry, *last_entry;
+ struct index_entry *entry;
struct index_entry_linked_list *out_entry, **hash;
void *mem;
@@ -496,7 +508,6 @@
/* We know that entries are in the order we want in the output, but they
* aren't "grouped" by hash bucket yet.
*/
- last_entry = entries + num_entries;
for (entry = entries + num_entries - 1; entry >= entries; --entry) {
hash_offset = entry->val & hmask;
out_entry->p_entry = entry;
@@ -521,7 +532,7 @@
unsigned int i, j, hsize, hmask, total_num_entries;
struct delta_index *index;
struct index_entry *entry, *packed_entry, **packed_hash;
- struct index_entry *last_entry, null_entry = {0};
+ struct index_entry null_entry = {0};
void *mem;
unsigned long memsize;
struct index_entry_linked_list *unpacked_entry, **mini_hash;
@@ -572,7 +583,6 @@
free(index);
return NULL;
}
- last_entry = entries + num_entries;
for (i = 0; i < hsize; i++) {
/*
* Coalesce all entries belonging in one hash bucket
@@ -1100,5 +1110,74 @@
return DELTA_OK;
}
+
+int
+get_entry_summary(const struct delta_index *index, int pos,
+ unsigned int *text_offset, unsigned int *hash_val)
+{
+ int hsize;
+ const struct index_entry *entry;
+ const struct index_entry *start_of_entries;
+ unsigned int offset;
+ if (pos < 0 || text_offset == NULL || hash_val == NULL
+ || index == NULL)
+ {
+ return 0;
+ }
+ hsize = index->hash_mask + 1;
+ start_of_entries = (struct index_entry *)(((struct index_entry **)index->hash) + (hsize + 1));
+ entry = start_of_entries + pos;
+ if (entry > index->last_entry) {
+ return 0;
+ }
+ if (entry->ptr == NULL) {
+ *text_offset = 0;
+ *hash_val = 0;
+ } else {
+ offset = entry->src->agg_offset;
+ offset += (entry->ptr - ((unsigned char *)entry->src->buf));
+ *text_offset = offset;
+ *hash_val = entry->val;
+ }
+ return 1;
+}
+
+
+int
+get_hash_offset(const struct delta_index *index, int pos,
+ unsigned int *entry_offset)
+{
+ int hsize;
+ const struct index_entry *entry;
+ const struct index_entry *start_of_entries;
+ if (pos < 0 || index == NULL || entry_offset == NULL)
+ {
+ return 0;
+ }
+ hsize = index->hash_mask + 1;
+ start_of_entries = (struct index_entry *)(((struct index_entry **)index->hash) + (hsize + 1));
+ if (pos >= hsize) {
+ return 0;
+ }
+ entry = index->hash[pos];
+ if (entry == NULL) {
+ *entry_offset = -1;
+ } else {
+ *entry_offset = (entry - start_of_entries);
+ }
+ return 1;
+}
+
+
+unsigned int
+rabin_hash(const unsigned char *data)
+{
+ int i;
+ unsigned int val = 0;
+ for (i = 0; i < RABIN_WINDOW; i++)
+ val = ((val << 8) | data[i]) ^ T[val >> RABIN_SHIFT];
+ return val;
+}
+
/* vim: et ts=4 sw=4 sts=4
*/
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py 2011-05-16 10:08:01 +0000
+++ b/bzrlib/groupcompress.py 2011-05-16 17:33:27 +0000
@@ -27,6 +27,7 @@
lazy_import(globals(), """
from bzrlib import (
annotate,
+ config,
debug,
errors,
graph as _mod_graph,
@@ -490,11 +491,25 @@
_full_enough_block_size = 3*1024*1024 # size at which we won't repack
_full_enough_mixed_block_size = 2*768*1024 # 1.5MB
- def __init__(self, block):
+ def __init__(self, block, get_compressor_settings=None):
self._block = block
# We need to preserve the ordering
self._factories = []
self._last_byte = 0
+ self._get_settings = get_compressor_settings
+ self._compressor_settings = None
+
+ def _get_compressor_settings(self):
+ if self._compressor_settings is not None:
+ return self._compressor_settings
+ settings = None
+ if self._get_settings is not None:
+ settings = self._get_settings()
+ if settings is None:
+ vf = GroupCompressVersionedFiles
+ settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+ self._compressor_settings = settings
+ return self._compressor_settings
def add_factory(self, key, parents, start, end):
if not self._factories:
@@ -533,9 +548,12 @@
new_block.set_content(self._block._content[:last_byte])
self._block = new_block
+ def _make_group_compressor(self):
+ return GroupCompressor(self._get_compressor_settings())
+
def _rebuild_block(self):
"""Create a new GroupCompressBlock with only the referenced texts."""
- compressor = GroupCompressor()
+ compressor = self._make_group_compressor()
tstart = time.time()
old_length = self._block._content_length
end_point = 0
@@ -553,6 +571,11 @@
# block? It seems hard to come up with a method that it would
# expand, since we do full compression again. Perhaps based on a
# request that ends up poorly ordered?
+ # TODO: If the content would have expanded, then we would want to
+ # handle a case where we need to split the block.
+ # Now that we have a user-tweakable option
+ # (max_bytes_to_index), it is possible that one person set it
+ # to a very low value, causing poor compression.
delta = time.time() - tstart
self._block = new_block
trace.mutter('creating new compressed block on-the-fly in %.3fs'
@@ -781,7 +804,7 @@
class _CommonGroupCompressor(object):
- def __init__(self):
+ def __init__(self, settings=None):
"""Create a GroupCompressor."""
self.chunks = []
self._last = None
@@ -790,6 +813,10 @@
self.labels_deltas = {}
self._delta_index = None # Set by the children
self._block = GroupCompressBlock()
+ if settings is None:
+ self._settings = {}
+ else:
+ self._settings = settings
def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
"""Compress lines with label key.
@@ -910,12 +937,12 @@
class PythonGroupCompressor(_CommonGroupCompressor):
- def __init__(self):
+ def __init__(self, settings=None):
"""Create a GroupCompressor.
Used only if the pyrex version is not available.
"""
- super(PythonGroupCompressor, self).__init__()
+ super(PythonGroupCompressor, self).__init__(settings)
self._delta_index = LinesDeltaIndex([])
# The actual content is managed by LinesDeltaIndex
self.chunks = self._delta_index.lines
@@ -969,9 +996,10 @@
left side.
"""
- def __init__(self):
- super(PyrexGroupCompressor, self).__init__()
- self._delta_index = DeltaIndex()
+ def __init__(self, settings=None):
+ super(PyrexGroupCompressor, self).__init__(settings)
+ max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+ self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
def _compress(self, key, bytes, max_delta_size, soft=False):
"""see _CommonGroupCompressor._compress"""
@@ -1068,12 +1096,12 @@
class _BatchingBlockFetcher(object):
"""Fetch group compress blocks in batches.
-
+
:ivar total_bytes: int of expected number of bytes needed to fetch the
currently pending batch.
"""
- def __init__(self, gcvf, locations):
+ def __init__(self, gcvf, locations, get_compressor_settings=None):
self.gcvf = gcvf
self.locations = locations
self.keys = []
@@ -1082,10 +1110,11 @@
self.total_bytes = 0
self.last_read_memo = None
self.manager = None
+ self._get_compressor_settings = get_compressor_settings
def add_key(self, key):
"""Add another to key to fetch.
-
+
:return: The estimated number of bytes needed to fetch the batch so
far.
"""
@@ -1116,7 +1145,7 @@
# and then.
self.batch_memos[read_memo] = cached_block
return self.total_bytes
-
+
def _flush_manager(self):
if self.manager is not None:
for factory in self.manager.get_record_stream():
@@ -1127,7 +1156,7 @@
def yield_factories(self, full_flush=False):
"""Yield factories for keys added since the last yield. They will be
returned in the order they were added via add_key.
-
+
:param full_flush: by default, some results may not be returned in case
they can be part of the next batch. If full_flush is True, then
all results are returned.
@@ -1161,7 +1190,8 @@
memos_to_get_stack.pop()
else:
block = self.batch_memos[read_memo]
- self.manager = _LazyGroupContentManager(block)
+ self.manager = _LazyGroupContentManager(block,
+ get_compressor_settings=self._get_compressor_settings)
self.last_read_memo = read_memo
start, end = index_memo[3:5]
self.manager.add_factory(key, parents, start, end)
@@ -1177,8 +1207,20 @@
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
"""A group-compress based VersionedFiles implementation."""
+ # This controls how the GroupCompress DeltaIndex works. Basically, we
+ # compute hash pointers into the source blocks (so hash(text) => text).
+ # However each of these references costs some memory in trade against a
+ # more accurate match result. For very large files, they either are
+ # pre-compressed and change in bulk whenever they change, or change in just
+ # local blocks. Either way, 'improved resolution' is not very helpful,
+ # versus running out of memory trying to track everything. The default max
+ # gives 100% sampling of a 1MB file.
+ _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+ _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+ _DEFAULT_MAX_BYTES_TO_INDEX}
+
def __init__(self, index, access, delta=True, _unadded_refs=None,
- _group_cache=None):
+ _group_cache=None):
"""Create a GroupCompressVersionedFiles object.
:param index: The index object storing access and graph data.
@@ -1197,6 +1239,7 @@
_group_cache = LRUSizeCache(max_size=50*1024*1024)
self._group_cache = _group_cache
self._immediate_fallback_vfs = []
+ self._max_bytes_to_index = None
def without_fallbacks(self):
"""Return a clone of this object without any fallbacks configured."""
@@ -1560,7 +1603,8 @@
# - we encounter an unadded ref, or
# - we run out of keys, or
# - the total bytes to retrieve for this batch > BATCH_SIZE
- batcher = _BatchingBlockFetcher(self, locations)
+ batcher = _BatchingBlockFetcher(self, locations,
+ get_compressor_settings=self._get_compressor_settings)
for source, keys in source_keys:
if source is self:
for key in keys:
@@ -1612,6 +1656,30 @@
for _ in self._insert_record_stream(stream, random_id=False):
pass
+ def _get_compressor_settings(self):
+ if self._max_bytes_to_index is None:
+ # TODO: VersionedFiles don't know about their containing
+ # repository, so they don't have much of an idea about their
+ # location. So for now, this is only a global option.
+ c = config.GlobalConfig()
+ val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+ if val is not None:
+ try:
+ val = int(val)
+ except ValueError, e:
+ trace.warning('Value for '
+ '"bzr.groupcompress.max_bytes_to_index"'
+ ' %r is not an integer'
+ % (val,))
+ val = None
+ if val is None:
+ val = self._DEFAULT_MAX_BYTES_TO_INDEX
+ self._max_bytes_to_index = val
+ return {'max_bytes_to_index': self._max_bytes_to_index}
+
+ def _make_group_compressor(self):
+ return GroupCompressor(self._get_compressor_settings())
+
def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
reuse_blocks=True):
"""Internal core to insert a record stream into this container.
@@ -1640,12 +1708,12 @@
return adapter
# This will go up to fulltexts for gc to gc fetching, which isn't
# ideal.
- self._compressor = GroupCompressor()
+ self._compressor = self._make_group_compressor()
self._unadded_refs = {}
keys_to_add = []
def flush():
bytes_len, chunks = self._compressor.flush().to_chunks()
- self._compressor = GroupCompressor()
+ self._compressor = self._make_group_compressor()
# Note: At this point we still have 1 copy of the fulltext (in
# record and the var 'bytes'), and this generates 2 copies of
# the compressed text (one for bytes, one in chunks)
=== modified file 'bzrlib/tests/test__groupcompress.py'
--- a/bzrlib/tests/test__groupcompress.py 2011-01-12 01:01:53 +0000
+++ b/bzrlib/tests/test__groupcompress.py 2011-05-14 10:28:59 +0000
@@ -264,6 +264,63 @@
di = self._gc_module.DeltaIndex('test text\n')
self.assertEqual('DeltaIndex(1, 10)', repr(di))
+ def test__dump_no_index(self):
+ di = self._gc_module.DeltaIndex()
+ self.assertEqual(None, di._dump_index())
+
+ def test__dump_index_simple(self):
+ di = self._gc_module.DeltaIndex()
+ di.add_source(_text1, 0)
+ self.assertFalse(di._has_index())
+ self.assertEqual(None, di._dump_index())
+ _ = di.make_delta(_text1)
+ self.assertTrue(di._has_index())
+ hash_list, entry_list = di._dump_index()
+ self.assertEqual(16, len(hash_list))
+ self.assertEqual(68, len(entry_list))
+ just_entries = [(idx, text_offset, hash_val)
+ for idx, (text_offset, hash_val)
+ in enumerate(entry_list)
+ if text_offset != 0 or hash_val != 0]
+ rabin_hash = self._gc_module._rabin_hash
+ self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
+ (25, 48, rabin_hash(_text1[33:49])),
+ (34, 32, rabin_hash(_text1[17:33])),
+ (47, 64, rabin_hash(_text1[49:65])),
+ ], just_entries)
+ # This ensures that the hash map points to the location we expect it to
+ for entry_idx, text_offset, hash_val in just_entries:
+ self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
+
+ def test__dump_index_two_sources(self):
+ di = self._gc_module.DeltaIndex()
+ di.add_source(_text1, 0)
+ di.add_source(_text2, 2)
+ start2 = len(_text1) + 2
+ self.assertTrue(di._has_index())
+ hash_list, entry_list = di._dump_index()
+ self.assertEqual(16, len(hash_list))
+ self.assertEqual(68, len(entry_list))
+ just_entries = [(idx, text_offset, hash_val)
+ for idx, (text_offset, hash_val)
+ in enumerate(entry_list)
+ if text_offset != 0 or hash_val != 0]
+ rabin_hash = self._gc_module._rabin_hash
+ self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
+ (9, start2+16, rabin_hash(_text2[1:17])),
+ (25, 48, rabin_hash(_text1[33:49])),
+ (30, start2+64, rabin_hash(_text2[49:65])),
+ (34, 32, rabin_hash(_text1[17:33])),
+ (35, start2+32, rabin_hash(_text2[17:33])),
+ (43, start2+48, rabin_hash(_text2[33:49])),
+ (47, 64, rabin_hash(_text1[49:65])),
+ ], just_entries)
+ # Each entry should be in the appropriate hash bucket.
+ for entry_idx, text_offset, hash_val in just_entries:
+ hash_idx = hash_val & 0xf
+ self.assertTrue(
+ hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
+
def test_first_add_source_doesnt_index_until_make_delta(self):
di = self._gc_module.DeltaIndex()
self.assertFalse(di._has_index())
@@ -275,6 +332,27 @@
self.assertTrue(di._has_index())
self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
+ def test_add_source_max_bytes_to_index(self):
+ di = self._gc_module.DeltaIndex()
+ di._max_bytes_to_index = 3*16
+ di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
+ di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
+ start2 = len(_text1) + 3
+ hash_list, entry_list = di._dump_index()
+ self.assertEqual(16, len(hash_list))
+ self.assertEqual(67, len(entry_list))
+ just_entries = sorted([(text_offset, hash_val)
+ for text_offset, hash_val in entry_list
+ if text_offset != 0 or hash_val != 0])
+ rabin_hash = self._gc_module._rabin_hash
+ self.assertEqual([(25, rabin_hash(_text1[10:26])),
+ (50, rabin_hash(_text1[35:51])),
+ (75, rabin_hash(_text1[60:76])),
+ (start2+44, rabin_hash(_text3[29:45])),
+ (start2+88, rabin_hash(_text3[73:89])),
+ (start2+132, rabin_hash(_text3[117:133])),
+ ], just_entries)
+
def test_second_add_source_triggers_make_index(self):
di = self._gc_module.DeltaIndex()
self.assertFalse(di._has_index())
=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py 2011-01-10 22:20:12 +0000
+++ b/bzrlib/tests/test_groupcompress.py 2011-05-14 10:33:28 +0000
@@ -20,6 +20,7 @@
from bzrlib import (
btree_index,
+ config,
groupcompress,
errors,
index as _mod_index,
@@ -552,6 +553,23 @@
'as-requested', False)]
self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
+ def test_get_record_stream_max_bytes_to_index_default(self):
+ vf = self.make_test_vf(True, dir='source')
+ vf.add_lines(('a',), (), ['lines\n'])
+ vf.writer.end()
+ record = vf.get_record_stream([('a',)], 'unordered', True).next()
+ self.assertEqual(vf._DEFAULT_COMPRESSOR_SETTINGS,
+ record._manager._get_compressor_settings())
+
+ def test_get_record_stream_accesses_compressor_settings(self):
+ vf = self.make_test_vf(True, dir='source')
+ vf.add_lines(('a',), (), ['lines\n'])
+ vf.writer.end()
+ vf._max_bytes_to_index = 1234
+ record = vf.get_record_stream([('a',)], 'unordered', True).next()
+ self.assertEqual(dict(max_bytes_to_index=1234),
+ record._manager._get_compressor_settings())
+
def test_insert_record_stream_reuses_blocks(self):
vf = self.make_test_vf(True, dir='source')
def grouped_stream(revision_ids, first_parents=()):
@@ -770,6 +788,48 @@
self.assertEqual(0, len(vf._group_cache))
+class TestGroupCompressConfig(tests.TestCaseWithTransport):
+
+ def make_test_vf(self):
+ t = self.get_transport('.')
+ t.ensure_base()
+ factory = groupcompress.make_pack_factory(graph=True,
+ delta=False, keylength=1, inconsistency_fatal=True)
+ vf = factory(t)
+ self.addCleanup(groupcompress.cleanup_pack_group, vf)
+ return vf
+
+ def test_max_bytes_to_index_default(self):
+ vf = self.make_test_vf()
+ gc = vf._make_group_compressor()
+ self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
+ vf._max_bytes_to_index)
+ if isinstance(gc, groupcompress.PyrexGroupCompressor):
+ self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
+ gc._delta_index._max_bytes_to_index)
+
+ def test_max_bytes_to_index_in_config(self):
+ c = config.GlobalConfig()
+ c.set_user_option('bzr.groupcompress.max_bytes_to_index', '10000')
+ vf = self.make_test_vf()
+ gc = vf._make_group_compressor()
+ self.assertEqual(10000, vf._max_bytes_to_index)
+ if isinstance(gc, groupcompress.PyrexGroupCompressor):
+ self.assertEqual(10000, gc._delta_index._max_bytes_to_index)
+
+ def test_max_bytes_to_index_bad_config(self):
+ c = config.GlobalConfig()
+ c.set_user_option('bzr.groupcompress.max_bytes_to_index', 'boogah')
+ vf = self.make_test_vf()
+ # TODO: This is triggering a warning, we might want to trap and make
+ # sure it is readable.
+ gc = vf._make_group_compressor()
+ self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
+ vf._max_bytes_to_index)
+ if isinstance(gc, groupcompress.PyrexGroupCompressor):
+ self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
+ gc._delta_index._max_bytes_to_index)
+
class StubGCVF(object):
def __init__(self, canned_get_blocks=None):
@@ -1046,6 +1106,54 @@
self.assertEqual(self._texts[record.key],
record.get_bytes_as('fulltext'))
+ def test_manager_default_compressor_settings(self):
+ locations, old_block = self.make_block(self._texts)
+ manager = groupcompress._LazyGroupContentManager(old_block)
+ gcvf = groupcompress.GroupCompressVersionedFiles
+ # It doesn't greedily evaluate _max_bytes_to_index
+ self.assertIs(None, manager._compressor_settings)
+ self.assertEqual(gcvf._DEFAULT_COMPRESSOR_SETTINGS,
+ manager._get_compressor_settings())
+
+ def test_manager_custom_compressor_settings(self):
+ locations, old_block = self.make_block(self._texts)
+ called = []
+ def compressor_settings():
+ called.append('called')
+ return (10,)
+ manager = groupcompress._LazyGroupContentManager(old_block,
+ get_compressor_settings=compressor_settings)
+ gcvf = groupcompress.GroupCompressVersionedFiles
+ # It doesn't greedily evaluate compressor_settings
+ self.assertIs(None, manager._compressor_settings)
+ self.assertEqual((10,), manager._get_compressor_settings())
+ self.assertEqual((10,), manager._get_compressor_settings())
+ self.assertEqual((10,), manager._compressor_settings)
+ # Only called 1 time
+ self.assertEqual(['called'], called)
+
+ def test__rebuild_handles_compressor_settings(self):
+ if not isinstance(groupcompress.GroupCompressor,
+ groupcompress.PyrexGroupCompressor):
+ raise tests.TestNotApplicable('pure-python compressor'
+ ' does not handle compressor_settings')
+ locations, old_block = self.make_block(self._texts)
+ manager = groupcompress._LazyGroupContentManager(old_block,
+ get_compressor_settings=lambda: dict(max_bytes_to_index=32))
+ gc = manager._make_group_compressor()
+ self.assertEqual(32, gc._delta_index._max_bytes_to_index)
+ self.add_key_to_manager(('key3',), locations, old_block, manager)
+ self.add_key_to_manager(('key4',), locations, old_block, manager)
+ action, last_byte, total_bytes = manager._check_rebuild_action()
+ self.assertEqual('rebuild', action)
+ manager._rebuild_block()
+ new_block = manager._block
+ self.assertIsNot(old_block, new_block)
+ # Because of the new max_bytes_to_index, we do a poor job of
+ # rebuilding. This is a side-effect of the change, but at least it does
+ # show the setting had an effect.
+ self.assertTrue(old_block._content_length < new_block._content_length)
+
def test_check_is_well_utilized_all_keys(self):
block, manager = self.make_block_and_full_manager(self._texts)
self.assertFalse(manager.check_is_well_utilized())
=== modified file 'doc/en/release-notes/bzr-2.4.txt'
--- a/doc/en/release-notes/bzr-2.4.txt 2011-05-16 11:17:40 +0000
+++ b/doc/en/release-notes/bzr-2.4.txt 2011-05-16 17:33:27 +0000
@@ -24,6 +24,15 @@
.. New commands, options, etc that users may wish to try out.
+* The text compressor used for 2a repositories now has a tweakable
+ parameter that can be set in bazaar.conf.
+ ``bzr.groupcompress.max_bytes_to_index`` (default 1048576, i.e. 1MB).
+ When doing compression, we build up an index of locations to match
+ against. Setting this higher will result in slightly better compression,
+ at a cost of more memory. The default fully samples a 1MB file (one
+ index entry every 16 bytes), so this only has an effect when
+ compressing texts larger than the configured value.
+ (John Arbash Meinel, #602614)
+
Improvements
************
More information about the bazaar-commits
mailing list