Rev 43: Change so that regions that have lots of copies get converted back in http://bazaar.launchpad.net/%7Ejameinel/bzr-groupcompress/experimental

John Arbash Meinel john at arbash-meinel.com
Fri Feb 20 03:52:43 GMT 2009


At http://bazaar.launchpad.net/%7Ejameinel/bzr-groupcompress/experimental

------------------------------------------------------------
revno: 43
revision-id: john at arbash-meinel.com-20090220035205-nc0wvvys4zyq7ubs
parent: john at arbash-meinel.com-20090219210803-b2pxww2xi0muwgvv
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: experimental
timestamp: Thu 2009-02-19 21:52:05 -0600
message:
  Change so that regions that have lots of copies get converted back
  into an insertion.
  This does get triggered, but it doesn't help. The total compression is 17MB,
  and the conversion time is 10min. Which is equivalent to the original values.
  
  
  Even further, don't match blocks that are shorter than XX bytes (currently 10).
  With a value of 5, we still get trivial blocks inserted. With a value of 10,
  everything changes to copies.
  
  Dropping the max block size to 8MB decreases the total bytes to 14MB (presumably
  because the copy records now have 1 fewer byte per record). It also makes it 9m versus 10m.
  Preferentially splitting based on file-id (at >= 4MB) stays at 9min, but
  drops it to 13MB
-------------- next part --------------
=== modified file 'groupcompress.py'
--- a/groupcompress.py	2009-02-19 21:08:03 +0000
+++ b/groupcompress.py	2009-02-20 03:52:05 +0000
@@ -158,7 +158,7 @@
         self.labels_deltas = {}
         self._present_prefixes = set()
 
-    def get_matching_blocks(self, lines):
+    def get_matching_blocks(self, lines, soft=False):
         """Return an the ranges in lines which match self.lines.
 
         :param lines: lines to compress
@@ -176,15 +176,30 @@
         locations = None
         max_pos = len(lines)
         result_append = result.append
+        min_match_bytes = 10
+        if soft:
+            min_match_bytes = 200
         while pos < max_pos:
             block, pos, locations = _get_longest_match(line_locations, pos,
                                                        max_pos, locations)
             if block is not None:
+                # Check to see if we are matching fewer than 5 characters,
+                # which is turned into a simple 'insert', rather than a copy
+                # If we have more than 5 lines, we definitely have more than 5
+                # chars
+                if block[-1] < min_match_bytes:
+                    # This block may be a 'short' block, check
+                    old_start, new_start, range_len = block
+                    matched_bytes = sum(map(len,
+                        lines[new_start:new_start + range_len]))
+                    if matched_bytes < min_match_bytes:
+                        block = None
+            if block is not None:
                 result_append(block)
         result_append((len(self.lines), len(lines), 0))
         return result
 
-    def compress(self, key, lines, expected_sha):
+    def compress(self, key, lines, expected_sha, soft=False):
         """Compress lines with label key.
 
         :param key: A key tuple. It is stored in the output
@@ -196,6 +211,8 @@
         :param expected_sha: If non-None, the sha the lines are blieved to
             have. During compression the sha is calculated; a mismatch will
             cause an error.
