Rev 3920: Merge the new GroupCHKStreamSource. in http://bazaar.launchpad.net/%7Ebzr/bzr/brisbane-core

John Arbash Meinel john at arbash-meinel.com
Wed Apr 1 16:57:29 BST 2009


At http://bazaar.launchpad.net/%7Ebzr/bzr/brisbane-core

------------------------------------------------------------
revno: 3920 [merge]
revision-id: john at arbash-meinel.com-20090401155608-thmsypbkwt4e4sv9
parent: john at arbash-meinel.com-20090401155331-h99g8mm7b00kybyr
parent: john at arbash-meinel.com-20090401154955-vukyb3s3igmrnu95
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: brisbane-core
timestamp: Wed 2009-04-01 10:56:08 -0500
message:
  Merge the new GroupCHKStreamSource.
modified:
  bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
  bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
------------------------------------------------------------
Use --levels 0 (or -n0) to see merged revisions.
-------------- next part --------------
=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py	2009-03-31 16:20:47 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py	2009-04-01 02:06:26 +0000
@@ -582,82 +582,6 @@
             self._obsolete_packs(packs)
 
 
-# XXX: This format is scheduled for termination
-#
-# class GCPackRepository(KnitPackRepository):
-#     """GC customisation of KnitPackRepository."""
-#
-#     def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
-#         _serializer):
-#         """Overridden to change pack collection class."""
-#         KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
-#             _commit_builder_class, _serializer)
-#         # and now replace everything it did :)
-#         index_transport = self._transport.clone('indices')
-#         self._pack_collection = GCRepositoryPackCollection(self,
-#             self._transport, index_transport,
-#             self._transport.clone('upload'),
-#             self._transport.clone('packs'),
-#             _format.index_builder_class,
-#             _format.index_class,
-#             use_chk_index=self._format.supports_chks,
-#             )
-#         self.inventories = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
-#                 add_callback=self._pack_collection.inventory_index.add_callback,
-#                 parents=True, is_locked=self.is_locked),
-#             access=self._pack_collection.inventory_index.data_access)
-#         self.revisions = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.revision_index.combined_index,
-#                 add_callback=self._pack_collection.revision_index.add_callback,
-#                 parents=True, is_locked=self.is_locked),
-#             access=self._pack_collection.revision_index.data_access,
-#             delta=False)
-#         self.signatures = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.signature_index.combined_index,
-#                 add_callback=self._pack_collection.signature_index.add_callback,
-#                 parents=False, is_locked=self.is_locked),
-#             access=self._pack_collection.signature_index.data_access,
-#             delta=False)
-#         self.texts = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.text_index.combined_index,
-#                 add_callback=self._pack_collection.text_index.add_callback,
-#                 parents=True, is_locked=self.is_locked),
-#             access=self._pack_collection.text_index.data_access)
-#         if _format.supports_chks:
-#             # No graph, no compression:- references from chks are between
-#             # different objects not temporal versions of the same; and without
-#             # some sort of temporal structure knit compression will just fail.
-#             self.chk_bytes = GroupCompressVersionedFiles(
-#                 _GCGraphIndex(self._pack_collection.chk_index.combined_index,
-#                     add_callback=self._pack_collection.chk_index.add_callback,
-#                     parents=False, is_locked=self.is_locked),
-#                 access=self._pack_collection.chk_index.data_access)
-#         else:
-#             self.chk_bytes = None
-#         # True when the repository object is 'write locked' (as opposed to the
-#         # physical lock only taken out around changes to the pack-names list.)
-#         # Another way to represent this would be a decorator around the control
-#         # files object that presents logical locks as physical ones - if this
-#         # gets ugly consider that alternative design. RBC 20071011
-#         self._write_lock_count = 0
-#         self._transaction = None
-#         # for tests
-#         self._reconcile_does_inventory_gc = True
-#         self._reconcile_fixes_text_parents = True
-#         self._reconcile_backsup_inventory = False
-#
-#     def suspend_write_group(self):
-#         raise errors.UnsuspendableWriteGroup(self)
-#
-#     def _resume_write_group(self, tokens):
-#         raise errors.UnsuspendableWriteGroup(self)
-#
-#     def _reconcile_pack(self, collection, packs, extension, revs, pb):
-#         bork
-#         return packer.pack(pb)
-
-
 class GCCHKPackRepository(CHKInventoryRepository):
     """GC customisation of CHKInventoryRepository."""
 
@@ -716,6 +640,14 @@
         self._reconcile_fixes_text_parents = True
         self._reconcile_backsup_inventory = False
 
+    def _get_source(self, to_format):
+        """Return a source for streaming from this repository."""
+        if to_format.__class__ is self._format.__class__:
+            # We must be exactly the same format, otherwise stuff like the chk
+            # page layout might be different
+            return GroupCHKStreamSource(self, to_format)
+        return super(GCCHKPackRepository, self)._get_source(to_format)
+
     def suspend_write_group(self):
         raise errors.UnsuspendableWriteGroup(self)
 
@@ -851,3 +783,104 @@
         if not getattr(target_format, 'supports_tree_reference', False):
             raise errors.BadConversionTarget(
                 'Does not support nested trees', target_format)
