Rev 4672: (robertc) Fix bug 402652: recompress badly packed groups during fetch, in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Fri Sep 4 03:43:11 BST 2009


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 4672 [merge]
revision-id: pqm at pqm.ubuntu.com-20090904024310-7a3uqlf6iruxvz6m
parent: pqm at pqm.ubuntu.com-20090904002239-lnn3vwtab1tcbqfg
parent: robertc at robertcollins.net-20090904014431-2x6fris6dyw42gfg
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Fri 2009-09-04 03:43:10 +0100
message:
  (robertc) Fix bug 402652: recompress badly packed groups during
  	fetch. (John Arbash Meinel, Robert  Collins)
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
  bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
  bzrlib/tests/test_groupcompress.py test_groupcompress.p-20080705181503-ccbxd6xuy1bdnrpu-13
  bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'NEWS'
--- a/NEWS	2009-09-03 23:43:16 +0000
+++ b/NEWS	2009-09-04 00:49:55 +0000
@@ -33,6 +33,13 @@
 * Don't restrict the command name used to run the test suite.
   (Vincent Ladeuil, #419950)
 
+* Fetches from 2a to 2a are now again requested in 'groupcompress' order.
+  Groups that are seen as 'underutilized' will be repacked on-the-fly.
+  This means that when the source is fully packed, there is minimal
+  overhead during the fetch, but if the source is poorly packed the result
+  is a fairly well packed repository (not as good as 'bzr pack' but
+  good-enough.) (Robert Collins, John Arbash Meinel, #402652)
+
 * Network streams now decode adjacent records of the same type into a
   single stream, reducing layering churn. (Robert Collins)
 
@@ -109,6 +116,7 @@
 * The main table of contents now provides links to the new Migration Docs
   and Plugins Guide. (Ian Clatworthy)
 
+
 bzr 2.0rc1
 ##########
 

=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-08-30 21:34:42 +0000
+++ b/bzrlib/groupcompress.py	2009-09-04 01:44:31 +0000
@@ -457,7 +457,6 @@
                 # There are code paths that first extract as fulltext, and then
                 # extract as storage_kind (smart fetch). So we don't break the
                 # refcycle here, but instead in manager.get_record_stream()
-                # self._manager = None
             if storage_kind == 'fulltext':
                 return self._bytes
             else:
@@ -469,6 +468,14 @@
 class _LazyGroupContentManager(object):
     """This manages a group of _LazyGroupCompressFactory objects."""
 
+    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
+                             # current size, and still be considered
+                             # reusable
+    _full_block_size = 4*1024*1024
+    _full_mixed_block_size = 2*1024*1024
+    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
+    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
+
     def __init__(self, block):
         self._block = block
         # We need to preserve the ordering
@@ -546,22 +553,23 @@
         # time (self._block._content) is a little expensive.
         self._block._ensure_content(self._last_byte)
 
-    def _check_rebuild_block(self):
+    def _check_rebuild_action(self):
         """Check to see if our block should be repacked."""
         total_bytes_used = 0
         last_byte_used = 0
         for factory in self._factories:
             total_bytes_used += factory._end - factory._start
-            last_byte_used = max(last_byte_used, factory._end)
-        # If we are using most of the bytes from the block, we have nothing
-        # else to check (currently more that 1/2)
+            if last_byte_used < factory._end:
+                last_byte_used = factory._end
+        # If we are using at least half of the bytes from the block, we have
+        # nothing else to check
         if total_bytes_used * 2 >= self._block._content_length:
-            return
-        # Can we just strip off the trailing bytes? If we are going to be
-        # transmitting more than 50% of the front of the content, go ahead
+            return None, last_byte_used, total_bytes_used
+        # We are using less than 50% of the content. Is the content we are
+        # using at the beginning of the block? If so, we can just trim the
+        # tail, rather than rebuilding from scratch.
         if total_bytes_used * 2 > last_byte_used:
-            self._trim_block(last_byte_used)
-            return
+            return 'trim', last_byte_used, total_bytes_used
 
         # We are using a small amount of the data, and it isn't just packed
         # nicely at the front, so rebuild the content.
@@ -574,7 +582,80 @@
         #       expanding many deltas into fulltexts, as well.
         #       If we build a cheap enough 'strip', then we could try a strip,
         #       if that expands the content, we then rebuild.
-        self._rebuild_block()
+        return 'rebuild', last_byte_used, total_bytes_used
+
+    def check_is_well_utilized(self):
+        """Is the current block considered 'well utilized'?
+
+        This heuristic asks if the current block considers itself to be a fully
+        developed group, rather than just a loose collection of data.
+        """
+        if len(self._factories) == 1:
+            # A block of length 1 could be improved by combining with other
+            # groups - don't look deeper. Even larger than max size groups
+            # could compress well with adjacent versions of the same thing.
+            return False
+        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
+        block_size = self._block._content_length
+        if total_bytes_used < block_size * self._max_cut_fraction:
+            # This block wants to trim itself small enough that we want to
+            # consider it under-utilized.
+            return False
+        # TODO: This code is meant to be the twin of _insert_record_stream's
+        #       'start_new_block' logic. It would probably be better to factor
+        #       out that logic into a shared location, so that it stays
+        #       together better
+        # We currently assume a block is properly utilized whenever it is >75%
+        # of the size of a 'full' block. In normal operation, a block is
+        # considered full when it hits 4MB of same-file content. So any block
+        # >3MB is 'full enough'.
+        # The only time this isn't true is when a given block has large-object
+        # content. (a single file >4MB, etc.)
+        # Under these circumstances, we allow a block to grow to
+        # 2 x largest_content.  Which means that if a given block had a large
+        # object, it may actually be under-utilized. However, given that this
+        # is 'pack-on-the-fly' it is probably reasonable to not repack large
+        # content blobs on-the-fly. Note that because we return False for all
+        # 1-item blobs, we will repack them; we may wish to reevaluate our
+        # treatment of large object blobs in the future.
+        if block_size >= self._full_enough_block_size:
+            return True
+        # If a block is <3MB, it still may be considered 'full' if it contains
+        # mixed content. The current rule is 2MB of mixed content is considered
+        # full. So check to see if this block contains mixed content, and
+        # set the threshold appropriately.
+        common_prefix = None
+        for factory in self._factories:
+            prefix = factory.key[:-1]
+            if common_prefix is None:
+                common_prefix = prefix
+            elif prefix != common_prefix:
+                # Mixed content, check the size appropriately
+                if block_size >= self._full_enough_mixed_block_size:
+                    return True
+                break
+        # The content failed both the mixed check and the single-content check
+        # so obviously it is not fully utilized
+        # TODO: there is one other constraint that isn't being checked
+        #       namely, that the entries in the block are in the appropriate
+        #       order. For example, you could insert the entries in exactly
+        #       reverse groupcompress order, and we would think that is ok.
+        #       (all the right objects are in one group, and it is fully
+        #       utilized, etc.) For now, we assume that case is rare,
+        #       especially since we should always fetch in 'groupcompress'
+        #       order.
+        return False
+
+    def _check_rebuild_block(self):
+        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
+        if action is None:
+            return
+        if action == 'trim':
+            self._trim_block(last_byte_used)
+        elif action == 'rebuild':
+            self._rebuild_block()
+        else:
+            raise ValueError('unknown rebuild action: %r' % (action,))
 
     def _wire_bytes(self):
         """Return a byte stream suitable for transmitting over the wire."""
@@ -1570,6 +1651,7 @@
         block_length = None
         # XXX: TODO: remove this, it is just for safety checking for now
         inserted_keys = set()
+        reuse_this_block = reuse_blocks
         for record in stream:
             # Raise an error when a record is missing.
             if record.storage_kind == 'absent':
@@ -1583,10 +1665,20 @@
             if reuse_blocks:
                 # If the reuse_blocks flag is set, check to see if we can just
                 # copy a groupcompress block as-is.
+                # We only check on the first record (groupcompress-block) not
+                # on all of the (groupcompress-block-ref) entries.
+                # The flag then persists until the next groupcompress-block.
+                if record.storage_kind == 'groupcompress-block':
+                    # Check to see if we really want to re-use this block
+                    insert_manager = record._manager
+                    reuse_this_block = insert_manager.check_is_well_utilized()
+            else:
+                reuse_this_block = False
+            if reuse_this_block:
+                # We still want to reuse this block
                 if record.storage_kind == 'groupcompress-block':
                     # Insert the raw block into the target repo
                     insert_manager = record._manager
-                    insert_manager._check_rebuild_block()
                     bytes = record._manager._block.to_bytes()
                     _, start, length = self._access.add_raw_records(
                         [(None, len(bytes))], bytes)[0]
@@ -1597,6 +1689,11 @@
                                            'groupcompress-block-ref'):
                     if insert_manager is None:
                         raise AssertionError('No insert_manager set')
+                    if insert_manager is not record._manager:
+                        raise AssertionError('insert_manager does not match'
+                            ' the current record, we cannot be positive'
+                            ' that the appropriate content was inserted.'
+                            )
                     value = "%d %d %d %d" % (block_start, block_length,
                                              record._start, record._end)
                     nodes = [(record.key, value, (record.parents,))]

=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py	2009-08-24 19:34:13 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py	2009-09-01 06:10:24 +0000
@@ -932,7 +932,7 @@
         super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
         self._revision_keys = None
         self._text_keys = None
-        # self._text_fetch_order = 'unordered'
+        self._text_fetch_order = 'groupcompress'
         self._chk_id_roots = None
         self._chk_p_id_roots = None
 
@@ -949,7 +949,7 @@
             p_id_roots_set = set()
             source_vf = self.from_repository.inventories
             stream = source_vf.get_record_stream(inventory_keys,
-                                                 'unordered', True)
+                                                 'groupcompress', True)
             for record in stream:
                 if record.storage_kind == 'absent':
                     if allow_absent:

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2009-09-02 18:07:58 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2009-09-04 00:49:55 +0000
@@ -538,7 +538,7 @@
                     'as-requested', False)]
         self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
 
