Rev 3879: Merge the dupes and empty record handling. in http://bzr.arbash-meinel.com/branches/bzr/brisbane/hack3

John Arbash Meinel john at arbash-meinel.com
Fri Mar 20 17:12:33 GMT 2009


At http://bzr.arbash-meinel.com/branches/bzr/brisbane/hack3

------------------------------------------------------------
revno: 3879
revision-id: john at arbash-meinel.com-20090320170430-fvgnzc1amp7rmd11
parent: john at arbash-meinel.com-20090320154811-znms4757w29gmc4b
parent: john at arbash-meinel.com-20090319225110-hfslu08ridcsc5xi
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: hack3
timestamp: Fri 2009-03-20 12:04:30 -0500
message:
  Merge the dupes and empty record handling.
modified:
  bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
  bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 3877.1.2
    revision-id: john at arbash-meinel.com-20090319225110-hfslu08ridcsc5xi
    parent: john at arbash-meinel.com-20090319223546-odxel2nktm700d7e
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: hack3
    timestamp: Thu 2009-03-19 17:51:10 -0500
    message:
      Change .compress() to return the start-point.
      
      Now that we can return an entry pointing at an existing entry
      we need to record that correctly in the index.
      It also helps for the 'null' entries.
    modified:
      bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
    ------------------------------------------------------------
    revno: 3877.1.1
    revision-id: john at arbash-meinel.com-20090319223546-odxel2nktm700d7e
    parent: john at arbash-meinel.com-20090319194720-4esxj7gnrmfaykww
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: hack3
    timestamp: Thu 2009-03-19 17:35:46 -0500
    message:
      Some hacky code to check for:
      a) empty records, like directories and symlinks. This might decrease index pressure
      a tiny bit, as they can be recorded as (0, 0, 0, 0).
      b) duplicate entries in the same gc block.
      Rather than insert another copy of the same delta (or a delta to the identical text).
      
      We seem to get quite a few hits for both (b) and (a), so it is probably worth looking
      closer at them.
    modified:
      bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
      bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-03-20 15:48:11 +0000
+++ b/bzrlib/groupcompress.py	2009-03-20 17:04:30 +0000
@@ -730,12 +730,13 @@
        left side.
     """
 
-    def __init__(self):
+    def __init__(self, check_for_dupes=False):
         """Create a GroupCompressor."""
         # Consider seeding the lines with some sort of GC Start flag, or
         # putting it as part of the output stream, rather than in the
         # compressed bytes.
         self.lines = []
+        self._check_for_dupes = check_for_dupes
         self.endpoint = 0
         self.input_bytes = 0
         self.num_keys = 0
@@ -743,6 +744,9 @@
         self._last = None
         self._delta_index = _groupcompress_pyx.DeltaIndex()
         self._block = GroupCompressBlock()
+        self._entries_by_sha1 = {}
+        self._empty_entries = 0
+        self._deduped_entries = 0
 
     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.
@@ -763,10 +767,18 @@
             the group output so far.
         :seealso VersionedFiles.add_lines:
         """
+        if not bytes: # empty, like a dir entry, etc
+            self._empty_entries += 1
+            self._block.add_entry(key, type='empty',
+                                  sha1=None, start=0,
+                                  length=0)
+            return None, 0, 0, 'fulltext', 0
         sha1 = None
         # we assume someone knew what they were doing when they passed it in
         if expected_sha is not None:
             sha1 = expected_sha
+        if self._check_for_dupes and sha1 is None:
+            sha1 = osutils.sha_string(bytes)
         if nostore_sha is not None:
             if sha1 is None:
                 sha1 = osutils.sha_string(bytes)
@@ -790,6 +802,13 @@
             raise AssertionError('_source_offset != endpoint'
                 ' somehow the DeltaIndex got out of sync with'
                 ' the output lines')
+        if self._check_for_dupes and sha1 in self._entries_by_sha1:
+            self._deduped_entries += 1
+            (type, start, length) = self._entries_by_sha1[sha1]
+            self._block.add_entry(key, type=type,
+                                  sha1=sha1, start=start,
+                                  length=length)
+            return sha1, start, start+length, 'sha1_dupe', length
         max_delta_size = len(bytes) / 2
         delta = self._delta_index.make_delta(bytes, max_delta_size)
         if (delta is None):
@@ -811,6 +830,9 @@
                 self._delta_index.add_delta_source(delta, len_mini_header)
         self._block.add_entry(key, type=type, sha1=sha1,
                               start=self.endpoint, length=length)
