Rev 5758: Add a max_entries_per_source to DeltaIndex in http://bazaar.launchpad.net/~jameinel/bzr/2.4-max-entries-gc-602614

John Arbash Meinel john at arbash-meinel.com
Thu May 12 11:38:55 UTC 2011


At http://bazaar.launchpad.net/~jameinel/bzr/2.4-max-entries-gc-602614

------------------------------------------------------------
revno: 5758
revision-id: john at arbash-meinel.com-20110512113839-reasdnirl4889ptb
parent: john at arbash-meinel.com-20110511141536-mwg3yo304rhpe1nz
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.4-max-entries-gc-602614
timestamp: Thu 2011-05-12 13:38:39 +0200
message:
  Add a max_entries_per_source to DeltaIndex
  
  This changes the sampling rate in the create_delta_from_source.
  This isn't exposed higher up yet, but it works so far.
-------------- next part --------------
=== modified file 'bzrlib/_groupcompress_pyx.pyx'
--- a/bzrlib/_groupcompress_pyx.pyx	2011-03-22 15:58:09 +0000
+++ b/bzrlib/_groupcompress_pyx.pyx	2011-05-12 11:38:39 +0000
@@ -56,7 +56,8 @@
         DELTA_SIZE_TOO_BIG
     delta_result create_delta_index(source_info *src,
                                     delta_index *old,
-                                    delta_index **fresh) nogil
+                                    delta_index **fresh,
+                                    int max_entries) nogil
     delta_result create_delta_index_from_delta(source_info *delta,
                                                delta_index *old,
                                                delta_index **fresh) nogil
@@ -70,6 +71,10 @@
                                      unsigned char *top) nogil
     unsigned long sizeof_delta_index(delta_index *index)
     Py_ssize_t DELTA_SIZE_MIN
+    int get_hash_offset(delta_index *index, int pos, unsigned int *hash_offset)
+    int get_entry_summary(delta_index *index, int pos,
+                          unsigned int *global_offset, unsigned int *hash_val)
+    unsigned int c_rabin_hash "rabin_hash" (char *data)
 
 
 cdef void *safe_malloc(size_t count) except NULL:
@@ -113,6 +118,15 @@
     return AssertionError("Unrecognised delta result code: %d" % result)
 
 
+def rabin_hash(content):
+    if not PyString_CheckExact(content):
+        raise ValueError('content must be a string')
+    if len(content) < 16:
+        raise ValueError('content must be at least 16 bytes long')
+    # Try to cast it to an int, if it can fit
+    return int(c_rabin_hash(PyString_AS_STRING(content)))
+
+
 cdef class DeltaIndex:
 
     # We need Pyrex 0.9.8+ to understand a 'list' definition, and this object
@@ -123,14 +137,18 @@
     cdef delta_index *_index
     cdef public unsigned long _source_offset
     cdef readonly unsigned int _max_num_sources
+    cdef public int _max_entries_per_source
 
-    def __init__(self, source=None):
+    def __init__(self, source=None, max_entries_per_source=None):
         self._sources = []
         self._index = NULL
         self._max_num_sources = 65000
         self._source_infos = <source_info *>safe_malloc(sizeof(source_info)
                                                         * self._max_num_sources)
         self._source_offset = 0
+        self._max_entries_per_source = 0
+        if max_entries_per_source is not None:
+            self._max_entries_per_source = max_entries_per_source
 
         if source is not None:
             self.add_source(source, 0)
@@ -164,6 +182,44 @@
     def _has_index(self):
         return (self._index != NULL)
 
+    def _dump_index(self):
+        """Dump the pointers in the index.
+
+        This is an arbitrary layout, used for testing. It is not meant to be
+        used in production code.
+
+        :return: (hash_list, entry_list)
+            hash_list   A list of offsets, so hash[i] points to the 'hash
+                        bucket' starting at the given offset and going until
+                        hash[i+1]
+            entry_list  A list of (text_offset, hash_val). text_offset is the
+                        offset in the "source" texts, and hash_val is the RABIN
+                        hash for that offset.
+                        Note that the entry should be in the hash bucket
+                        defined by
+                        hash[(hash_val & mask)] && hash[(hash_val & mask) + 1]
+        """
+        cdef int pos
+        cdef unsigned int text_offset
+        cdef unsigned int hash_val
+        cdef unsigned int hash_offset
+        if self._index == NULL:
+            return None
+        hash_list = []
+        pos = 0
+        while get_hash_offset(self._index, pos, &hash_offset):
+            hash_list.append(int(hash_offset))
+            pos += 1
+        entry_list = []
+        pos = 0
+        while get_entry_summary(self._index, pos, &text_offset, &hash_val):
+            # Map back using 'int' so that we don't get Long everywhere, when
+            # almost everything is <2**31.
+            val = tuple(map(int, [text_offset, hash_val]))
+            entry_list.append(val)
+            pos += 1
+        return hash_list, entry_list
+
     def add_delta_source(self, delta, unadded_bytes):
         """Add a new delta to the source texts.
 
@@ -207,6 +263,10 @@
         :param source: The text in question, this must be a byte string
         :param unadded_bytes: Assume there are this many bytes that didn't get
             added between this source and the end of the previous source.
+        :param max_pointers: Add no more than this many entries to the index.
+            By default, we sample every 16 bytes; if that would require more
+            than this many entries, we will reduce the sampling rate.
+            A value of 0 means unlimited, None means use the default limit.
         """
         cdef char *c_source
         cdef Py_ssize_t c_source_size