-    def test_insert_record_stream_re_uses_blocks(self):
+    def test_insert_record_stream_reuses_blocks(self):
         vf = self.make_test_vf(True, dir='source')
         def grouped_stream(revision_ids, first_parents=()):
             parents = first_parents
@@ -582,8 +582,14 @@
         vf2 = self.make_test_vf(True, dir='target')
         # ordering in 'groupcompress' order, should actually swap the groups in
         # the target vf, but the groups themselves should not be disturbed.
-        vf2.insert_record_stream(vf.get_record_stream(
-            [(r,) for r in 'abcdefgh'], 'groupcompress', False))
+        def small_size_stream():
+            for record in vf.get_record_stream([(r,) for r in 'abcdefgh'],
+                                               'groupcompress', False):
+                record._manager._full_enough_block_size = \
+                    record._manager._block._content_length
+                yield record
+                        
+        vf2.insert_record_stream(small_size_stream())
         stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
                                        'groupcompress', False)
         vf2.writer.end()
@@ -594,6 +600,44 @@
                              record._manager._block._z_content)
         self.assertEqual(8, num_records)
 
+    def test_insert_record_stream_packs_on_the_fly(self):
+        vf = self.make_test_vf(True, dir='source')
+        def grouped_stream(revision_ids, first_parents=()):
+            parents = first_parents
+            for revision_id in revision_ids:
+                key = (revision_id,)
+                record = versionedfile.FulltextContentFactory(
+                    key, parents, None,
+                    'some content that is\n'
+                    'identical except for\n'
+                    'revision_id:%s\n' % (revision_id,))
+                yield record
+                parents = (key,)
+        # One group, a-d
+        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
+        # Second group, e-h
+        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
+                                               first_parents=(('d',),)))
+        # Now copy the blocks into another vf, and see that the
+        # insert_record_stream rebuilt a new block on-the-fly because of
+        # under-utilization
+        vf2 = self.make_test_vf(True, dir='target')
+        vf2.insert_record_stream(vf.get_record_stream(
+            [(r,) for r in 'abcdefgh'], 'groupcompress', False))
+        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
+                                       'groupcompress', False)
+        vf2.writer.end()
+        num_records = 0
+        # All of the records should be recombined into a single block
+        block = None
+        for record in stream:
+            num_records += 1
+            if block is None:
+                block = record._manager._block
+            else:
+                self.assertIs(block, record._manager._block)
+        self.assertEqual(8, num_records)
+
     def test__insert_record_stream_no_reuse_block(self):
         vf = self.make_test_vf(True, dir='source')
         def grouped_stream(revision_ids, first_parents=()):
