Rev 5870: (jameinel) Add bzr.groupcompress.max_bytes_to_index to limit peak memory in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Mon May 16 17:33:32 UTC 2011


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 5870 [merge]
revision-id: pqm at pqm.ubuntu.com-20110516173327-5ehst0ttceohsf5w
parent: pqm at pqm.ubuntu.com-20110516164050-frzpzaz8l2v33b6k
parent: john at arbash-meinel.com-20110516142654-4ji514qg2pvakd3n
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Mon 2011-05-16 17:33:27 +0000
message:
  (jameinel) Add bzr.groupcompress.max_bytes_to_index to limit peak memory
   when delta-compressing large files (bug #602614) (John A Meinel)
modified:
  bzrlib/_groupcompress_pyx.pyx  _groupcompress_c.pyx-20080724041824-yelg6ii7c7zxt4z0-1
  bzrlib/delta.h                 delta.h-20090227173129-qsu3u43vowf1q3ay-1
  bzrlib/diff-delta.c            diffdelta.c-20090226042143-l9wzxynyuxnb5hus-1
  bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
  bzrlib/tests/test__groupcompress.py test__groupcompress_-20080724145854-koifwb7749cfzrvj-1
  bzrlib/tests/test_groupcompress.py test_groupcompress.p-20080705181503-ccbxd6xuy1bdnrpu-13
  doc/en/release-notes/bzr-2.4.txt bzr2.4.txt-20110114053217-k7ym9jfz243fddjm-1
=== modified file 'bzrlib/_groupcompress_pyx.pyx'
--- a/bzrlib/_groupcompress_pyx.pyx	2011-03-22 15:58:09 +0000
+++ b/bzrlib/_groupcompress_pyx.pyx	2011-05-16 14:26:54 +0000
@@ -56,7 +56,8 @@
         DELTA_SIZE_TOO_BIG
     delta_result create_delta_index(source_info *src,
                                     delta_index *old,
-                                    delta_index **fresh) nogil
+                                    delta_index **fresh,
+                                    int max_entries) nogil
     delta_result create_delta_index_from_delta(source_info *delta,
                                                delta_index *old,
                                                delta_index **fresh) nogil
@@ -70,6 +71,10 @@
                                      unsigned char *top) nogil
     unsigned long sizeof_delta_index(delta_index *index)
     Py_ssize_t DELTA_SIZE_MIN
+    int get_hash_offset(delta_index *index, int pos, unsigned int *hash_offset)
+    int get_entry_summary(delta_index *index, int pos,
+                          unsigned int *global_offset, unsigned int *hash_val)
+    unsigned int rabin_hash (unsigned char *data)
 
 
 cdef void *safe_malloc(size_t count) except NULL:
@@ -113,6 +118,15 @@
     return AssertionError("Unrecognised delta result code: %d" % result)
 
 
+def _rabin_hash(content):
+    if not PyString_CheckExact(content):
+        raise ValueError('content must be a string')
+    if len(content) < 16:
+        raise ValueError('content must be at least 16 bytes long')
+    # Try to cast it to an int, if it can fit
+    return int(rabin_hash(<unsigned char*>(PyString_AS_STRING(content))))
+
+
 cdef class DeltaIndex:
 
     # We need Pyrex 0.9.8+ to understand a 'list' definition, and this object
@@ -123,14 +137,18 @@
     cdef delta_index *_index
     cdef public unsigned long _source_offset
     cdef readonly unsigned int _max_num_sources
+    cdef public int _max_bytes_to_index
 
-    def __init__(self, source=None):
+    def __init__(self, source=None, max_bytes_to_index=None):
         self._sources = []
         self._index = NULL
         self._max_num_sources = 65000
         self._source_infos = <source_info *>safe_malloc(sizeof(source_info)
                                                         * self._max_num_sources)
         self._source_offset = 0
+        self._max_bytes_to_index = 0
+        if max_bytes_to_index is not None:
+            self._max_bytes_to_index = max_bytes_to_index
 
         if source is not None:
             self.add_source(source, 0)
@@ -164,6 +182,44 @@
     def _has_index(self):
         return (self._index != NULL)
 