@@ -215,6 +275,7 @@
         cdef unsigned int source_location
         cdef source_info *src
         cdef unsigned int num_indexes
+        cdef int max_num_entries
 
         if not PyString_CheckExact(source):
             raise TypeError('source is not a str')
@@ -237,7 +298,8 @@
         # We delay creating the index on the first insert
         if source_location != 0:
             with nogil:
-                res = create_delta_index(src, self._index, &index)
+                res = create_delta_index(src, self._index, &index,
+                                         self._max_entries_per_source)
             if res != DELTA_OK:
                 raise _translate_delta_failure(res)
             if index != self._index:
@@ -254,7 +316,8 @@
         # We know that self._index is already NULL, so create_delta_index
         # will always create a new index unless there's a malloc failure
         with nogil:
-            res = create_delta_index(&self._source_infos[0], NULL, &index)
+            res = create_delta_index(&self._source_infos[0], NULL, &index,
+                                     self._max_entries_per_source)
         if res != DELTA_OK:
             raise _translate_delta_failure(res)
         self._index = index

=== modified file 'bzrlib/delta.h'
--- a/bzrlib/delta.h	2011-03-22 00:33:21 +0000
+++ b/bzrlib/delta.h	2011-05-12 11:38:39 +0000
@@ -21,7 +21,6 @@
                                  aggregate source */
 };
 
-
 /* result type for functions that have multiple failure modes */
 typedef enum {
     DELTA_OK,             /* Success */
@@ -42,11 +41,16 @@
  * free_delta_index().  Other values are a failure, and *fresh is unset.
  * The given buffer must not be freed nor altered before free_delta_index() is
  * called. The resultant struct must be freed using free_delta_index().
+ *
+ * :param max_entries: Limit the number of regions to sample to this amount
+ *      (a value <= 0 means no limit). Useful if src can be unbounded in size,
+ *      but you are willing to trade match accuracy for peak memory.
  */
 extern delta_result
 create_delta_index(const struct source_info *src,
                    struct delta_index *old,
-                   struct delta_index **fresh);
+                   struct delta_index **fresh,
+                   int max_entries);
 
 
 /*
@@ -118,4 +122,32 @@
     return size;
 }
 
+/*
+ * Return the basic information about one entry in a given delta index.
+ * :param index: The delta_index object
+ * :param pos: The offset in the entry list. Start at 0, and walk until you get
+ *      0 as a return code.
+ * :param text_offset: return value, distance to the beginning of all sources
+ * :param hash_val: return value, the RABIN hash associated with this pointer
+ *      (both return values are set to 0 for an unused entry slot)
+ * :return: 1 if data was produced, 0 for a bad pos or a NULL argument
+ */
+extern int
+get_entry_summary(const struct delta_index *index, int pos,
+                  unsigned int *text_offset, unsigned int *hash_val);
+
+/*
+ * Determine what entry index->hash[X] points to.
+ */
+extern int
+get_hash_offset(const struct delta_index *index, int pos,
+                unsigned int *entry_offset);
+
+/*
+ * Compute the rabin_hash of the given data, it is assumed the data is at least
+ * RABIN_WINDOW wide (16 bytes).
+ */
+extern unsigned int
+rabin_hash(const unsigned char *data);
+
 #endif

=== modified file 'bzrlib/diff-delta.c'
--- a/bzrlib/diff-delta.c	2011-04-05 14:26:43 +0000
+++ b/bzrlib/diff-delta.c	2011-05-12 11:38:39 +0000
@@ -29,7 +29,6 @@
 
 #define RABIN_SHIFT 23
 #define RABIN_WINDOW 16