@@ -811,15 +855,19 @@
 
     _texts = {
         ('key1',): "this is a text\n"
-                   "with a reasonable amount of compressible bytes\n",
+                   "with a reasonable amount of compressible bytes\n"
+                   "which can be shared between various other texts\n",
         ('key2',): "another text\n"
-                   "with a reasonable amount of compressible bytes\n",
+                   "with a reasonable amount of compressible bytes\n"
+                   "which can be shared between various other texts\n",
         ('key3',): "yet another text which won't be extracted\n"
-                   "with a reasonable amount of compressible bytes\n",
+                   "with a reasonable amount of compressible bytes\n"
+                   "which can be shared between various other texts\n",
         ('key4',): "this will be extracted\n"
                    "but references most of its bytes from\n"
                    "yet another text which won't be extracted\n"
-                   "with a reasonable amount of compressible bytes\n",
+                   "with a reasonable amount of compressible bytes\n"
+                   "which can be shared between various other texts\n",
     }
     def make_block(self, key_to_text):
         """Create a GroupCompressBlock, filling it with the given texts."""
@@ -837,6 +885,13 @@
         start, end = locations[key]
         manager.add_factory(key, (), start, end)
 
+    def make_block_and_full_manager(self, texts):
+        locations, block = self.make_block(texts)
+        manager = groupcompress._LazyGroupContentManager(block)
+        for key in sorted(texts):
+            self.add_key_to_manager(key, locations, block, manager)
+        return block, manager
+
     def test_get_fulltexts(self):
         locations, block = self.make_block(self._texts)
         manager = groupcompress._LazyGroupContentManager(block)