+
+
+class GroupCHKStreamSource(repository.StreamSource):
+    """Used when both the source and target repo are GroupCHK repos."""
+
+    def __init__(self, from_repository, to_format):
+        """Create a StreamSource streaming from from_repository."""
+        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
+        self._revision_keys = None
+        self._text_keys = None
+        self._chk_id_roots = None
+        self._chk_p_id_roots = None
+
+    def _get_filtered_inv_stream(self):
+        """Get a stream of inventory texts.
+
+        When this function returns, self._chk_id_roots and self._chk_p_id_roots
+        should be populated.
+        """
+        self._chk_id_roots = []
+        self._chk_p_id_roots = []
+        def _filtered_inv_stream():
+            id_roots_set = set()
+            p_id_roots_set = set()
+            source_vf = self.from_repository.inventories
+            stream = source_vf.get_record_stream(self._revision_keys,
+                                                 'groupcompress', True)
+            for record in stream:
+                bytes = record.get_bytes_as('fulltext')
+                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
+                                                             record.key)
+                key = chk_inv.id_to_entry.key()
+                if key not in id_roots_set:
+                    self._chk_id_roots.append(key)
+                    id_roots_set.add(key)
+                p_id_map = chk_inv.parent_id_basename_to_file_id
+                if p_id_map is None:
+                    raise AssertionError('Parent id -> file_id map not set')
+                key = p_id_map.key()
+                if key not in p_id_roots_set:
+                    p_id_roots_set.add(key)
+                    self._chk_p_id_roots.append(key)
+                yield record
+            # We have finished processing all of the inventory records, we
+            # don't need these sets anymore
+            id_roots_set.clear()
+            p_id_roots_set.clear()
+        return ('inventories', _filtered_inv_stream())
+
+    def _get_filtered_chk_streams(self, excluded_keys):
+        self._text_keys = set()
+        excluded_keys.discard(_mod_revision.NULL_REVISION)
+        if not excluded_keys:
+            uninteresting_root_keys = set()
+            uninteresting_pid_root_keys = set()
+        else:
+            uninteresting_root_keys = set()
+            uninteresting_pid_root_keys = set()
+            for inv in self.from_repository.iter_inventories(excluded_keys):
+                uninteresting_root_keys.add(inv.id_to_entry.key())
+                uninteresting_pid_root_keys.add(
+                    inv.parent_id_basename_to_file_id.key())
+        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
+        chk_bytes = self.from_repository.chk_bytes
+        def _filter_id_to_entry():
+            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
+                        self._chk_id_roots, uninteresting_root_keys):
+                for name, bytes in items:
+                    # Note: we don't care about name_utf8, because we are always
+                    # rich-root = True
+                    _, file_id, revision_id = bytes_to_info(bytes)
+                    self._text_keys.add((file_id, revision_id))
+                if record is not None:
+                    yield record
+        yield 'chk_bytes', _filter_id_to_entry()
+        def _get_parent_id_basename_to_file_id_pages():
+            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
+                        self._chk_p_id_roots, uninteresting_pid_root_keys):
+                if record is not None:
+                    yield record
+        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
+
+    def _get_text_stream(self):
+        # Note: We know we don't have to handle adding root keys, because both
+        # the source and target are GCCHK, and those always support rich-roots
+        # We may want to request as 'unordered', in case the source has done a
+        # 'split' packing
+        return ('texts', self.from_repository.texts.get_record_stream(
+                            self._text_keys, 'groupcompress', False))
+
+    def get_stream(self, search):
+        revision_ids = search.get_keys()
+        for stream_info in self._fetch_revision_texts(revision_ids):
+            yield stream_info
+        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+        yield self._get_filtered_inv_stream()
+        # The keys to exclude are part of the search recipe
+        _, _, exclude_keys, _ = search.get_recipe()
+        for stream_info in self._get_filtered_chk_streams(exclude_keys):
+            yield stream_info
+        yield self._get_text_stream()

=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py	2009-03-30 11:49:32 +0000
+++ b/bzrlib/tests/test_repository.py	2009-04-01 15:49:55 +0000
@@ -61,7 +61,12 @@
     upgrade,
     workingtree,
     )
-from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
+from bzrlib.repofmt import (
+    groupcompress_repo,
+    knitrepo,
+    pack_repo,
+    weaverepo,
+    )
 
 
 class TestDefaultFormat(TestCase):
@@ -1198,3 +1203,19 @@
         self.assertTrue(new_pack.inventory_index._optimize_for_size)
         self.assertTrue(new_pack.text_index._optimize_for_size)
         self.assertTrue(new_pack.signature_index._optimize_for_size)
+
+
+class TestGCCHKPackCollection(TestCaseWithTransport):
+
+    def test_stream_source_to_gc(self):
+        source = self.make_repository('source', format='gc-chk255-big')
+        target = self.make_repository('target', format='gc-chk255-big')
+        stream = source._get_source(target._format)
+        self.assertIsInstance(stream, groupcompress_repo.GroupCHKStreamSource)
+
+    def test_stream_source_to_non_gc(self):
+        source = self.make_repository('source', format='gc-chk255-big')
+        target = self.make_repository('target', format='rich-root-pack')
+        stream = source._get_source(target._format)
+        # We don't want the child GroupCHKStreamSource
+        self.assertIs(type(stream), repository.StreamSource)



More information about the bazaar-commits mailing list