Rev 3904: We now have a 'reuse_blocks=False' flag for autopack et al. in http://bzr.arbash-meinel.com/branches/bzr/brisbane/lazy_gc_stream

John Arbash Meinel john at arbash-meinel.com
Tue Mar 17 18:29:12 GMT 2009


At http://bzr.arbash-meinel.com/branches/bzr/brisbane/lazy_gc_stream

------------------------------------------------------------
revno: 3904
revision-id: john at arbash-meinel.com-20090317182906-s7ynapnrcxj9i99s
parent: john at arbash-meinel.com-20090317174617-osa5ia09no26xm1w
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: lazy_gc_stream
timestamp: Tue 2009-03-17 13:29:06 -0500
message:
  We now have a 'reuse_blocks=False' flag for autopack et al.
  We need to be careful that insert_record_stream() is a simple function,
  but _insert_record_stream() is a generator.
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-03-17 17:46:17 +0000
+++ b/bzrlib/groupcompress.py	2009-03-17 18:29:06 +0000
@@ -1295,7 +1295,8 @@
         for _ in self._insert_record_stream(stream):
             pass
 
-    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None):
+    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
+                              reuse_blocks=True):
         """Internal core to insert a record stream into this container.
 
         This helper function has a different interface than insert_record_stream
@@ -1304,6 +1305,9 @@
         :param stream: A stream of records to insert.
         :param nostore_sha: If the sha1 of a given text matches nostore_sha,
             raise ExistingContent, rather than committing the new text.
+        :param reuse_blocks: If the source is streaming from
+            groupcompress-blocks, just insert the blocks as-is, rather than
+            expanding the texts and inserting again.
         :return: An iterator over the sha1 of the inserted records.
         :seealso insert_record_stream:
         :seealso add_lines:
@@ -1346,24 +1350,27 @@
             # Raise an error when a record is missing.
             if record.storage_kind == 'absent':
                 raise errors.RevisionNotPresent(record.key, self)
-            if record.storage_kind == 'groupcompress-block':
-                # Insert the raw block into the target repo
-                insert_manager = record._manager
-                bytes = record._manager._block.to_bytes()
-                _, start, length = self._access.add_raw_records(
-                    [(None, len(bytes))], bytes)[0]
-                del bytes
-                block_start = start
-                block_length = length
-            if record.storage_kind in ('groupcompress-block',
-                                       'groupcompress-block-ref'):
-                assert insert_manager is not None
-                assert record._manager is insert_manager
-                value = "%d %d %d %d" % (block_start, block_length,
-                                         record._start, record._end)
-                nodes = [(record.key, value, (record.parents,))]
-                self._index.add_records(nodes, random_id=random_id)
-                continue
+            if reuse_blocks:
+                # If the reuse_blocks flag is set, check to see if we can just
+                # copy a groupcompress block as-is.
+                if record.storage_kind == 'groupcompress-block':
+                    # Insert the raw block into the target repo
+                    insert_manager = record._manager
+                    bytes = record._manager._block.to_bytes()
+                    _, start, length = self._access.add_raw_records(
+                        [(None, len(bytes))], bytes)[0]
+                    del bytes
+                    block_start = start
+                    block_length = length
+                if record.storage_kind in ('groupcompress-block',
+                                           'groupcompress-block-ref'):
+                    assert insert_manager is not None
+                    assert record._manager is insert_manager
+                    value = "%d %d %d %d" % (block_start, block_length,
+                                             record._start, record._end)
+                    nodes = [(record.key, value, (record.parents,))]
+                    self._index.add_records(nodes, random_id=random_id)
+                    continue
             try:
                 bytes = record.get_bytes_as('fulltext')
             except errors.UnavailableRepresentation:

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2009-03-17 17:46:17 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2009-03-17 18:29:06 +0000
@@ -446,8 +446,7 @@
     def test_get_record_stream_as_requested(self):
         # Consider promoting 'as-requested' to general availability, and
         # make this a VF interface test
-        vf = self.make_test_vf(False, do_cleanup=False,
-                               dir='source')
+        vf = self.make_test_vf(False, dir='source')
         vf.add_lines(('a',), (), ['lines\n'])
         vf.add_lines(('b',), (), ['lines\n'])
         vf.add_lines(('c',), (), ['lines\n'])
@@ -461,8 +460,6 @@
                     [('b',), ('a',), ('d',), ('c',)],
                     'as-requested', False)]
         self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
-        # We have to cleanup manually, because we create a second VF
-        groupcompress.cleanup_pack_group(vf)
 
         # It should work even after being repacked into another VF
         vf2 = self.make_test_vf(False, dir='target')
@@ -479,8 +476,8 @@
                     'as-requested', False)]
         self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
 
-    def test_get_record_stream_block(self):
-        vf = self.make_test_vf(True, do_cleanup=False, dir='source')
+    def test_insert_record_stream_re_uses_blocks(self):
+        vf = self.make_test_vf(True, dir='source')
         def grouped_stream(revision_ids, first_parents=()):
             parents = first_parents
             for revision_id in revision_ids:
@@ -500,6 +497,7 @@
         block_bytes = {}
         stream = vf.get_record_stream([(r,) for r in 'abcdefgh'],
                                       'unordered', False)
+        num_records = 0
         for record in stream:
             if record.key in [('a',), ('e',)]:
                 self.assertEqual('groupcompress-block', record.storage_kind)
@@ -507,6 +505,8 @@
                 self.assertEqual('groupcompress-block-ref',
                                  record.storage_kind)
             block_bytes[record.key] = record._manager._block._z_content
+            num_records += 1
+        self.assertEqual(8, num_records)
         for r in 'abcd':
             key = (r,)
             self.assertIs(block_bytes[key], block_bytes[('a',)])
@@ -522,13 +522,58 @@
         # the target vf, but the groups themselves should not be disturbed.
         vf2.insert_record_stream(vf.get_record_stream(
             [(r,) for r in 'abcdefgh'], 'groupcompress', False))
-        groupcompress.cleanup_pack_group(vf)
         stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
                                        'groupcompress', False)
         vf2.writer.end()
+        num_records = 0
         for record in stream:
+            num_records += 1
             self.assertEqual(block_bytes[record.key],
                              record._manager._block._z_content)
+        self.assertEqual(8, num_records)
+
+    def test__insert_record_stream_no_reuse_block(self):
+        vf = self.make_test_vf(True, dir='source')
+        def grouped_stream(revision_ids, first_parents=()):
+            parents = first_parents
+            for revision_id in revision_ids:
+                key = (revision_id,)
+                record = versionedfile.FulltextContentFactory(
+                    key, parents, None,
+                    'some content that is\n'
+                    'identical except for\n'
+                    'revision_id:%s\n' % (revision_id,))
+                yield record
+                parents = (key,)
+        # One group, a-d
+        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
+        # Second group, e-h
+        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
+                                               first_parents=(('d',),)))
+        vf.writer.end()
+        self.assertEqual(8, len(list(vf.get_record_stream(
+                                        [(r,) for r in 'abcdefgh'],
+                                        'unordered', False))))
+        # Now copy the blocks into another vf, and ensure that the blocks are
+        # preserved without creating new entries
+        vf2 = self.make_test_vf(True, dir='target')
+        # ordering in 'groupcompress' order, should actually swap the groups in
+        # the target vf, but the groups themselves should not be disturbed.
+        list(vf2._insert_record_stream(vf.get_record_stream(
+            [(r,) for r in 'abcdefgh'], 'groupcompress', False),
+            reuse_blocks=False))
+        vf2.writer.end()
+        # After inserting with reuse_blocks=False, we should have everything in
+        # a single new block.
+        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
+                                       'groupcompress', False)
+        block = None
+        for record in stream:
+            if block is None:
+                block = record._manager._block
+            else:
+                self.assertIs(block, record._manager._block)
+
 
 class TestLazyGroupCompress(tests.TestCaseWithTransport):
 



More information about the bazaar-commits mailing list