@@ -893,8 +948,8 @@
         header_len = int(header_len)
         block_len = int(block_len)
         self.assertEqual('groupcompress-block', storage_kind)
-        self.assertEqual(33, z_header_len)
-        self.assertEqual(25, header_len)
+        self.assertEqual(34, z_header_len)
+        self.assertEqual(26, header_len)
         self.assertEqual(len(block_bytes), block_len)
         z_header = rest[:z_header_len]
         header = zlib.decompress(z_header)
@@ -934,13 +989,7 @@
         self.assertEqual([('key1',), ('key4',)], result_order)
 
     def test__check_rebuild_no_changes(self):
-        locations, block = self.make_block(self._texts)
-        manager = groupcompress._LazyGroupContentManager(block)
-        # Request all the keys, which ensures that we won't rebuild
-        self.add_key_to_manager(('key1',), locations, block, manager)
-        self.add_key_to_manager(('key2',), locations, block, manager)
-        self.add_key_to_manager(('key3',), locations, block, manager)
-        self.add_key_to_manager(('key4',), locations, block, manager)
+        block, manager = self.make_block_and_full_manager(self._texts)
         manager._check_rebuild_block()
         self.assertIs(block, manager._block)
 
@@ -971,3 +1020,50 @@
             self.assertEqual(('key4',), record.key)
             self.assertEqual(self._texts[record.key],
                              record.get_bytes_as('fulltext'))