+    def _dump_index(self):
+        """Dump the pointers in the index.
+
+        This is an arbitrary layout, used for testing. It is not meant to be
+        used in production code.
+
+        :return: (hash_list, entry_list)
+            hash_list   A list of offsets, so hash[i] points to the 'hash
+                        bucket' starting at the given offset and going until
+                        hash[i+1]
+            entry_list  A list of (text_offset, hash_val). text_offset is the
+                        offset in the "source" texts, and hash_val is the RABIN
+                        hash for that offset.
+                        Note that the entry should be in the hash bucket
+                        defined by
+                        hash[(hash_val & mask)] && hash[(hash_val & mask) + 1]
+        """
+        cdef int pos
+        cdef unsigned int text_offset
+        cdef unsigned int hash_val
+        cdef unsigned int hash_offset
+        if self._index == NULL:
+            return None
+        hash_list = []
+        pos = 0
+        while get_hash_offset(self._index, pos, &hash_offset):
+            hash_list.append(int(hash_offset))
+            pos += 1
+        entry_list = []
+        pos = 0
+        while get_entry_summary(self._index, pos, &text_offset, &hash_val):
+            # Map back using 'int' so that we don't get Long everywhere, when
+            # almost everything is <2**31.
+            val = tuple(map(int, [text_offset, hash_val]))
+            entry_list.append(val)
+            pos += 1
+        return hash_list, entry_list
+
     def add_delta_source(self, delta, unadded_bytes):
         """Add a new delta to the source texts.
 
@@ -207,6 +263,10 @@
         :param source: The text in question, this must be a byte string
         :param unadded_bytes: Assume there are this many bytes that didn't get
             added between this source and the end of the previous source.
+        :note: The number of entries added to the index is limited by
+            self._max_bytes_to_index. By default, we sample every 16 bytes;
+            if that would need more than _max_bytes_to_index / 16 entries,
+            we reduce the sampling rate. A value of 0 means unlimited.
         """
         cdef char *c_source
         cdef Py_ssize_t c_source_size
@@ -215,6 +275,7 @@
         cdef unsigned int source_location
         cdef source_info *src
         cdef unsigned int num_indexes
+        cdef int max_num_entries
 
         if not PyString_CheckExact(source):
             raise TypeError('source is not a str')
@@ -237,7 +298,8 @@
         # We delay creating the index on the first insert
         if source_location != 0:
             with nogil:
-                res = create_delta_index(src, self._index, &index)
+                res = create_delta_index(src, self._index, &index,
+                                         self._max_bytes_to_index)
             if res != DELTA_OK:
                 raise _translate_delta_failure(res)
             if index != self._index:
@@ -254,7 +316,8 @@
         # We know that self._index is already NULL, so create_delta_index
         # will always create a new index unless there's a malloc failure
         with nogil:
-            res = create_delta_index(&self._source_infos[0], NULL, &index)
+            res = create_delta_index(&self._source_infos[0], NULL, &index,
+                                     self._max_bytes_to_index)
         if res != DELTA_OK:
             raise _translate_delta_failure(res)
         self._index = index

=== modified file 'bzrlib/delta.h'
--- a/bzrlib/delta.h	2011-03-22 00:33:21 +0000
+++ b/bzrlib/delta.h	2011-05-16 14:26:54 +0000
@@ -21,7 +21,6 @@
                                  aggregate source */
 };
 
-
 /* result type for functions that have multiple failure modes */
 typedef enum {
     DELTA_OK,             /* Success */
@@ -42,11 +41,17 @@
  * free_delta_index().  Other values are a failure, and *fresh is unset.
  * The given buffer must not be freed nor altered before free_delta_index() is
  * called. The resultant struct must be freed using free_delta_index().
+ *
+ * :param max_bytes_to_index: Limit the number of regions to sample to this
+ *      amount of text. We will store at most max_bytes_to_index / RABIN_WINDOW
+ *      pointers into the source text.  Useful if src can be unbounded in size,
+ *      and you are willing to trade match accuracy for peak memory.
  */
 extern delta_result
 create_delta_index(const struct source_info *src,
                    struct delta_index *old,
-                   struct delta_index **fresh);
+                   struct delta_index **fresh,
+                   int max_bytes_to_index);
 
 
 /*
@@ -118,4 +123,32 @@
     return size;
 }
 
+/*
+ * Return the basic information about a given entry in a delta index.
+ * :param index: The delta_index object
+ * :param pos: The offset in the entry list. Start at 0, and walk until you get
+ *      0 as a return code.
+ * :param text_offset: return value, distance to the beginning of all sources
+ * :param hash_val: return value, the RABIN hash associated with this pointer
+ * :return: 1 if an entry was found at pos (the return values were filled in),
+ *      0 if pos is out of range or any pointer argument is NULL
+ */
+extern int
+get_entry_summary(const struct delta_index *index, int pos,
+                  unsigned int *text_offset, unsigned int *hash_val);
+
+/*
+ * Determine what entry index->hash[X] points to.
+ */
+extern int
+get_hash_offset(const struct delta_index *index, int pos,
+                unsigned int *entry_offset);
+
+/*
+ * Compute the rabin_hash of the given data, it is assumed the data is at least
+ * RABIN_WINDOW wide (16 bytes).
+ */
+extern unsigned int
+rabin_hash(const unsigned char *data);
+
 #endif

