Rev 5760: Expose the setting up the stack. in http://bazaar.launchpad.net/~jameinel/bzr/2.4-max-entries-gc-602614

John Arbash Meinel john at arbash-meinel.com
Thu May 12 15:10:07 UTC 2011


At http://bazaar.launchpad.net/~jameinel/bzr/2.4-max-entries-gc-602614

------------------------------------------------------------
revno: 5760
revision-id: john at arbash-meinel.com-20110512150951-h7sdnrsn0zt9l5zl
parent: john at arbash-meinel.com-20110512115831-q8nzn6owcz3sy3ul
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.4-max-entries-gc-602614
timestamp: Thu 2011-05-12 17:09:51 +0200
message:
  Expose the setting up the stack.
  _LazyGroupContentManager needs to know about the value if it is going to
  rebuild a block, and the manager is created by _BatchingBlockFetcher.
  To avoid reading the config value all the time, use a callable
  so that we only read the config if we actually rebuild a block.
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2011-05-12 11:58:31 +0000
+++ b/bzrlib/groupcompress.py	2011-05-12 15:09:51 +0000
@@ -491,11 +491,25 @@
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
 
-    def __init__(self, block):
+    def __init__(self, block, get_max_entries_per_source=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_max = get_max_entries_per_source
+        self._max_entries_per_source = None
+
+    def _get_max_entries_per_source(self):
+        if self._max_entries_per_source is not None:
+            return self._max_entries_per_source
+        max_entries = None
+        if self._get_max is not None:
+            max_entries = self._get_max()
+        if max_entries is None:
+            vf = GroupCompressVersionedFiles
+            max_entries = vf._DEFAULT_MAX_ENTRIES_PER_SOURCE
+        self._max_entries_per_source = max_entries
+        return self._max_entries_per_source
 
     def add_factory(self, key, parents, start, end):
         if not self._factories:
@@ -534,9 +548,12 @@
         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block
 
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_max_entries_per_source())
+
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length
         end_point = 0
@@ -554,6 +571,11 @@
         #       block? It seems hard to come up with a method that it would
         #       expand, since we do full compression again. Perhaps based on a
         #       request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        #       handle a case where we need to split the block.
+        #       Now that we have a user-tweakable option
+        #       (max_entries_per_source), it is possible that one person set it
+        #       to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
@@ -1070,12 +1092,12 @@
 
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.
-    
+
     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.
     """
 
-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_max_entries_per_source=None):
         self.gcvf = gcvf
         self.locations = locations
         self.keys = []
@@ -1084,10 +1106,11 @@
         self.total_bytes = 0
         self.last_read_memo = None
         self.manager = None
+        self._get_max_entries_per_source = get_max_entries_per_source
 
     def add_key(self, key):
         """Add another to key to fetch.
