Rev 50: Try a different method of streaming the chk pages. in http://bazaar.launchpad.net/%7Ejameinel/bzr-groupcompress/experimental

John Arbash Meinel john at arbash-meinel.com
Thu Feb 26 22:48:09 GMT 2009


At http://bazaar.launchpad.net/%7Ejameinel/bzr-groupcompress/experimental

------------------------------------------------------------
revno: 50
revision-id: john at arbash-meinel.com-20090226224152-z4jiazt0gp1vsylk
parent: john at arbash-meinel.com-20090226220934-lnqvbe6uqle8eoum
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: experimental
timestamp: Thu 2009-02-26 16:41:52 -0600
message:
  Try a different method of streaming the chk pages.
  In this method, we work out what chk pages are referenced by what inventory
  pages. And then fetch them based on breadth-first references.
  This should mean that pages that will compress well together are
  sent together, rather than in arbitrary ordering.
  Note that we might want to do even a little better, and use
  a list for the first time we encounter it, rather than sets everywhere.
  (we still want a set to make sure we don't add it multiple times to the list)
  
  Then again, 'unordered' may reorder it anyway, so it may not matter.
  We should also consider using multiple chk streams, because it
  will likely result in better compression, by forcing breaks in the
  gc groups.
-------------- next part --------------
=== modified file 'repofmt.py'
--- a/repofmt.py	2009-02-26 22:09:34 +0000
+++ b/repofmt.py	2009-02-26 22:41:52 +0000
@@ -24,6 +24,7 @@
     debug,
     errors,
     knit,
+    inventory,
     pack,
     repository,
     ui,
@@ -64,6 +65,7 @@
 ##    RepositoryFormatPackDevelopment5Hash127b,
     RepositoryFormatPackDevelopment5Hash255,
     )
+    from bzrlib import chk_map
     chk_support = True
 except ImportError:
     chk_support = False
@@ -240,6 +242,61 @@
         self.repo.signatures._index._add_callback = self.signature_index.add_callback
         self.repo.texts._index._add_callback = self.text_index.add_callback
 
+    def _get_filtered_inv_stream(self, source_vf, keys):
+        """Filter the texts of inventories, to find the chk pages."""
+        id_roots = set()
+        p_id_roots = set()
+        def _filter_inv_stream(stream):
+            for idx, record in enumerate(stream):
+                ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
+                bytes = record.get_bytes_as('fulltext')
+                chk_inv = inventory.CHKInventory.deserialise(None, bytes, record.key)
+                id_roots.add(chk_inv.id_to_entry.key())
+                p_id_map = chk_inv.parent_id_basename_to_file_id
+                if p_id_map is not None:
+                    p_id_roots.add(p_id_map.key())
+                yield record
+        stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
+        return _filter_inv_stream(stream), id_roots, p_id_roots
+
+    def _get_chk_stream(self, source_vf, keys, id_roots, p_id_roots):
+        # We want to stream the keys from 'id_roots', and things they
+        # reference, and then stream things from p_id_roots and things they
+        # reference, and then any remaining keys that we didn't get to.
+
+        # Note: We probably actually want multiple streams here, to help the
+        #       client understand that the different levels won't compress well
+        #       against eachother
+        remaining_keys = set(keys)
+        def _get_referenced_stream(root_keys):
+            cur_keys = root_keys
+            while cur_keys:
+                remaining_keys.difference_update(cur_keys)
+                next_keys = set()
+                stream = source_vf.get_record_stream(cur_keys, 'unordered',
+                                                     True)
+                for record in stream:
+                    bytes = record.get_bytes_as('fulltext')
+                    # We don't care about search_key_func for this code,
+                    # because we only care about external references.
+                    node = chk_map._deserialise(bytes, record.key,
+                                                search_key_func=None)
+                    next_keys.update(node.refs())
+                    yield record
+                cur_keys = next_keys.intersection(remaining_keys)
+        for record in _get_referenced_stream(id_roots):
+            yield record
+        for record in _get_referenced_stream(p_id_roots):
+            yield record
+        if remaining_keys:
+            trace.note('There were %d keys in the chk index, which'
+                       ' were not referenced from inventories',
+                       len(remaining_keys))
+            stream = source_vf.get_record_stream(remaining_keys, 'unordered',
+                                                 True)
+            for record in stream:
+                yield record
+
     def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
                                  reload_func=None):
         """Execute a series of pack operations.
@@ -275,7 +332,9 @@
             #       issue is that pages that are similar are not transmitted
             #       together. Perhaps get_record_stream('gc-optimal') should be
             #       taught about how to group chk pages?
+            has_chk = False
             if getattr(self, 'chk_index', None) is not None:
+                has_chk = True
                 to_copy.insert(2, ('chk_index', 'chk_bytes'))
 
             # Shouldn't we start_write_group around this?
@@ -310,7 +369,16 @@
                                       is_locked=self.repo.is_locked),
                         access=target_access,
                         delta=source_vf._delta)
-                    stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
+                    stream = None
+                    if has_chk:
+                        if vf_name == 'inventories':
+                            stream, id_roots, p_id_roots = self._get_filtered_inv_stream(
+                                source_vf, keys)
+                        elif vf_name == 'chk_bytes':
+                            stream = self._get_chk_stream(source_vf, keys,
+                                                          id_roots, p_id_roots)
+                    if stream is None:
+                        stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
                     target_vf.insert_record_stream(stream)
                 new_pack._check_references() # shouldn't be needed
             except:



More information about the bazaar-commits mailing list