=== modified file 'bzrlib/diff-delta.c'
--- a/bzrlib/diff-delta.c	2011-03-20 18:58:43 +0000
+++ b/bzrlib/diff-delta.c	2011-05-16 14:26:54 +0000
@@ -376,10 +376,11 @@
 delta_result
 create_delta_index(const struct source_info *src,
                    struct delta_index *old,
-                   struct delta_index **fresh)
+                   struct delta_index **fresh,
+                   int max_bytes_to_index)
 {
     unsigned int i, hsize, hmask, num_entries, prev_val, *hash_count;
-    unsigned int total_num_entries;
+    unsigned int total_num_entries, stride, max_entries;
     const unsigned char *data, *buffer;
     struct delta_index *index;
     struct unpacked_index_entry *entry, **hash;
@@ -391,9 +392,20 @@
     buffer = src->buf;
 
     /* Determine index hash size.  Note that indexing skips the
-       first byte to allow for optimizing the Rabin's polynomial
-       initialization in create_delta(). */
+       first byte so we subtract 1 to get the edge cases right.
+     */
+    stride = RABIN_WINDOW;
     num_entries = (src->size - 1)  / RABIN_WINDOW;
+    if (max_bytes_to_index > 0) {
+        max_entries = (unsigned int) (max_bytes_to_index / RABIN_WINDOW);
+        if (num_entries > max_entries) {
+            /* Limit the max number of matching entries. This reduces the 'best'
+             * possible match, but means we don't consume all of ram.
+             */
+            num_entries = max_entries;
+            stride = (src->size - 1) / num_entries;
+        }
+    }
     if (old != NULL)
         total_num_entries = num_entries + old->num_entries;
     else
@@ -428,9 +440,9 @@
 
     /* then populate the index for the new data */
     prev_val = ~0;
-    for (data = buffer + num_entries * RABIN_WINDOW - RABIN_WINDOW;
+    for (data = buffer + num_entries * stride - RABIN_WINDOW;
          data >= buffer;
-         data -= RABIN_WINDOW) {
+         data -= stride) {
         unsigned int val = 0;
         for (i = 1; i <= RABIN_WINDOW; i++)
             val = ((val << 8) | data[i]) ^ T[val >> RABIN_SHIFT];
@@ -476,7 +488,7 @@
                        unsigned int hsize)
 {
     unsigned int hash_offset, hmask, memsize;
-    struct index_entry *entry, *last_entry;
+    struct index_entry *entry;
     struct index_entry_linked_list *out_entry, **hash;
     void *mem;
 
@@ -496,7 +508,6 @@
     /* We know that entries are in the order we want in the output, but they
      * aren't "grouped" by hash bucket yet.
      */
-    last_entry = entries + num_entries;
     for (entry = entries + num_entries - 1; entry >= entries; --entry) {
         hash_offset = entry->val & hmask;
         out_entry->p_entry = entry;
@@ -521,7 +532,7 @@
     unsigned int i, j, hsize, hmask, total_num_entries;
     struct delta_index *index;
     struct index_entry *entry, *packed_entry, **packed_hash;
-    struct index_entry *last_entry, null_entry = {0};
+    struct index_entry null_entry = {0};
     void *mem;
     unsigned long memsize;
     struct index_entry_linked_list *unpacked_entry, **mini_hash;
@@ -572,7 +583,6 @@
         free(index);
         return NULL;
     }
-    last_entry = entries + num_entries;
     for (i = 0; i < hsize; i++) {
         /*
          * Coalesce all entries belonging in one hash bucket
@@ -1100,5 +1110,74 @@
     return DELTA_OK;
 }
 
+
+int
+get_entry_summary(const struct delta_index *index, int pos,
+                  unsigned int *text_offset, unsigned int *hash_val)
+{
+    int hsize;
+    const struct index_entry *entry;
+    const struct index_entry *start_of_entries;
+    unsigned int offset;
+    if (pos < 0 || text_offset == NULL || hash_val == NULL
+        || index == NULL)
+    {
+        return 0;
+    }
+    hsize = index->hash_mask + 1;
+    start_of_entries = (struct index_entry *)(((struct index_entry **)index->hash) + (hsize + 1));
+    entry = start_of_entries + pos;
+    if (entry > index->last_entry) {
+        return 0;
+    }
+    if (entry->ptr == NULL) {
+        *text_offset = 0;
+        *hash_val = 0;
+    } else {
+        offset = entry->src->agg_offset;
+        offset += (entry->ptr - ((unsigned char *)entry->src->buf));
+        *text_offset = offset;
+        *hash_val = entry->val;
+    }
+    return 1;
+}
+
+
+int
+get_hash_offset(const struct delta_index *index, int pos,
+                unsigned int *entry_offset)
+{
+    int hsize;
+    const struct index_entry *entry;
+    const struct index_entry *start_of_entries;
+    if (pos < 0 || index == NULL || entry_offset == NULL)
+    {
+        return 0;
+    }
+    hsize = index->hash_mask + 1;
+    start_of_entries = (struct index_entry *)(((struct index_entry **)index->hash) + (hsize + 1));
+    if (pos >= hsize) {
+        return 0;
+    }
+    entry = index->hash[pos];
+    if (entry == NULL) {
+        *entry_offset = -1;
+    } else {
+        *entry_offset = (entry - start_of_entries);
+    }
+    return 1;
+}
+
+
+unsigned int
+rabin_hash(const unsigned char *data)
+{
+    int i;
+    unsigned int val = 0;
+    for (i = 0; i < RABIN_WINDOW; i++)
+        val = ((val << 8) | data[i]) ^ T[val >> RABIN_SHIFT];
+    return val;
+}
+
 /* vim: et ts=4 sw=4 sts=4
  */

=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2011-05-16 10:08:01 +0000
+++ b/bzrlib/groupcompress.py	2011-05-16 17:33:27 +0000
@@ -27,6 +27,7 @@
 lazy_import(globals(), """
 from bzrlib import (
     annotate,
+    config,
     debug,
     errors,
     graph as _mod_graph,
@@ -490,11 +491,25 @@
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
 
-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None
+
+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        settings = None
+        if self._get_settings is not None:
+            settings = self._get_settings()
+        if settings is None:
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings
 
     def add_factory(self, key, parents, start, end):
         if not self._factories:
@@ -533,9 +548,12 @@
         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block
 
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
+
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length
         end_point = 0
@@ -553,6 +571,11 @@
         #       block? It seems hard to come up with a method that it would
         #       expand, since we do full compression again. Perhaps based on a
         #       request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        #       handle a case where we need to split the block.
+        #       Now that we have a user-tweakable option
+        #       (max_bytes_to_index), it is possible that one person set it
+        #       to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
@@ -781,7 +804,7 @@
 
 class _CommonGroupCompressor(object):
 
-    def __init__(self):
+    def __init__(self, settings=None):
         """Create a GroupCompressor."""
         self.chunks = []
         self._last = None
@@ -790,6 +813,10 @@
         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+        if settings is None:
+            self._settings = {}
+        else:
+            self._settings = settings
 
     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.
@@ -910,12 +937,12 @@
 
 class PythonGroupCompressor(_CommonGroupCompressor):
 
-    def __init__(self):
+    def __init__(self, settings=None):
         """Create a GroupCompressor.
 
         Used only if the pyrex version is not available.
         """
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines
@@ -969,9 +996,10 @@
        left side.
     """
 
-    def __init__(self):
-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+    def __init__(self, settings=None):
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
 
     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
@@ -1068,12 +1096,12 @@
 
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.
-    
+
     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.
     """
 
-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations
         self.keys = []
@@ -1082,10 +1110,11 @@
         self.total_bytes = 0
         self.last_read_memo = None
         self.manager = None
+        self._get_compressor_settings = get_compressor_settings
 
     def add_key(self, key):
         """Add another to key to fetch.
-        
+
         :return: The estimated number of bytes needed to fetch the batch so
             far.
         """
@@ -1116,7 +1145,7 @@
             # and then.
             self.batch_memos[read_memo] = cached_block
         return self.total_bytes
-        
+
     def _flush_manager(self):
         if self.manager is not None:
             for factory in self.manager.get_record_stream():
@@ -1127,7 +1156,7 @@
     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield.  They will be
         returned in the order they were added via add_key.
-        
+
         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch.  If full_flush is True, then
             all results are returned.
@@ -1161,7 +1190,8 @@
                     memos_to_get_stack.pop()
                 else:
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)
@@ -1177,8 +1207,20 @@
 class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""
 
+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                     _DEFAULT_MAX_BYTES_TO_INDEX}
+
     def __init__(self, index, access, delta=True, _unadded_refs=None,
-            _group_cache=None):
+                 _group_cache=None):
         """Create a GroupCompressVersionedFiles object.
 
         :param index: The index object storing access and graph data.
@@ -1197,6 +1239,7 @@
             _group_cache = LRUSizeCache(max_size=50*1024*1024)
         self._group_cache = _group_cache
         self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None
 
     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
@@ -1560,7 +1603,8 @@
         #  - we encounter an unadded ref, or
         #  - we run out of keys, or
         #  - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
@@ -1612,6 +1656,30 @@
         for _ in self._insert_record_stream(stream, random_id=False):
             pass
 
+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            #       repository, so they don't have much of an idea about their
+            #       location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+            if val is not None:
+                try:
+                    val = int(val)
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                                  % (val,))
+                    val = None
+            if val is None:
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
+
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.
@@ -1640,12 +1708,12 @@
                 return adapter
         # This will go up to fulltexts for gc to gc fetching, which isn't
         # ideal.
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
             bytes_len, chunks = self._compressor.flush().to_chunks()
-            self._compressor = GroupCompressor()
+            self._compressor = self._make_group_compressor()
             # Note: At this point we still have 1 copy of the fulltext (in
             #       record and the var 'bytes'), and this generates 2 copies of
             #       the compressed text (one for bytes, one in chunks)

=== modified file 'bzrlib/tests/test__groupcompress.py'
--- a/bzrlib/tests/test__groupcompress.py	2011-01-12 01:01:53 +0000
+++ b/bzrlib/tests/test__groupcompress.py	2011-05-14 10:28:59 +0000
@@ -264,6 +264,63 @@
         di = self._gc_module.DeltaIndex('test text\n')
         self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
+    def test__dump_no_index(self):
+        di = self._gc_module.DeltaIndex()
+        self.assertEqual(None, di._dump_index())
+
+    def test__dump_index_simple(self):
+        di = self._gc_module.DeltaIndex()
+        di.add_source(_text1, 0)
+        self.assertFalse(di._has_index())
+        self.assertEqual(None, di._dump_index())
+        _ = di.make_delta(_text1)
+        self.assertTrue(di._has_index())
+        hash_list, entry_list = di._dump_index()
+        self.assertEqual(16, len(hash_list))
+        self.assertEqual(68, len(entry_list))
+        just_entries = [(idx, text_offset, hash_val)
+                        for idx, (text_offset, hash_val)
+                         in enumerate(entry_list)
+                         if text_offset != 0 or hash_val != 0]
+        rabin_hash = self._gc_module._rabin_hash
+        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
+                          (25, 48, rabin_hash(_text1[33:49])),
+                          (34, 32, rabin_hash(_text1[17:33])),
+                          (47, 64, rabin_hash(_text1[49:65])),
+                         ], just_entries)
+        # This ensures that the hash map points to the location we expect it to
+        for entry_idx, text_offset, hash_val in just_entries:
+            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
+
+    def test__dump_index_two_sources(self):
+        di = self._gc_module.DeltaIndex()
+        di.add_source(_text1, 0)
+        di.add_source(_text2, 2)
+        start2 = len(_text1) + 2
+        self.assertTrue(di._has_index())
+        hash_list, entry_list = di._dump_index()
+        self.assertEqual(16, len(hash_list))
+        self.assertEqual(68, len(entry_list))
+        just_entries = [(idx, text_offset, hash_val)
+                        for idx, (text_offset, hash_val)
+                         in enumerate(entry_list)
+                         if text_offset != 0 or hash_val != 0]
+        rabin_hash = self._gc_module._rabin_hash
+        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
+                          (9, start2+16, rabin_hash(_text2[1:17])),
+                          (25, 48, rabin_hash(_text1[33:49])),
+                          (30, start2+64, rabin_hash(_text2[49:65])),
+                          (34, 32, rabin_hash(_text1[17:33])),
+                          (35, start2+32, rabin_hash(_text2[17:33])),
+                          (43, start2+48, rabin_hash(_text2[33:49])),
+                          (47, 64, rabin_hash(_text1[49:65])),
+                         ], just_entries)
+        # Each entry should be in the appropriate hash bucket.
+        for entry_idx, text_offset, hash_val in just_entries:
+            hash_idx = hash_val & 0xf
+            self.assertTrue(
+                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
+
     def test_first_add_source_doesnt_index_until_make_delta(self):
         di = self._gc_module.DeltaIndex()
         self.assertFalse(di._has_index())
@@ -275,6 +332,27 @@
         self.assertTrue(di._has_index())
         self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
+    def test_add_source_max_bytes_to_index(self):
+        di = self._gc_module.DeltaIndex()
+        di._max_bytes_to_index = 3*16
+        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
+        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
+        start2 = len(_text1) + 3
+        hash_list, entry_list = di._dump_index()
+        self.assertEqual(16, len(hash_list))
+        self.assertEqual(67, len(entry_list))
+        just_entries = sorted([(text_offset, hash_val)
+                               for text_offset, hash_val in entry_list
+                                if text_offset != 0 or hash_val != 0])
+        rabin_hash = self._gc_module._rabin_hash
+        self.assertEqual([(25, rabin_hash(_text1[10:26])),
+                          (50, rabin_hash(_text1[35:51])),
+                          (75, rabin_hash(_text1[60:76])),
+                          (start2+44, rabin_hash(_text3[29:45])),
+                          (start2+88, rabin_hash(_text3[73:89])),
+                          (start2+132, rabin_hash(_text3[117:133])),
+                         ], just_entries)
+
     def test_second_add_source_triggers_make_index(self):
         di = self._gc_module.DeltaIndex()
         self.assertFalse(di._has_index())

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2011-01-10 22:20:12 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2011-05-14 10:33:28 +0000
@@ -20,6 +20,7 @@
 
 from bzrlib import (
     btree_index,
+    config,
     groupcompress,
     errors,
     index as _mod_index,
@@ -552,6 +553,23 @@
                     'as-requested', False)]
         self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
 
