Rev 52: Merge in the latest updates to the gc trunk. in http://bzr.arbash-meinel.com/plugins/groupcompress_rabin

John Arbash Meinel john at arbash-meinel.com
Fri Feb 27 16:07:47 GMT 2009


At http://bzr.arbash-meinel.com/plugins/groupcompress_rabin

------------------------------------------------------------
revno: 52
revision-id: john at arbash-meinel.com-20090227160746-1gt1m20vqk7i273c
parent: john at arbash-meinel.com-20090227160650-iv1rpvxsqejydxj7
parent: john at arbash-meinel.com-20090227051839-841q6ss4z8zm1353
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: groupcompress_rabin
timestamp: Fri 2009-02-27 10:07:46 -0600
message:
  Merge in the latest updates to the gc trunk.
modified:
  groupcompress.py               groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
  repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
  tests/test_groupcompress.py    test_groupcompress.p-20080705181503-ccbxd6xuy1bdnrpu-13
    ------------------------------------------------------------
    revno: 48.1.7
    revision-id: john at arbash-meinel.com-20090227051839-841q6ss4z8zm1353
    parent: john at arbash-meinel.com-20090227051520-3bqqcchl92qup96h
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: trunk
    timestamp: Thu 2009-02-26 23:18:39 -0600
    message:
      Update a Note/Todo
    modified:
      repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 48.1.6
    revision-id: john at arbash-meinel.com-20090227051520-3bqqcchl92qup96h
    parent: john at arbash-meinel.com-20090227050931-bt0zwmqxrrheyosq
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: trunk
    timestamp: Thu 2009-02-26 23:15:20 -0600
    message:
      Try even harder, now with even *more* streams.
      The compressed size drops by another 4x.
      Turn the data for each *layer* into a different stream.
      With this change, gc255 has compressed inventory drop to 1.5MB
      which is finally *smaller* than the source 'knit' format.
    modified:
      repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 48.1.5
    revision-id: john at arbash-meinel.com-20090227050931-bt0zwmqxrrheyosq
    parent: john at arbash-meinel.com-20090227035733-h1gmn3ymofoxc7zm
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: trunk
    timestamp: Thu 2009-02-26 23:09:31 -0600
    message:
      As expected, splitting things up into streams of streams
      gives even better compression. (Down to 4.4MB for inventories).
      Probably the big win is that parent_id_basename content doesn't compress
      well at all versus id_to_entry content, and this way you don't
      get large offsets.
    modified:
      repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 48.1.4
    revision-id: john at arbash-meinel.com-20090227035733-h1gmn3ymofoxc7zm
    parent: john at arbash-meinel.com-20090227035442-4sk2vcl343hg9afv
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: trunk
    timestamp: Thu 2009-02-26 21:57:33 -0600
    message:
      Add a general progress indicator for other parts of copy.
    modified:
      repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 48.1.3
    revision-id: john at arbash-meinel.com-20090227035442-4sk2vcl343hg9afv
    parent: john at arbash-meinel.com-20090227033445-gk1o5rr6t0u0d14i
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: trunk
    timestamp: Thu 2009-02-26 21:54:42 -0600
    message:
      Add a progress indicator for chk pages.
      Fix a bug in the handling of signatures, which don't have a parent graph.
    modified:
      groupcompress.py               groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
      repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 48.1.2
    revision-id: john at arbash-meinel.com-20090227033445-gk1o5rr6t0u0d14i
    parent: john at arbash-meinel.com-20090227032810-wughh2jot1eiq94b
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: trunk
    timestamp: Thu 2009-02-26 21:34:45 -0600
    message:
      Make it clear that the bits you get from 'apply_delta' are chunks, not lines.
    modified:
      groupcompress.py               groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
    ------------------------------------------------------------
    revno: 48.1.1
    revision-id: john at arbash-meinel.com-20090227032810-wughh2jot1eiq94b
    parent: john at arbash-meinel.com-20090225230422-4oigw03k7fq62eyb
    parent: john at arbash-meinel.com-20090227030449-r8cmknorcmlgaqtm
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: trunk
    timestamp: Thu 2009-02-26 21:28:10 -0600
    message:
      Merge the chk sorting code.
      Restore labels and sha1s in the stored data.
      Leave the 'extra' formats commented out for now.
    modified:
      groupcompress.py               groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
      repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
      tests/test_groupcompress.py    test_groupcompress.p-20080705181503-ccbxd6xuy1bdnrpu-13
    ------------------------------------------------------------
    revno: 45.1.6
    revision-id: john at arbash-meinel.com-20090227030449-r8cmknorcmlgaqtm
    parent: john at arbash-meinel.com-20090226224152-z4jiazt0gp1vsylk
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: experimental
    timestamp: Thu 2009-02-26 21:04:49 -0600
    message:
      Clustering chk pages properly makes a big difference.
      
      By iterating root nodes in the same order as the referencing inventory,
      and then iterating by search prefix, we get compression about 2:1 versus
      not compressing at all, which is probably 50% better than random ordering.
    modified:
      groupcompress.py               groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
      repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 45.1.5
    revision-id: john at arbash-meinel.com-20090226224152-z4jiazt0gp1vsylk
    parent: john at arbash-meinel.com-20090226220934-lnqvbe6uqle8eoum
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: experimental
    timestamp: Thu 2009-02-26 16:41:52 -0600
    message:
      Try a different method of streaming the chk pages.
      In this method, we work out which chk pages are referenced by which
      inventory pages, and then fetch them in breadth-first reference order.
      This should mean that pages that will compress well together are
      sent together, rather than in arbitrary ordering.
      Note that we might want to do a little better still, and record each
      key in a list the first time we encounter it, rather than using sets
      everywhere (we would still want a set, to make sure a key is not added
      to the list more than once).
      
      Then again, 'unordered' may reorder it anyway, so it may not matter.
      We should also consider using multiple chk streams, because it
      will likely result in better compression, by forcing breaks in the
      gc groups.
    modified:
      repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
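
The breadth-first fetch described in this entry, combined with the per-prefix
clustering and per-layer streams from the later revisions, can be sketched
roughly as follows. This is only an illustration in plain Python 3, not the
bzrlib code: `iter_layers` and the `children` mapping are hypothetical names,
and the toy mapping stands in for deserialising real chk nodes. It uses a list
for first-seen order plus a set to avoid adding a key twice.

```python
def iter_layers(roots, children):
    """Yield one list of keys per tree depth, in breadth-first order.

    `children` is a hypothetical mapping from a node key to a list of
    (search_prefix, child_key) pairs, standing in for real chk nodes.
    """
    seen = set()          # guards against emitting a key twice
    cur_keys = []         # keeps first-encounter order
    for key in roots:
        if key not in seen:
            seen.add(key)
            cur_keys.append(key)
    while cur_keys:
        # Each layer is handed out on its own, like a separate stream.
        yield cur_keys
        by_prefix = {}
        for key in cur_keys:
            for prefix, child in children.get(key, []):
                if child not in seen:
                    seen.add(child)
                    by_prefix.setdefault(prefix, []).append(child)
        # Within the next layer, keep nodes with the same prefix together.
        cur_keys = []
        for prefix in sorted(by_prefix):
            cur_keys.extend(by_prefix[prefix])


children = {
    'root1': [('a', 'n_a1'), ('b', 'n_b1')],
    'root2': [('a', 'n_a2'), ('b', 'n_b1')],
    'n_a1': [('aa', 'leaf1')],
}
layers = list(iter_layers(['root1', 'root2'], children))
```

Here `layers` holds three lists: the two roots, then the nodes under prefix
'a' followed by the one under 'b', then the single leaf. The shared child
`n_b1` appears only once.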
    ------------------------------------------------------------
    revno: 45.1.4
    revision-id: john at arbash-meinel.com-20090226220934-lnqvbe6uqle8eoum
    parent: john at arbash-meinel.com-20090226215937-4n69g2lfbjm3yyip
    parent: john at arbash-meinel.com-20090225230422-4oigw03k7fq62eyb
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: experimental
    timestamp: Thu 2009-02-26 16:09:34 -0600
    message:
      Bring in the missing update from 'trunk'
    modified:
      groupcompress.py               groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
      repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 45.1.3
    revision-id: john at arbash-meinel.com-20090226215937-4n69g2lfbjm3yyip
    parent: john at arbash-meinel.com-20090226215757-0rpcnz9vf6z3eikn
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: experimental
    timestamp: Thu 2009-02-26 15:59:37 -0600
    message:
      Play with some experimental alternate hashes, comment them out for now.
    modified:
      repofmt.py                     repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 45.1.2
    revision-id: john at arbash-meinel.com-20090226215757-0rpcnz9vf6z3eikn
    parent: john at arbash-meinel.com-20090225221429-l0is3qxy1hvzuhes
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: experimental
    timestamp: Thu 2009-02-26 15:57:57 -0600
    message:
      Experiment with removing the label and sha1 fields. This seems to shrink texts by 10-30%.
    modified:
      groupcompress.py               groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
-------------- next part --------------
=== modified file 'groupcompress.py'
--- a/groupcompress.py	2009-02-25 22:21:23 +0000
+++ b/groupcompress.py	2009-02-27 03:54:42 +0000
@@ -73,6 +73,7 @@
         else:
             contents = [next() for i in xrange(numbers[0])]
             result.append((op, None, numbers[0], contents))
+    ## return result
     return label, sha1, result
 
 
@@ -109,17 +110,18 @@
     """
     # gc-optimal ordering is approximately reverse topological,
     # properly grouped by file-id.
-    per_prefix_map = {'': []}
+    per_prefix_map = {}
     present_keys = []
     for item in parent_map.iteritems():
         key = item[0]
         if isinstance(key, str) or len(key) == 1:
-            per_prefix_map[''].append(item)
+            prefix = ''
         else:
-            try:
-                per_prefix_map[key[0]].append(item)
-            except KeyError:
-                per_prefix_map[key[0]] = [item]
+            prefix = key[0]
+        try:
+            per_prefix_map[prefix].append(item)
+        except KeyError:
+            per_prefix_map[prefix] = [item]
 
     for prefix in sorted(per_prefix_map):
         present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
@@ -220,9 +222,11 @@
         if key[-1] is None:
             key = key[:-1] + ('sha1:' + sha1,)
         label = '\x00'.join(key)
-        new_lines = []
-        new_lines.append('label: %s\n' % label)
-        new_lines.append('sha1: %s\n' % sha1)
+        ## new_lines = []
+        new_lines = ['label: %s\n' % label,
+                     'sha1: %s\n' % sha1,
+                    ]
+        ## index_lines = []
         index_lines = [False, False]
         # setup good encoding for trailing \n support.
         if not lines or lines[-1].endswith('\n'):
@@ -275,12 +279,13 @@
         delta_details = self.labels_deltas[key]
         delta_lines = self.lines[delta_details[0][1]:delta_details[1][1]]
         label, sha1, delta = parse(delta_lines)
+        ## delta = parse(delta_lines)
         if label != key:
             raise AssertionError("wrong key: %r, wanted %r" % (label, key))
         # Perhaps we want to keep the line offsets too in memory at least?
-        lines = apply_delta(''.join(self.lines), delta)
-        sha1 = sha_strings(lines)
-        return lines, sha1
+        chunks = apply_delta(''.join(self.lines), delta)
+        sha1 = sha_strings(chunks)
+        return chunks, sha1
 
     def flush_multi(self, instructions, lines, new_lines, index_lines):
         """Flush a bunch of different ranges out.
@@ -474,7 +479,7 @@
             chunks = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
             parent_lines = [parent_cache[parent] for parent in parent_map[key]]
             parent_cache[key] = list(
-                reannotate(parent_lines, fulltext, key, None, head_cache))
+                reannotate(parent_lines, chunks, key, None, head_cache))
         return parent_cache[key]
 
     def check(self, progress_bar=None):
@@ -577,10 +582,12 @@
             valid until the iterator is advanced.
         """
         # keys might be a generator
-        keys = set(keys)
+        orig_keys = list(keys)
+        keys = set(orig_keys)
         if not keys:
             return
-        if not self._index.has_graph:
+        if (not self._index.has_graph
+            and ordering in ('topological', 'gc-optimal')):
             # Cannot topological order when no graph has been stored.
             ordering = 'unordered'
         # Cheap: iterate
@@ -599,13 +606,15 @@
             # Now group by source:
         elif ordering == 'gc-optimal':
             parent_map = dict((key, details[2]) for key, details in
-                locations.iteritems())
+                              locations.iteritems())
             for key in local_keys:
                 parent_map[key] = self._unadded_refs[key]
             # XXX: This only optimizes for the target ordering. We may need to
             #      balance that with the time it takes to extract ordering, by
             #      somehow grouping based on locations[key][0:3]
             present_keys = sort_gc_optimal(parent_map)
+        elif ordering == 'as-requested':
+            present_keys = [key for key in orig_keys if key in locations]
         else:
             # We want to yield the keys in a semi-optimal (read-wise) ordering.
             # Otherwise we thrash the _group_cache and destroy performance
@@ -628,13 +637,15 @@
             else:
                 index_memo, _, parents, (method, _) = locations[key]
                 plain, delta_lines = self._get_group_and_delta_lines(index_memo)
+                ## delta = parse(delta_lines)
                 label, sha1, delta = parse(delta_lines)
                 if label != key:
                     raise AssertionError("wrong key: %r, wanted %r" % (label, key))
-                lines = apply_delta(plain, delta)
-                if sha_strings(lines) != sha1:
+                chunks = apply_delta(plain, delta)
+                ## sha1 = sha_strings(lines)
+                if sha_strings(chunks) != sha1:
                     raise AssertionError('sha1 sum did not match')
-            yield ChunkedContentFactory(key, parents, sha1, lines)
+            yield ChunkedContentFactory(key, parents, sha1, chunks)
 
     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""
@@ -703,8 +714,7 @@
             except errors.UnavailableRepresentation:
                 adapter_key = record.storage_kind, 'fulltext'
                 adapter = get_adapter(adapter_key)
-                bytes = adapter.get_bytes(record,
-                    record.get_bytes_as(record.storage_kind))
+                bytes = adapter.get_bytes(record)
                 lines = osutils.split_lines(bytes)
             soft = False
             if len(record.key) > 1:

=== modified file 'repofmt.py'
--- a/repofmt.py	2009-02-26 04:17:19 +0000
+++ b/repofmt.py	2009-02-27 16:07:46 +0000
@@ -24,6 +24,7 @@
     debug,
     errors,
     knit,
+    inventory,
     pack,
     repository,
     ui,
@@ -58,8 +59,13 @@
     CHKInventoryRepository,
     RepositoryFormatPackDevelopment5,
     RepositoryFormatPackDevelopment5Hash16,
+##    RepositoryFormatPackDevelopment5Hash16b,
+##    RepositoryFormatPackDevelopment5Hash63,
+##    RepositoryFormatPackDevelopment5Hash127a,
+##    RepositoryFormatPackDevelopment5Hash127b,
     RepositoryFormatPackDevelopment5Hash255,
     )
+    from bzrlib import chk_map
     chk_support = True
 except ImportError:
     chk_support = False
@@ -236,6 +242,94 @@
         self.repo.signatures._index._add_callback = self.signature_index.add_callback
         self.repo.texts._index._add_callback = self.text_index.add_callback
 
+    def _get_filtered_inv_stream(self, source_vf, keys):
+        """Filter the texts of inventories, to find the chk pages."""
+        id_roots = []
+        p_id_roots = []
+        id_roots_set = set()
+        p_id_roots_set = set()
+        def _filter_inv_stream(stream):
+            for idx, record in enumerate(stream):
+                ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
+                bytes = record.get_bytes_as('fulltext')
+                chk_inv = inventory.CHKInventory.deserialise(None, bytes, record.key)
+                key = chk_inv.id_to_entry.key()
+                if key not in id_roots_set:
+                    id_roots.append(key)
+                    id_roots_set.add(key)
+                p_id_map = chk_inv.parent_id_basename_to_file_id
+                if p_id_map is not None:
+                    key = p_id_map.key()
+                    if key not in p_id_roots_set:
+                        p_id_roots_set.add(key)
+                        p_id_roots.append(key)
+                yield record
+        stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
+        return _filter_inv_stream(stream), id_roots, p_id_roots
+
+    def _get_chk_stream(self, source_vf, keys, id_roots, p_id_roots, pb=None):
+        # We want to stream the keys from 'id_roots', and things they
+        # reference, and then stream things from p_id_roots and things they
+        # reference, and then any remaining keys that we didn't get to.
+
+        # We also group referenced texts together, so if one root references a
+        # text with prefix 'a', and another root references a node with prefix
+        # 'a', we want to yield those nodes before we yield the nodes for 'b'
+        # This keeps 'similar' nodes together
+
+        # Note: We probably actually want multiple streams here, to help the
+        #       client understand that the different levels won't compress well
+        #       against each other
+        #       Test the difference between using one Group per level, and
+        #       using 1 Group per prefix. (so '' (root) would get a group, then
+        #       all the references to search-key 'a' would get a group, etc.)
+        remaining_keys = set(keys)
+        counter = [0]
+        def _get_referenced_stream(root_keys):
+            cur_keys = root_keys
+            while cur_keys:
+                keys_by_search_prefix = {}
+                remaining_keys.difference_update(cur_keys)
+                next_keys = set()
+                stream = source_vf.get_record_stream(cur_keys, 'as-requested',
+                                                     True)
+                def next_stream():
+                    for record in stream:
+                        bytes = record.get_bytes_as('fulltext')
+                        # We don't care about search_key_func for this code,
+                        # because we only care about external references.
+                        node = chk_map._deserialise(bytes, record.key,
+                                                    search_key_func=None)
+                        common_base = node._search_prefix
+                        if isinstance(node, chk_map.InternalNode):
+                            for prefix, value in node._items.iteritems():
+                                assert isinstance(value, tuple)
+                                if value not in next_keys:
+                                    keys_by_search_prefix.setdefault(prefix,
+                                        []).append(value)
+                                    next_keys.add(value)
+                        counter[0] += 1
+                        if pb is not None:
+                            pb.update('chk node', counter[0])
+                        yield record
+                yield next_stream()
+                # Double check that we won't be emitting any keys twice
+                next_keys = next_keys.intersection(remaining_keys)
+                cur_keys = []
+                for prefix in sorted(keys_by_search_prefix):
+                    cur_keys.extend(keys_by_search_prefix[prefix])
+        for stream in _get_referenced_stream(id_roots):
+            yield stream
+        for stream in _get_referenced_stream(p_id_roots):
+            yield stream
+        if remaining_keys:
+            trace.note('There were %d keys in the chk index, which'
+                       ' were not referenced from inventories',
+                       len(remaining_keys))
+            stream = source_vf.get_record_stream(remaining_keys, 'unordered',
+                                                 True)
+            yield stream
+
     def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
                                  reload_func=None):
         """Execute a series of pack operations.
@@ -267,7 +361,13 @@
                        ('text_index', 'texts'),
                        ('signature_index', 'signatures'),
                       ]
+            # TODO: This is a very non-optimal ordering for chk_bytes. The
+            #       issue is that pages that are similar are not transmitted
+            #       together. Perhaps get_record_stream('gc-optimal') should be
+            #       taught about how to group chk pages?
+            has_chk = False
             if getattr(self, 'chk_index', None) is not None:
+                has_chk = True
                 to_copy.insert(2, ('chk_index', 'chk_bytes'))
 
             # Shouldn't we start_write_group around this?
@@ -302,8 +402,30 @@
                                       is_locked=self.repo.is_locked),
                         access=target_access,
                         delta=source_vf._delta)
-                    stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
-                    target_vf.insert_record_stream(stream)
+                    stream = None
+                    child_pb = ui.ui_factory.nested_progress_bar()
+                    try:
+                        if has_chk:
+                            if vf_name == 'inventories':
+                                stream, id_roots, p_id_roots = self._get_filtered_inv_stream(
+                                    source_vf, keys)
+                            elif vf_name == 'chk_bytes':
+                                for stream in self._get_chk_stream(source_vf, keys,
+                                                    id_roots, p_id_roots,
+                                                    pb=child_pb):
+                                    target_vf.insert_record_stream(stream)
+                                # No more to copy
+                                stream = []
+                        if stream is None:
+                            def pb_stream():
+                                substream = source_vf.get_record_stream(keys, 'gc-optimal', True)
+                                for idx, record in enumerate(substream):
+                                    child_pb.update(vf_name, idx, len(keys))
+                                    yield record
+                            stream = pb_stream()
+                        target_vf.insert_record_stream(stream)
+                    finally:
+                        child_pb.finished()
                 new_pack._check_references() # shouldn't be needed
             except:
                 pb.finished()

=== modified file 'tests/test_groupcompress.py'
--- a/tests/test_groupcompress.py	2009-02-25 22:11:02 +0000
+++ b/tests/test_groupcompress.py	2009-02-27 03:28:10 +0000
@@ -40,6 +40,7 @@
             'factory':make_pack_factory(False, False, 1),
             'graph': False,
             'key_length':1,
+            'support_partial_insertion':False,
             }
         )
     applier = TestScenarioApplier()
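
For readers following the gc-optimal ordering hunk in groupcompress.py above
(group keys by prefix, then emit each group in reverse topological order), a
minimal self-contained sketch might look like this. It is written for Python 3
rather than the Python 2 of the patch, and `simple_topo_sort` and
`sort_gc_optimal_sketch` are hypothetical stand-ins, not the bzrlib functions.

```python
def simple_topo_sort(parent_map):
    """Return the keys of parent_map with every parent before its children.

    Parents that are not themselves keys of parent_map are ignored.
    Recursive, so only suitable for small illustrative graphs.
    """
    ordered = []
    visited = set()

    def visit(key):
        if key in visited or key not in parent_map:
            return
        visited.add(key)
        for parent in parent_map[key]:
            visit(parent)
        ordered.append(key)

    for key in parent_map:
        visit(key)
    return ordered


def sort_gc_optimal_sketch(parent_map):
    """Group keys by prefix, newest-first (reverse topological) per group."""
    per_prefix = {}
    for key, parents in parent_map.items():
        # Plain strings and 1-tuples have no file-id prefix.
        if isinstance(key, str) or len(key) == 1:
            prefix = ''
        else:
            prefix = key[0]
        per_prefix.setdefault(prefix, {})[key] = parents
    present_keys = []
    for prefix in sorted(per_prefix):
        present_keys.extend(reversed(simple_topo_sort(per_prefix[prefix])))
    return present_keys


parent_map = {
    ('f1', 'r1'): (),
    ('f1', 'r2'): (('f1', 'r1'),),
    ('f2', 'r1'): (),
    ('f1', 'r3'): (('f1', 'r2'),),
}
ordered = sort_gc_optimal_sketch(parent_map)
```

With this input, all 'f1' keys come out before the 'f2' key, and within 'f1'
the newest revision comes first.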


