Rev 3903: groupcompress now copies the blocks exactly as they were given. in http://bzr.arbash-meinel.com/branches/bzr/brisbane/lazy_gc_stream

John Arbash Meinel john at arbash-meinel.com
Tue Mar 17 17:46:26 GMT 2009


At http://bzr.arbash-meinel.com/branches/bzr/brisbane/lazy_gc_stream

------------------------------------------------------------
revno: 3903
revision-id: john at arbash-meinel.com-20090317174617-osa5ia09no26xm1w
parent: john at arbash-meinel.com-20090317161231-nzb4dk8t35ucw84u
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: lazy_gc_stream
timestamp: Tue 2009-03-17 12:46:17 -0500
message:
  groupcompress now copies the blocks exactly as they were given.
  
  One major concern here is that 'topo_sort' is not particularly stable. For example,
  given a history of a=>b=>c=>d and e=>f=>g=>h, it easily groups the contents as,
  h,a,b,c,d,e,f,g. Which is interleaving unrelated histories.
  This will actually cause transmission of the e-h group 2x, and cause effective
  'bloat'.
  We can still tell 'get_record_stream' to remove some of this.
  Also, autopack still needs to be told to *not* re-use blocks.
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-03-17 16:12:31 +0000
+++ b/bzrlib/groupcompress.py	2009-03-17 17:46:17 +0000
@@ -1339,10 +1339,31 @@
         last_fulltext_len = None
         max_fulltext_len = 0
         max_fulltext_prefix = None
+        insert_manager = None
+        block_start = None
+        block_length = None
         for record in stream:
             # Raise an error when a record is missing.
             if record.storage_kind == 'absent':
                 raise errors.RevisionNotPresent(record.key, self)
+            if record.storage_kind == 'groupcompress-block':
+                # Insert the raw block into the target repo
+                insert_manager = record._manager
+                bytes = record._manager._block.to_bytes()
+                _, start, length = self._access.add_raw_records(
+                    [(None, len(bytes))], bytes)[0]
+                del bytes
+                block_start = start
+                block_length = length
+            if record.storage_kind in ('groupcompress-block',
+                                       'groupcompress-block-ref'):
+                assert insert_manager is not None
+                assert record._manager is insert_manager
+                value = "%d %d %d %d" % (block_start, block_length,
+                                         record._start, record._end)
+                nodes = [(record.key, value, (record.parents,))]
+                self._index.add_records(nodes, random_id=random_id)
+                continue
             try:
                 bytes = record.get_bytes_as('fulltext')
             except errors.UnavailableRepresentation:

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2009-03-17 16:12:31 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2009-03-17 17:46:17 +0000
@@ -23,6 +23,7 @@
     errors,
     osutils,
     tests,
+    versionedfile,
     )
 from bzrlib.osutils import sha_string
 from bzrlib.tests import (
@@ -429,8 +430,10 @@
 
 class TestCaseWithGroupCompressVersionedFiles(tests.TestCaseWithTransport):
 
-    def make_test_vf(self, create_graph, keylength=1, do_cleanup=True):
-        t = self.get_transport()
+    def make_test_vf(self, create_graph, keylength=1, do_cleanup=True,
+                     dir='.'):
+        t = self.get_transport(dir)
+        t.ensure_base()
         vf = groupcompress.make_pack_factory(graph=create_graph,
             delta=False, keylength=keylength)(t)
         if do_cleanup:
@@ -443,7 +446,8 @@
     def test_get_record_stream_as_requested(self):
         # Consider promoting 'as-requested' to general availability, and
         # make this a VF interface test
-        vf = self.make_test_vf(False, do_cleanup=False)
+        vf = self.make_test_vf(False, do_cleanup=False,
+                               dir='source')
         vf.add_lines(('a',), (), ['lines\n'])
         vf.add_lines(('b',), (), ['lines\n'])
         vf.add_lines(('c',), (), ['lines\n'])
@@ -461,7 +465,7 @@
         groupcompress.cleanup_pack_group(vf)
 
         # It should work even after being repacked into another VF
-        vf2 = self.make_test_vf(False)
+        vf2 = self.make_test_vf(False, dir='target')
         vf2.insert_record_stream(vf.get_record_stream(
                     [('b',), ('a',), ('d',), ('c',)], 'as-requested', False))
         vf2.writer.end()
@@ -475,6 +479,56 @@
                     'as-requested', False)]
         self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
 
+    def test_get_record_stream_block(self):
+        vf = self.make_test_vf(True, do_cleanup=False, dir='source')
+        def grouped_stream(revision_ids, first_parents=()):
+            parents = first_parents
+            for revision_id in revision_ids:
+                key = (revision_id,)
+                record = versionedfile.FulltextContentFactory(
+                    key, parents, None,
+                    'some content that is\n'
+                    'identical except for\n'
+                    'revision_id:%s\n' % (revision_id,))
+                yield record
+                parents = (key,)
+        # One group, a-d
+        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
+        # Second group, e-h
+        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
+                                               first_parents=(('d',),)))
+        block_bytes = {}
+        stream = vf.get_record_stream([(r,) for r in 'abcdefgh'],
+                                      'unordered', False)
+        for record in stream:
+            if record.key in [('a',), ('e',)]:
+                self.assertEqual('groupcompress-block', record.storage_kind)
+            else:
+                self.assertEqual('groupcompress-block-ref',
+                                 record.storage_kind)
+            block_bytes[record.key] = record._manager._block._z_content
+        for r in 'abcd':
+            key = (r,)
+            self.assertIs(block_bytes[key], block_bytes[('a',)])
+            self.assertNotEqual(block_bytes[key], block_bytes[('e',)])
+        for r in 'efgh':
+            key = (r,)
+            self.assertIs(block_bytes[key], block_bytes[('e',)])
+            self.assertNotEqual(block_bytes[key], block_bytes[('a',)])
+        # Now copy the blocks into another vf, and ensure that the blocks are
+        # preserved without creating new entries
+        vf2 = self.make_test_vf(True, dir='target')
+        # ordering in 'groupcompress' order, should actually swap the groups in
+        # the target vf, but the groups themselves should not be disturbed.
+        vf2.insert_record_stream(vf.get_record_stream(
+            [(r,) for r in 'abcdefgh'], 'groupcompress', False))
+        groupcompress.cleanup_pack_group(vf)
+        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
+                                       'groupcompress', False)
+        vf2.writer.end()
+        for record in stream:
+            self.assertEqual(block_bytes[record.key],
+                             record._manager._block._z_content)
 
 class TestLazyGroupCompress(tests.TestCaseWithTransport):
 



More information about the bazaar-commits mailing list