+    def test_get_record_stream_max_bytes_to_index_default(self):
+        vf = self.make_test_vf(True, dir='source')
+        vf.add_lines(('a',), (), ['lines\n'])
+        vf.writer.end()
+        record = vf.get_record_stream([('a',)], 'unordered', True).next()
+        self.assertEqual(vf._DEFAULT_COMPRESSOR_SETTINGS,
+                         record._manager._get_compressor_settings())
+
+    def test_get_record_stream_accesses_compressor_settings(self):
+        vf = self.make_test_vf(True, dir='source')
+        vf.add_lines(('a',), (), ['lines\n'])
+        vf.writer.end()
+        vf._max_bytes_to_index = 1234
+        record = vf.get_record_stream([('a',)], 'unordered', True).next()
+        self.assertEqual(dict(max_bytes_to_index=1234),
+                         record._manager._get_compressor_settings())
+
     def test_insert_record_stream_reuses_blocks(self):
         vf = self.make_test_vf(True, dir='source')
         def grouped_stream(revision_ids, first_parents=()):
@@ -770,6 +788,48 @@
         self.assertEqual(0, len(vf._group_cache))
 
 
+class TestGroupCompressConfig(tests.TestCaseWithTransport):
+
+    def make_test_vf(self):
+        t = self.get_transport('.')
+        t.ensure_base()
+        factory = groupcompress.make_pack_factory(graph=True,
+            delta=False, keylength=1, inconsistency_fatal=True)
+        vf = factory(t)
+        self.addCleanup(groupcompress.cleanup_pack_group, vf)
+        return vf
+
+    def test_max_bytes_to_index_default(self):
+        vf = self.make_test_vf()
+        gc = vf._make_group_compressor()
+        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
+                         vf._max_bytes_to_index)
+        if isinstance(gc, groupcompress.PyrexGroupCompressor):
+            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
+                             gc._delta_index._max_bytes_to_index)
+
+    def test_max_bytes_to_index_in_config(self):
+        c = config.GlobalConfig()
+        c.set_user_option('bzr.groupcompress.max_bytes_to_index', '10000')
+        vf = self.make_test_vf()
+        gc = vf._make_group_compressor()
+        self.assertEqual(10000, vf._max_bytes_to_index)
+        if isinstance(gc, groupcompress.PyrexGroupCompressor):
+            self.assertEqual(10000, gc._delta_index._max_bytes_to_index)
+
+    def test_max_bytes_to_index_bad_config(self):
+        c = config.GlobalConfig()
+        c.set_user_option('bzr.groupcompress.max_bytes_to_index', 'boogah')
+        vf = self.make_test_vf()
+        # TODO: This is triggering a warning, we might want to trap and make
+        #       sure it is readable.
+        gc = vf._make_group_compressor()
+        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
+                         vf._max_bytes_to_index)
+        if isinstance(gc, groupcompress.PyrexGroupCompressor):
+            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
+                             gc._delta_index._max_bytes_to_index)
+
 
 class StubGCVF(object):
     def __init__(self, canned_get_blocks=None):