-        
+
         :return: The estimated number of bytes needed to fetch the batch so
             far.
         """
@@ -1118,7 +1141,7 @@
             # and then.
             self.batch_memos[read_memo] = cached_block
         return self.total_bytes
-        
+
     def _flush_manager(self):
         if self.manager is not None:
             for factory in self.manager.get_record_stream():
@@ -1129,7 +1152,7 @@
     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield.  They will be
         returned in the order they were added via add_key.
-        
+
         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch.  If full_flush is True, then
             all results are returned.
@@ -1163,7 +1186,8 @@
                     memos_to_get_stack.pop()
                 else:
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_max_entries_per_source=self._get_max_entries_per_source)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)
@@ -1589,7 +1613,8 @@
         #  - we encounter an unadded ref, or
         #  - we run out of keys, or
         #  - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_max_entries_per_source=self._get_max_entries_per_source)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
@@ -1641,7 +1666,7 @@
         for _ in self._insert_record_stream(stream, random_id=False):
             pass
 
-    def _make_group_compressor(self):
+    def _get_max_entries_per_source(self):
         if self._max_entries_per_source is None:
             # TODO: VersionedFiles don't know about their containing
             #       repository, so they don't have much of an idea about their
@@ -1660,7 +1685,10 @@
             if val is None:
                 val = self._DEFAULT_MAX_ENTRIES_PER_SOURCE
             self._max_entries_per_source = val
-        return GroupCompressor(self._max_entries_per_source)
+        return self._max_entries_per_source
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_max_entries_per_source())
 
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2011-05-12 11:58:31 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2011-05-12 15:09:51 +0000
@@ -1090,6 +1090,51 @@
             self.assertEqual(self._texts[record.key],
                              record.get_bytes_as('fulltext'))
 
+    def test_manager_default_max_entries_per_source(self):
+        locations, old_block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(old_block)
+        gcvf = groupcompress.GroupCompressVersionedFiles
+        # It doesn't greedily evaluate _max_entries_per_source
+        self.assertIs(None, manager._max_entries_per_source)
+        self.assertEqual(gcvf._DEFAULT_MAX_ENTRIES_PER_SOURCE,
+                         manager._get_max_entries_per_source())
+
+    def test_manager_custom_max_entries_per_source(self):
+        locations, old_block = self.make_block(self._texts)
+        called = []
+        def max_entries():
+            called.append('called')
+            return 10
+        manager = groupcompress._LazyGroupContentManager(old_block,
+            get_max_entries_per_source=max_entries)
+        gcvf = groupcompress.GroupCompressVersionedFiles
+        # It doesn't greedily evaluate _max_entries_per_source
+        self.assertIs(None, manager._max_entries_per_source)
+        self.assertEqual(10, manager._get_max_entries_per_source())
+        self.assertEqual(10, manager._get_max_entries_per_source())
+        self.assertEqual(10, manager._max_entries_per_source)
+        # Only called 1 time
+        self.assertEqual(['called'], called)
+
+    def test__rebuild_handles_max_entries_per_source(self):
+        locations, old_block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(old_block,
+            get_max_entries_per_source=lambda: 2)
+        gc = manager._make_group_compressor()
+        if isinstance(gc, groupcompress.PyrexGroupCompressor):
+            self.assertEqual(2, gc._delta_index._max_entries_per_source)
+        self.add_key_to_manager(('key3',), locations, old_block, manager)
+        self.add_key_to_manager(('key4',), locations, old_block, manager)
+        action, last_byte, total_bytes = manager._check_rebuild_action()
+        self.assertEqual('rebuild', action)
+        manager._rebuild_block()
+        new_block = manager._block
+        self.assertIsNot(old_block, new_block)
+        # Because of the new max_entries_per_source, we do a poor job of
+        # rebuilding. This is a side-effect of the change, but at least it does
+        # show the setting had an effect.
+        self.assertTrue(old_block._content_length < new_block._content_length)
+
     def test_check_is_well_utilized_all_keys(self):
         block, manager = self.make_block_and_full_manager(self._texts)
         self.assertFalse(manager.check_is_well_utilized())

=== modified file 'doc/en/release-notes/bzr-2.4.txt'
--- a/doc/en/release-notes/bzr-2.4.txt	2011-05-10 09:34:35 +0000
+++ b/doc/en/release-notes/bzr-2.4.txt	2011-05-12 15:09:51 +0000
@@ -20,6 +20,15 @@
 
 .. New commands, options, etc that users may wish to try out.
 
+* The text compressor used for 2a repositories now has a tweakable
+  parameter that can be set in bazaar.conf:
+  ``bzr.groupcompress.max_entries_per_source`` (default 65536).
+  When doing compression, we build up an index of locations to match
+  against. Setting this higher will result in slightly better compression,
+  at a cost of more memory. A value of 65536 fully samples a 1MB file, so
+  this only has an effect when compressing texts larger than N*16 bytes
+  (N being the configured value). (John Arbash Meinel, #602614)
+
 Improvements
 ************
 



More information about the bazaar-commits mailing list