Rev 4674: Start doing some work to make sure that we call _check_rebuild_block in http://bazaar.launchpad.net/~jameinel/bzr/2.1b1-pack-on-the-fly

John Arbash Meinel john at arbash-meinel.com
Wed Sep 2 21:46:23 BST 2009


At http://bazaar.launchpad.net/~jameinel/bzr/2.1b1-pack-on-the-fly

------------------------------------------------------------
revno: 4674
revision-id: john at arbash-meinel.com-20090902204614-33hhdj8dmimdhxw9
parent: john at arbash-meinel.com-20090902200007-vhiub5qwc1gafxt9
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.1b1-pack-on-the-fly
timestamp: Wed 2009-09-02 15:46:14 -0500
message:
  Start doing some work to make sure that we call _check_rebuild_block
  as part of the 'reuse-block' code.
  Going further, we should start using the new api soon. I just want direct tests
  that insert_record_stream is doing the right thing in a couple of edge cases
  first.
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-09-02 20:00:07 +0000
+++ b/bzrlib/groupcompress.py	2009-09-02 20:46:14 +0000
@@ -633,6 +633,14 @@
                 break
         # The content failed both the mixed check and the single-content check
         # so obviously it is not fully utilized
+        # TODO: there is one other constraint that isn't being checked
+        #       namely, that the entries in the block are in the appropriate
+        #       order. For example, you could insert the entries in exactly
+        #       reverse groupcompress order, and we would think that is ok.
+        #       (all the right objects are in one group, and it is fully
+        #       utilized, etc.) For now, we assume that case is rare,
+        #       especially since we should always fetch in 'groupcompress'
+        #       order.
         return False
 
     def _check_rebuild_block(self):
@@ -1640,6 +1648,7 @@
         block_length = None
         # XXX: TODO: remove this, it is just for safety checking for now
         inserted_keys = set()
+        reuse_this_block = reuse_blocks
         for record in stream:
             # Raise an error when a record is missing.
             if record.storage_kind == 'absent':
@@ -1650,19 +1659,13 @@
                                ' but then inserted %r two times', record.key)
                     continue
                 inserted_keys.add(record.key)
-            reuse_this_block = reuse_blocks
-            if reuse_this_block:
+            if reuse_blocks:
                 # If the reuse_blocks flag is set, check to see if we can just
                 # copy a groupcompress block as-is.
                 if record.storage_kind == 'groupcompress-block':
                     # Check to see if we really want to re-use this block
                     insert_manager = record._manager
-                    # if not insert_manager.check_is_well_utilized():
-                    #     reuse_this_block = False
-                    if len(insert_manager._factories) == 1:
-                        # This block only has a single record in it
-                        # Mark this block to be rebuilt
-                        reuse_this_block = False
+                    reuse_this_block = insert_manager.check_is_well_utilized()
             if reuse_this_block:
                 # We still want to reuse this block
                 if record.storage_kind == 'groupcompress-block':

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2009-09-02 20:00:07 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2009-09-02 20:46:14 +0000
@@ -538,7 +538,7 @@
                     'as-requested', False)]
         self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
 
-    def test_insert_record_stream_re_uses_blocks(self):
+    def test_insert_record_stream_reuses_blocks(self):
         vf = self.make_test_vf(True, dir='source')
         def grouped_stream(revision_ids, first_parents=()):
             parents = first_parents
@@ -582,8 +582,14 @@
         vf2 = self.make_test_vf(True, dir='target')
         # ordering in 'groupcompress' order, should actually swap the groups in
         # the target vf, but the groups themselves should not be disturbed.
-        vf2.insert_record_stream(vf.get_record_stream(
-            [(r,) for r in 'abcdefgh'], 'groupcompress', False))
+        def small_size_stream():
+            for record in vf.get_record_stream([(r,) for r in 'abcdefgh'],
+                                               'groupcompress', False):
+                record._manager._full_enough_block_size = \
+                    record._manager._block._content_length
+                yield record
+                        
+        vf2.insert_record_stream(small_size_stream())
         stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
                                        'groupcompress', False)
         vf2.writer.end()
@@ -594,6 +600,44 @@
                              record._manager._block._z_content)
         self.assertEqual(8, num_records)
 
+    def test_insert_record_stream_packs_on_the_fly(self):
+        vf = self.make_test_vf(True, dir='source')
+        def grouped_stream(revision_ids, first_parents=()):
+            parents = first_parents
+            for revision_id in revision_ids:
+                key = (revision_id,)
+                record = versionedfile.FulltextContentFactory(
+                    key, parents, None,
+                    'some content that is\n'
+                    'identical except for\n'
+                    'revision_id:%s\n' % (revision_id,))
+                yield record
+                parents = (key,)
+        # One group, a-d
+        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
+        # Second group, e-h
+        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
+                                               first_parents=(('d',),)))
+        # Now copy the blocks into another vf, and see that the
+        # insert_record_stream rebuilt a new block on-the-fly because of
+        # under-utilization
+        vf2 = self.make_test_vf(True, dir='target')
+        vf2.insert_record_stream(vf.get_record_stream(
+            [(r,) for r in 'abcdefgh'], 'groupcompress', False))
+        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
+                                       'groupcompress', False)
+        vf2.writer.end()
+        num_records = 0
+        # All of the records should be recombined into a single block
+        block = None
+        for record in stream:
+            num_records += 1
+            if block is None:
+                block = record._manager._block
+            else:
+                self.assertIs(block, record._manager._block)
+        self.assertEqual(8, num_records)
+
     def test__insert_record_stream_no_reuse_block(self):
         vf = self.make_test_vf(True, dir='source')
         def grouped_stream(revision_ids, first_parents=()):
@@ -636,6 +680,37 @@
             else:
                 self.assertIs(block, record._manager._block)
 
+    def test_insert_record_stream_truncates_partial_blocks(self):
+        vf = self.make_test_vf(True, dir='source')
+        def grouped_stream(revision_ids, first_parents=()):
+            parents = first_parents
+            for revision_id in revision_ids:
+                key = (revision_id,)
+                record = versionedfile.FulltextContentFactory(
+                    key, parents, None,
+                    'some content that is\n'
+                    'identical except for\n'
+                    'revision_id:%s\n' % (revision_id,))
+                yield record
+                parents = (key,)
+        # One group, a-l
+        vf.insert_record_stream(grouped_stream('abcdefghijkl'))
+        vf.writer.end()
+        block = manager = None
+        record_order = []
+        # Everything should fit in a single block
+        for record in vf.get_record_stream([(r,) for r in 'abcdefghijkl'],
+                                           'unordered', False):
+            record_order.append(record.key)
+            if block is None:
+                block = record._manager._block
+                manager = record._manager
+            else:
+                self.assertIs(block, record._manager._block)
+                self.assertIs(manager, record._manager)
+        # 'unordered' fetching will put that in the same order it was inserted
+        self.assertEqual([(r,) for r in 'abcdefghijkl'], record_order)
+
     def test_add_missing_noncompression_parent_unvalidated_index(self):
         unvalidated = self.make_g_index_missing_parent()
         combined = _mod_index.CombinedGraphIndex([unvalidated])



More information about the bazaar-commits mailing list