@@ -1046,6 +1106,54 @@
             self.assertEqual(self._texts[record.key],
                              record.get_bytes_as('fulltext'))
 
+    def test_manager_default_compressor_settings(self):
+        locations, old_block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(old_block)
+        gcvf = groupcompress.GroupCompressVersionedFiles
+        # It doesn't greedily evaluate _max_bytes_to_index
+        self.assertIs(None, manager._compressor_settings)
+        self.assertEqual(gcvf._DEFAULT_COMPRESSOR_SETTINGS,
+                         manager._get_compressor_settings())
+
+    def test_manager_custom_compressor_settings(self):
+        locations, old_block = self.make_block(self._texts)
+        called = []
+        def compressor_settings():
+            called.append('called')
+            return (10,)
+        manager = groupcompress._LazyGroupContentManager(old_block,
+            get_compressor_settings=compressor_settings)
+        gcvf = groupcompress.GroupCompressVersionedFiles
+        # It doesn't greedily evaluate compressor_settings
+        self.assertIs(None, manager._compressor_settings)
+        self.assertEqual((10,), manager._get_compressor_settings())
+        self.assertEqual((10,), manager._get_compressor_settings())
+        self.assertEqual((10,), manager._compressor_settings)
+        # Only called 1 time
+        self.assertEqual(['called'], called)
+
+    def test__rebuild_handles_compressor_settings(self):
+        if not isinstance(groupcompress.GroupCompressor,
+                          groupcompress.PyrexGroupCompressor):
+            raise tests.TestNotApplicable('pure-python compressor'
+                ' does not handle compressor_settings')
+        locations, old_block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(old_block,
+            get_compressor_settings=lambda: dict(max_bytes_to_index=32))
+        gc = manager._make_group_compressor()
+        self.assertEqual(32, gc._delta_index._max_bytes_to_index)
+        self.add_key_to_manager(('key3',), locations, old_block, manager)
+        self.add_key_to_manager(('key4',), locations, old_block, manager)
+        action, last_byte, total_bytes = manager._check_rebuild_action()
+        self.assertEqual('rebuild', action)
+        manager._rebuild_block()
+        new_block = manager._block
+        self.assertIsNot(old_block, new_block)
+        # Because of the new max_bytes_to_index, we do a poor job of
+        # rebuilding. This is a side-effect of the change, but at least it does
+        # show the setting had an effect.
+        self.assertTrue(old_block._content_length < new_block._content_length)
+
     def test_check_is_well_utilized_all_keys(self):
         block, manager = self.make_block_and_full_manager(self._texts)
         self.assertFalse(manager.check_is_well_utilized())

=== modified file 'doc/en/release-notes/bzr-2.4.txt'
--- a/doc/en/release-notes/bzr-2.4.txt	2011-05-16 11:17:40 +0000
+++ b/doc/en/release-notes/bzr-2.4.txt	2011-05-16 17:33:27 +0000
@@ -24,6 +24,15 @@
 
 .. New commands, options, etc that users may wish to try out.
 
+* The text compressor used for 2a repositories now has a tweakable
+  parameter that can be set in bazaar.conf:
+  ``bzr.groupcompress.max_bytes_to_index``.
+  When doing compression, we build up an index of locations to match
+  against. Setting this higher will result in slightly better compression,
+  at a cost of more memory. Note that the default corresponds to 65536
+  index entries (one per 16 bytes), which fully samples a 1MB file, so this
+  only affects compression of larger texts. (John Arbash Meinel, #602614)
+
 Improvements
 ************
 




More information about the bazaar-commits mailing list