+
+    def test_check_is_well_utilized_all_keys(self):
+        block, manager = self.make_block_and_full_manager(self._texts)
+        self.assertFalse(manager.check_is_well_utilized())
+        # Though we can fake it by changing the recommended minimum size
+        manager._full_enough_block_size = block._content_length
+        self.assertTrue(manager.check_is_well_utilized())
+        # Setting it just above causes it to fail
+        manager._full_enough_block_size = block._content_length + 1
+        self.assertFalse(manager.check_is_well_utilized())
+        # Setting the mixed-block size doesn't do anything, because the content
+        # is considered to not be 'mixed'
+        manager._full_enough_mixed_block_size = block._content_length
+        self.assertFalse(manager.check_is_well_utilized())
+
+    def test_check_is_well_utilized_mixed_keys(self):
+        texts = {}
+        f1k1 = ('f1', 'k1')
+        f1k2 = ('f1', 'k2')
+        f2k1 = ('f2', 'k1')
+        f2k2 = ('f2', 'k2')
+        texts[f1k1] = self._texts[('key1',)]
+        texts[f1k2] = self._texts[('key2',)]
+        texts[f2k1] = self._texts[('key3',)]
+        texts[f2k2] = self._texts[('key4',)]
+        block, manager = self.make_block_and_full_manager(texts)
+        self.assertFalse(manager.check_is_well_utilized())
+        manager._full_enough_block_size = block._content_length
+        self.assertTrue(manager.check_is_well_utilized())
+        manager._full_enough_block_size = block._content_length + 1
+        self.assertFalse(manager.check_is_well_utilized())
+        manager._full_enough_mixed_block_size = block._content_length
+        self.assertTrue(manager.check_is_well_utilized())
+
+    def test_check_is_well_utilized_partial_use(self):
+        locations, block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(block)
+        manager._full_enough_block_size = block._content_length
+        self.add_key_to_manager(('key1',), locations, block, manager)
+        self.add_key_to_manager(('key2',), locations, block, manager)
+        # Just using the content from key1 and 2 is not enough to be considered
+        # 'complete'
+        self.assertFalse(manager.check_is_well_utilized())
+        # However if we add key4, then we have enough, as we only require 75%
+        # consumption
+        self.add_key_to_manager(('key4',), locations, block, manager)
+        self.assertTrue(manager.check_is_well_utilized())

=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py	2009-09-03 04:51:46 +0000
+++ b/bzrlib/tests/test_repository.py	2009-09-04 00:49:55 +0000
@@ -683,6 +683,28 @@
 
 class Test2a(tests.TestCaseWithMemoryTransport):
 
+    def test_fetch_combines_groups(self):
+        builder = self.make_branch_builder('source', format='2a')
+        builder.start_series()
+        builder.build_snapshot('1', None, [
+            ('add', ('', 'root-id', 'directory', '')),
+            ('add', ('file', 'file-id', 'file', 'content\n'))])
+        builder.build_snapshot('2', ['1'], [
+            ('modify', ('file-id', 'content-2\n'))])
+        builder.finish_series()
+        source = builder.get_branch()
+        target = self.make_repository('target', format='2a')
+        target.fetch(source.repository)
+        target.lock_read()
+        self.addCleanup(target.unlock)
+        details = target.texts._index.get_build_details(
+            [('file-id', '1',), ('file-id', '2',)])
+        file_1_details = details[('file-id', '1')]
+        file_2_details = details[('file-id', '2')]
+        # The index, and what to read off disk, should be the same for both
+        # versions of the file.
+        self.assertEqual(file_1_details[0][:3], file_2_details[0][:3])
+
     def test_format_pack_compresses_True(self):
         repo = self.make_repository('repo', format='2a')
         self.assertTrue(repo._format.pack_compresses)




More information about the bazaar-commits mailing list