-#define MAX_NUM_ENTRIES 10000
 
 /* The hash map is sized to put 4 entries per bucket, this gives us ~even room
  * for more data. Tweaking this number above 4 doesn't seem to help much,
@@ -377,7 +376,8 @@
 delta_result
 create_delta_index(const struct source_info *src,
                    struct delta_index *old,
-                   struct delta_index **fresh)
+                   struct delta_index **fresh,
+                   int max_entries)
 {
     unsigned int i, hsize, hmask, num_entries, prev_val, *hash_count;
     unsigned int total_num_entries, stride;
@@ -392,17 +392,16 @@
     buffer = src->buf;
 
     /* Determine index hash size.  Note that indexing skips the
-       first byte to allow for optimizing the Rabin's polynomial
-       initialization in create_delta(). */
+       first byte so we subtract 1 to get the edge cases right.
+     */
     stride = RABIN_WINDOW;
     num_entries = (src->size - 1)  / RABIN_WINDOW;
-    if (num_entries > MAX_NUM_ENTRIES) {
+    if (max_entries > 0 && num_entries > max_entries) {
         /* Limit the max number of matching entries. This reduces the 'best'
          * possible match, but means we don't consume all of ram.
          */
-        // fprintf(stderr, "limiting num_entries to %d\n", MAX_NUM_ENTRIES);
-        num_entries = MAX_NUM_ENTRIES;
-        stride = (src->size) / num_entries;
+        num_entries = max_entries;
+        stride = (src->size - 1) / num_entries;
     }
     if (old != NULL)
         total_num_entries = num_entries + old->num_entries;
@@ -486,7 +485,7 @@
                        unsigned int hsize)
 {
     unsigned int hash_offset, hmask, memsize;
-    struct index_entry *entry, *last_entry;
+    struct index_entry *entry;
     struct index_entry_linked_list *out_entry, **hash;
     void *mem;
 
@@ -506,7 +505,6 @@
     /* We know that entries are in the order we want in the output, but they
      * aren't "grouped" by hash bucket yet.
      */
-    last_entry = entries + num_entries;
     for (entry = entries + num_entries - 1; entry >= entries; --entry) {
         hash_offset = entry->val & hmask;
         out_entry->p_entry = entry;
@@ -531,7 +529,7 @@
     unsigned int i, j, hsize, hmask, total_num_entries;
     struct delta_index *index;
     struct index_entry *entry, *packed_entry, **packed_hash;
-    struct index_entry *last_entry, null_entry = {0};
+    struct index_entry null_entry = {0};
     void *mem;
     unsigned long memsize;
     struct index_entry_linked_list *unpacked_entry, **mini_hash;
@@ -582,7 +580,6 @@
         free(index);
         return NULL;
     }
-    last_entry = entries + num_entries;
     for (i = 0; i < hsize; i++) {
         /*
          * Coalesce all entries belonging in one hash bucket
@@ -1110,5 +1107,74 @@
     return DELTA_OK;
 }
 
+
+int
+get_entry_summary(const struct delta_index *index, int pos,
+                  unsigned int *text_offset, unsigned int *hash_val)
+{
+    int hsize;
+    const struct index_entry *entry;
+    const struct index_entry *start_of_entries;
+    unsigned int offset;
+    if (pos < 0 || text_offset == NULL || hash_val == NULL
+        || index == NULL)
+    {
+        return 0;
+    }
+    hsize = index->hash_mask + 1;
+    start_of_entries = (struct index_entry *)(((struct index_entry **)index->hash) + (hsize + 1));
+    entry = start_of_entries + pos;
+    if (entry > index->last_entry) {
+        return 0;
+    }
+    if (entry->ptr == NULL) {
+        *text_offset = 0;
+        *hash_val = 0;
+    } else {
+        offset = entry->src->agg_offset;
+        offset += (entry->ptr - ((unsigned char *)entry->src->buf));
+        *text_offset = offset;
+        *hash_val = entry->val;
+    }
+    return 1;
+}
+
+
+int
+get_hash_offset(const struct delta_index *index, int pos,
+                unsigned int *entry_offset)
+{
+    int hsize;
+    const struct index_entry *entry;
+    const struct index_entry *start_of_entries;
+    if (pos < 0 || index == NULL || entry_offset == NULL)
+    {
+        return 0;
+    }
+    hsize = index->hash_mask + 1;
+    start_of_entries = (struct index_entry *)(((struct index_entry **)index->hash) + (hsize + 1));
+    if (pos >= hsize) {
+        return 0;
+    }
+    entry = index->hash[pos];
+    if (entry == NULL) {
+        *entry_offset = -1;
+    } else {
+        *entry_offset = (entry - start_of_entries);
+    }
+    return 1;
+}
+
+
+unsigned int
+rabin_hash(const unsigned char *data)
+{
+    int i;
+    unsigned int val = 0;
+    for (i = 0; i < RABIN_WINDOW; i++)
+        val = ((val << 8) | data[i]) ^ T[val >> RABIN_SHIFT];
+    return val;
+}
+
 /* vim: et ts=4 sw=4 sts=4
  */

=== modified file 'bzrlib/tests/test__groupcompress.py'
--- a/bzrlib/tests/test__groupcompress.py	2011-01-12 01:01:53 +0000
+++ b/bzrlib/tests/test__groupcompress.py	2011-05-12 11:38:39 +0000
@@ -264,6 +264,63 @@
         di = self._gc_module.DeltaIndex('test text\n')
         self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
+    def test__dump_no_index(self):
+        di = self._gc_module.DeltaIndex()
+        self.assertEqual(None, di._dump_index())
+
+    def test__dump_index_simple(self):
+        di = self._gc_module.DeltaIndex()
+        di.add_source(_text1, 0)
+        self.assertFalse(di._has_index())
+        self.assertEqual(None, di._dump_index())
+        _ = di.make_delta(_text1)
+        self.assertTrue(di._has_index())
+        hash_list, entry_list = di._dump_index()
+        self.assertEqual(16, len(hash_list))
+        self.assertEqual(68, len(entry_list))
+        just_entries = [(idx, text_offset, hash_val)
+                        for idx, (text_offset, hash_val)
+                         in enumerate(entry_list)
+                         if text_offset != 0 or hash_val != 0]
+        rabin_hash = self._gc_module.rabin_hash
+        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
+                          (25, 48, rabin_hash(_text1[33:49])),
+                          (34, 32, rabin_hash(_text1[17:33])),
+                          (47, 64, rabin_hash(_text1[49:65])),
+                         ], just_entries)
+        # This ensures that the hash map points to the location we expect it to
+        for entry_idx, text_offset, hash_val in just_entries:
+            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
+
+    def test__dump_index_two_sources(self):
+        di = self._gc_module.DeltaIndex()
+        di.add_source(_text1, 0)
+        di.add_source(_text2, 2)
+        start2 = len(_text1) + 2
+        self.assertTrue(di._has_index())
+        hash_list, entry_list = di._dump_index()
+        self.assertEqual(16, len(hash_list))
+        self.assertEqual(68, len(entry_list))
+        just_entries = [(idx, text_offset, hash_val)
+                        for idx, (text_offset, hash_val)
+                         in enumerate(entry_list)
+                         if text_offset != 0 or hash_val != 0]
+        rabin_hash = self._gc_module.rabin_hash
+        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
+                          (9, start2+16, rabin_hash(_text2[1:17])),
+                          (25, 48, rabin_hash(_text1[33:49])),
+                          (30, start2+64, rabin_hash(_text2[49:65])),
+                          (34, 32, rabin_hash(_text1[17:33])),
+                          (35, start2+32, rabin_hash(_text2[17:33])),
+                          (43, start2+48, rabin_hash(_text2[33:49])),
+                          (47, 64, rabin_hash(_text1[49:65])),
+                         ], just_entries)
+        # Each entry should be in the appropriate hash bucket.
+        for entry_idx, text_offset, hash_val in just_entries:
+            hash_idx = hash_val & 0xf
+            self.assertTrue(
+                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
+
     def test_first_add_source_doesnt_index_until_make_delta(self):
         di = self._gc_module.DeltaIndex()
         self.assertFalse(di._has_index())
@@ -275,6 +332,27 @@
         self.assertTrue(di._has_index())
         self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
+    def test_add_source_max_entries(self):
+        di = self._gc_module.DeltaIndex()
+        di._max_entries_per_source = 3
+        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
+        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
+        start2 = len(_text1) + 3
+        hash_list, entry_list = di._dump_index()
+        self.assertEqual(16, len(hash_list))
+        self.assertEqual(67, len(entry_list))
+        just_entries = sorted([(text_offset, hash_val)
+                               for text_offset, hash_val in entry_list
+                                if text_offset != 0 or hash_val != 0])
+        rabin_hash = self._gc_module.rabin_hash
+        self.assertEqual([(25, rabin_hash(_text1[10:26])),
+                          (50, rabin_hash(_text1[35:51])),
+                          (75, rabin_hash(_text1[60:76])),
+                          (start2+44, rabin_hash(_text3[29:45])),
+                          (start2+88, rabin_hash(_text3[73:89])),
+                          (start2+132, rabin_hash(_text3[117:133])),
+                         ], just_entries)
+
     def test_second_add_source_triggers_make_index(self):
         di = self._gc_module.DeltaIndex()
         self.assertFalse(di._has_index())



More information about the bazaar-commits mailing list