+        :param soft: Do a 'soft' compression. This means that we require larger
+            ranges to match to be considered for a copy command.
         :return: The sha1 of lines, and the number of bytes accumulated in
             the group output so far.
         """
@@ -203,50 +220,44 @@
         if key[-1] is None:
             key = key[:-1] + ('sha1:' + sha1,)
         label = '\x00'.join(key)
+        new_lines = []
+        new_lines.append('label: %s\n' % label)
+        new_lines.append('sha1: %s\n' % sha1)
+        index_lines = [False, False]
         # setup good encoding for trailing \n support.
-        if not lines:
-            lines_is_empty = True
-        else:
-            lines_is_empty = False
         if not lines or lines[-1].endswith('\n'):
             lines.append('\n')
         else:
             lines[-1] = lines[-1] + '\n'
-        new_lines = []
-        new_lines.append('label: %s\n' % label)
-        new_lines.append('sha1: %s\n' % sha1)
-        index_lines = [False, False]
         pos = 0
         range_len = 0
         range_start = 0
         flush_range = self.flush_range
         copy_ends = None
-        blocks = None
-        if lines_is_empty:
-            # Empty texts are given a simple 'i1\n\n' insertion instruction.
-            # This prevents us from trying to match against an empty text.
-            blocks = [(0, len(lines), 0)]
-        if blocks is None and len(key) > 1:
-            prefix = key[0]
-            if prefix not in self._present_prefixes:
-                self._present_prefixes.add(prefix)
-                # Mark this as not matching anything
-                blocks = [(0, len(lines), 0)]
-        if blocks is None:
-            blocks = self.get_matching_blocks(lines)
+        blocks = self.get_matching_blocks(lines, soft=soft)
         current_pos = 0
-        # We either copy a range (while there are reusable lines) or we 
-        # insert new lines. To find reusable lines we traverse 
+        copies_without_insertion = []
+        # We either copy a range (while there are reusable lines) or we
+        # insert new lines. To find reusable lines we traverse
         for old_start, new_start, range_len in blocks:
             if new_start != current_pos:
+                # if copies_without_insertion:
+                #     self.flush_multi(copies_without_insertion,
+                #                      lines, new_lines, index_lines)
+                #     copies_without_insertion = []
                 # non-matching region
                 flush_range(current_pos, None, new_start - current_pos,
                     lines, new_lines, index_lines)
             current_pos = new_start + range_len
             if not range_len:
                 continue
-            flush_range(new_start, old_start, range_len, lines,
-                new_lines, index_lines)
+            # copies_without_insertion.append((new_start, old_start, range_len))
+            flush_range(new_start, old_start, range_len,
+                        lines, new_lines, index_lines)
+        # if copies_without_insertion:
+        #     self.flush_multi(copies_without_insertion,
+        #                      lines, new_lines, index_lines)
+        #     copies_without_insertion = []
         delta_start = (self.endpoint, len(self.lines))
         self.output_lines(new_lines, index_lines)
         trim_encoding_newline(lines)
@@ -271,6 +282,25 @@
         sha1 = sha_strings(lines)
         return lines, sha1
 
+    def flush_multi(self, instructions, lines, new_lines, index_lines):
+        """Flush a bunch of different ranges out.
+
+        This should only be called with data that are "pure" copies.
+        """
+        flush_range = self.flush_range
+        if len(instructions) > 2:
+            # This is the number of lines to be copied
+            total_copy_range = sum(i[2] for i in instructions)
+            if len(instructions) > 0.5 * total_copy_range:
+                # We are copying N lines, but taking more than N/2
+                # copy instructions to do so. We will go ahead and expand this
+                # text so that other code is able to match against it
+                flush_range(instructions[0][0], None, total_copy_range,
+                            lines, new_lines, index_lines)
+                return
+        for ns, os, rl in instructions:
+            flush_range(ns, os, rl, lines, new_lines, index_lines)
+
     def flush_range(self, range_start, copy_start, range_len, lines, new_lines, index_lines):
         insert_instruction = "i,%d\n" % range_len
         if copy_start is not None:
@@ -492,6 +522,17 @@
                     result[key] = self._unadded_refs[key]
         return result
 
+    def _get_delta_lines(self, key):
+        """Helper function for manual debugging.
+
+        This is a convenience function that shouldn't be used in production
+        code.
+        """
+        build_details = self._index.get_build_details([key])[key]
+        index_memo = build_details[0]
+        group, delta_lines = self._get_group_and_delta_lines(index_memo)
+        return delta_lines
+
     def _get_group_and_delta_lines(self, index_memo):
         read_memo = index_memo[0:3]
         # get the group:
@@ -581,6 +622,8 @@
                 if label != key:
                     raise AssertionError("wrong key: %r, wanted %r" % (label, key))
                 lines = apply_delta(plain, delta)
+                if sha_strings(lines) != sha1:
+                    raise AssertionError('sha1 sum did not match')
             yield ChunkedContentFactory(key, parents, sha1, lines)
 
     def get_sha1s(self, keys):
@@ -640,6 +683,7 @@
             for key, reads, refs in keys_to_add:
                 nodes.append((key, "%d %d %s" % (start, length, reads), refs))
             self._index.add_records(nodes, random_id=random_id)
+        last_prefix = None
         for record in stream:
             # Raise an error when a record is missing.
             if record.storage_kind == 'absent':
@@ -652,22 +696,21 @@
                 bytes = adapter.get_bytes(record,
                     record.get_bytes_as(record.storage_kind))
                 lines = osutils.split_lines(bytes)
+            soft = False
             if len(record.key) > 1:
                 prefix = record.key[0]
-                if (prefix not in self._compressor._present_prefixes
-                    and basis_end > 1024 * 1024 * 10):
-                    # This is a new file id we are inserting.
-                    # And the file is already more than half full. This record
-                    # would be added as full lines, so go ahead and start a new
-                    # group
-                    flush()
-                    self._compressor = GroupCompressor(self._delta)
-                    self._unadded_refs = {}
-                    keys_to_add = []
-                    basis_end = 0
-                    groups += 1
+                if (last_prefix is not None and prefix != last_prefix):
+                    soft = True
+                    if basis_end > 1024 * 1024 * 4:
+                        flush()
+                        self._compressor = GroupCompressor(self._delta)
+                        self._unadded_refs = {}
+                        keys_to_add = []
+                        basis_end = 0
+                        groups += 1
+                last_prefix = prefix
             found_sha1, end_point = self._compressor.compress(record.key,
-                lines, record.sha1)
+                lines, record.sha1, soft=soft)
             if record.key[-1] is None:
                 key = record.key[:-1] + ('sha1:' + found_sha1,)
             else:
@@ -677,7 +720,7 @@
             keys_to_add.append((key, '%d %d' % (basis_end, end_point),
                 (record.parents,)))
             basis_end = end_point
-            if basis_end > 1024 * 1024 * 20:
+            if basis_end > 1024 * 1024 * 8:
                 flush()
                 self._compressor = GroupCompressor(self._delta)
                 self._unadded_refs = {}



More information about the bazaar-commits mailing list