+        if self._check_for_dupes:
+            self._entries_by_sha1[sha1] = (type, self.endpoint, length)
+        start = self.endpoint
         delta_start = (self.endpoint, len(self.lines))
         self.num_keys += 1
         self.output_chunks(new_chunks)
@@ -821,7 +843,7 @@
             raise AssertionError('the delta index is out of sync'
                 'with the output lines %s != %s'
                 % (self._delta_index._source_offset, self.endpoint))
-        return sha1, self.endpoint, type, length
+        return sha1, start, self.endpoint, type, length
 
     def extract(self, key):
         """Extract a key previously added to the compressor.
@@ -940,7 +962,8 @@
 class GroupCompressVersionedFiles(VersionedFiles):
     """A group-compress based VersionedFiles implementation."""
 
-    def __init__(self, index, access, delta=True):
+    def __init__(self, index, access, delta=True,
+                 check_for_dupes=False):
         """Create a GroupCompressVersionedFiles object.
 
         :param index: The index object storing access and graph data.
@@ -951,6 +974,7 @@
         self._access = access
         self._delta = delta
         self._unadded_refs = {}
+        self._check_for_dupes = check_for_dupes
         self._group_cache = LRUSizeCache(max_size=50*1024*1024)
         self._fallback_vfs = []
 
@@ -1384,11 +1408,19 @@
                 return adapter
         # This will go up to fulltexts for gc to gc fetching, which isn't
         # ideal.
-        self._compressor = GroupCompressor()
+        self._compressor = GroupCompressor(self._check_for_dupes)
         self._unadded_refs = {}
         keys_to_add = []
-        basis_end = 0
+        total_deduped = [0, 0]
         def flush():
+            if (self._compressor._deduped_entries > 0
+                or self._compressor._empty_entries > 0):
+                total_deduped[0] += self._compressor._deduped_entries
+                total_deduped[1] += self._compressor._empty_entries
+                trace.note('Dedupped %d out of %d entries, %d empty',
+                           self._compressor._deduped_entries,
+                           len(self._compressor._block._entries),
+                           self._compressor._empty_entries)
             bytes = self._compressor.flush().to_bytes()
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
@@ -1398,7 +1430,7 @@
             self._index.add_records(nodes, random_id=random_id)
             self._unadded_refs = {}
             del keys_to_add[:]
-            self._compressor = GroupCompressor()
+            self._compressor = GroupCompressor(self._check_for_dupes)
 
         last_prefix = None
         last_fulltext_len = None
@@ -1448,7 +1480,7 @@
             if max_fulltext_len < len(bytes):
                 max_fulltext_len = len(bytes)
                 max_fulltext_prefix = prefix
-            (found_sha1, end_point, type,
+            (found_sha1, start_point, end_point, type,
              length) = self._compressor.compress(record.key,
                 bytes, record.sha1, soft=soft,
                 nostore_sha=nostore_sha)
@@ -1496,9 +1528,8 @@
             if start_new_block:
                 self._compressor.pop_last()
                 flush()
-                basis_end = 0
                 max_fulltext_len = len(bytes)
-                (found_sha1, end_point, type,
+                (found_sha1, start_point, end_point, type,
                  length) = self._compressor.compress(record.key,
                     bytes, record.sha1)
                 last_fulltext_len = length
@@ -1508,12 +1539,14 @@
                 key = record.key
             self._unadded_refs[key] = record.parents
             yield found_sha1
-            keys_to_add.append((key, '%d %d' % (basis_end, end_point),
+            keys_to_add.append((key, '%d %d' % (start_point, end_point),
                 (record.parents,)))
-            basis_end = end_point
         if len(keys_to_add):
             flush()
         self._compressor = None
+        if total_deduped[0] > 0 or total_deduped[1] > 0:
+            trace.note('Total deduped = %d, total empty = %d\n',
+                       total_deduped[0], total_deduped[1])
 
     def iter_lines_added_or_present_in_keys(self, keys, pb=None):
         """Iterate over the lines in the versioned files from keys.

=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py	2009-03-20 15:48:11 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py	2009-03-20 17:04:30 +0000
@@ -348,13 +348,17 @@
                 indices.append(sub_index)
             index = _mod_index.CombinedGraphIndex(indices)
             add_callback = None
+        check_for_dupes = False
+        if index_name == 'text_index':
+            check_for_dupes = True
         vf = GroupCompressVersionedFiles(
             _GCGraphIndex(index,
                           add_callback=add_callback,
                           parents=parents,
                           is_locked=self._pack_collection.repo.is_locked),
             access=access,
-            delta=delta)
+            delta=delta,
+            check_for_dupes=check_for_dupes)
         return vf
 
     def _build_vfs(self, index_name, parents, delta):



More information about the bazaar-commits mailing list