Rev 3906: Add a _LazyGroupContentManager._check_rebuild_block in http://bzr.arbash-meinel.com/branches/bzr/brisbane/lazy_gc_stream

John Arbash Meinel john at arbash-meinel.com
Tue Mar 17 19:27:12 GMT 2009


At http://bzr.arbash-meinel.com/branches/bzr/brisbane/lazy_gc_stream

------------------------------------------------------------
revno: 3906
revision-id: john at arbash-meinel.com-20090317192705-8r4ny7purwsx3m0l
parent: john at arbash-meinel.com-20090317183751-uzqw7v9jdlvntbrj
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: lazy_gc_stream
timestamp: Tue 2009-03-17 14:27:05 -0500
message:
  Add a _LazyGroupContentManager._check_rebuild_block
  This can be called to ensure the content we are generating has been
  appropriately shrunk before we put it on the wire, etc.
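
In outline, the new check applies a simple size heuristic before deciding what
to do with a block. A minimal sketch of that logic (the function and parameter
names here are illustrative, not the real attributes):

    def choose_action(bytes_used, last_byte_used, content_length):
        # Referenced texts cover at least half of the block: keep it as-is.
        if bytes_used * 2 >= content_length:
            return 'keep'
        # Referenced texts cover more than half of the content up to the
        # last byte actually used: just drop the unused tail ('trim').
        if bytes_used * 2 > last_byte_used:
            return 'trim'
        # Otherwise the referenced texts are sparse within the block:
        # recompress just those texts into a fresh block ('rebuild').
        return 'rebuild'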
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-03-17 18:29:06 +0000
+++ b/bzrlib/groupcompress.py	2009-03-17 19:27:05 +0000
@@ -19,6 +19,7 @@
 from itertools import izip
 from cStringIO import StringIO
 import struct
+import time
 import zlib
 try:
     import pylzma
@@ -34,6 +35,7 @@
     osutils,
     pack,
     patiencediff,
+    trace,
     )
 from bzrlib.graph import Graph
 from bzrlib.knit import _DirectPackAccess
@@ -463,11 +465,11 @@
         return ''.join(chunks)
 
 
-class LazyGroupCompressFactory(object):
+class _LazyGroupCompressFactory(object):
     """Yield content from a GroupCompressBlock on demand."""
 
     def __init__(self, key, parents, manager, start, end, first):
-        """Create a LazyGroupCompressFactory
+        """Create a _LazyGroupCompressFactory
 
         :param key: The key of just this record
         :param parents: The parents of this key (possibly None)
@@ -517,7 +519,7 @@
 
 
 class _LazyGroupContentManager(object):
-    """This manages a group of LazyGroupCompressFactory objects."""
+    """This manages a group of _LazyGroupCompressFactory objects."""
 
     def __init__(self, block):
         self._block = block
@@ -530,7 +532,7 @@
         else:
             first = False
         # Note that this creates a reference cycle....
-        factory = LazyGroupCompressFactory(key, parents, self,
+        factory = _LazyGroupCompressFactory(key, parents, self,
             start, end, first=first)
         self._factories.append(factory)
 
@@ -541,6 +543,74 @@
         # TODO: Consider setting self._factories = None after the above loop,
         #       as it will break the reference cycle
 
+    def _trim_block(self, last_byte):
+        """Create a new GroupCompressBlock, with just some of the content."""
+        # None of the factories need to be adjusted, because the content is
+        # located in an identical place; we just strip some unreferenced
+        # trailing bytes.
+        trace.mutter('stripping trailing bytes from groupcompress block'
+                     ' %d => %d', self._block._content_length, last_byte)
+        new_block = GroupCompressBlock()
+        self._block._ensure_content(last_byte)
+        new_block.set_content(self._block._content[:last_byte])
+        self._block = new_block
+
+    def _rebuild_block(self):
+        """Create a new GroupCompressBlock with only the referenced texts."""
+        compressor = GroupCompressor()
+        tstart = time.time()
+        old_length = self._block._content_length
+        cur_endpoint = 0
+        for factory in self._factories:
+            bytes = factory.get_bytes_as('fulltext')
+            (found_sha1, end_point, type,
+             length) = compressor.compress(factory.key, bytes, factory.sha1)
+            # Now update this factory with the new offsets, etc
+            factory.sha1 = found_sha1
+            factory._start = cur_endpoint
+            factory._end = end_point
+            cur_endpoint = end_point
+        new_block = compressor.flush()
+        # TODO: Should we check that new_block really *is* smaller than the
+        #       old block? It seems hard to come up with a case where it would
+        #       expand, since we do full compression again. Perhaps a request
+        #       that ends up poorly ordered?
+        delta = time.time() - tstart
+        self._block = new_block
+        trace.mutter('creating new compressed block on-the-fly in %.3fs'
+                     ' %d bytes => %d bytes', delta, old_length,
+                     self._block._content_length)
+
+    def _check_rebuild_block(self):
+        """Check to see if our block should be repacked."""
+        total_bytes_used = 0
+        last_byte_used = 0
+        for factory in self._factories:
+            total_bytes_used += factory._end - factory._start
+            last_byte_used = max(last_byte_used, factory._end)
+        # If we are using most of the bytes from the block, we have nothing
+        # else to check (currently more than 1/2)
+        if total_bytes_used * 2 >= self._block._content_length:
+            return
+        # Can we just strip off the trailing bytes? If the bytes we do use
+        # make up more than 50% of the content up to the last byte, go ahead
+        if total_bytes_used * 2 > last_byte_used:
+            self._trim_block(last_byte_used)
+            return
+
+        # We are using a small amount of the data, and it isn't just packed
+        # nicely at the front, so rebuild the content.
+        # Note: This would be *nicer* as a strip-data-from-group, rather than
+        #       building it up again from scratch
+        #       It might be reasonable to consider the fulltext sizes for
+        #       different bits when deciding this, too. As you may have a small
+        #       fulltext, and a trivial delta, and you are just trading around
+        #       for another fulltext. If we do a simple 'prune' you may end up
+        #       expanding many deltas into fulltexts, as well.
+        #       If we build a cheap enough 'strip', we could try the strip
+        #       first; if that expands the content, we then rebuild.
+        self._rebuild_block()
+
     def _wire_bytes(self):
         """Return a byte stream suitable for transmitting over the wire."""
         # TODO: this might be a really good time to determine that we want to
@@ -811,7 +881,7 @@
         content = ''.join(self.lines)
         self.lines = None
         self._block.set_content(content)
-        return self._block.to_bytes()
+        return self._block
 
     def output_chunks(self, new_chunks):
         """Output some chunks.
@@ -1328,7 +1398,7 @@
         keys_to_add = []
         basis_end = 0
         def flush():
-            bytes = self._compressor.flush()
+            bytes = self._compressor.flush().to_bytes()
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
             nodes = []
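
Note the API change in this last hunk: GroupCompressor.flush() now returns the
GroupCompressBlock itself rather than its serialized form, so callers that need
raw bytes must call .to_bytes() on the result. A hypothetical caller ('texts'
is an illustrative iterable of (key, text) pairs):

    compressor = GroupCompressor()
    for key, text in texts:
        compressor.compress(key, text, None)
    block = compressor.flush()    # a GroupCompressBlock, not a byte string
    raw_bytes = block.to_bytes()  # serialize only when actually needed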

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2009-03-17 18:29:06 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2009-03-17 19:27:05 +0000
@@ -194,9 +194,12 @@
         start = 0
         for key in sorted(key_to_text):
             compressor.compress(key, key_to_text[key], None)
-        entries = compressor._block._entries
-        raw_bytes = compressor.flush()
-        return entries, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)
+        block = compressor.flush()
+        entries = block._entries
+        # Go through from_bytes(to_bytes()) so that we start with a compressed
+        # content object
+        return entries, groupcompress.GroupCompressBlock.from_bytes(
+            block.to_bytes())
 
     def test_from_empty_bytes(self):
         self.assertRaises(ValueError,
@@ -595,8 +598,9 @@
         start = 0
         for key in sorted(key_to_text):
             compressor.compress(key, key_to_text[key], None)
-        entries = compressor._block._entries
-        raw_bytes = compressor.flush()
+        block = compressor.flush()
+        entries = block._entries
+        raw_bytes = block.to_bytes()
         return entries, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)
 
     def add_key_to_manager(self, key, entries, block, manager):
@@ -692,3 +696,42 @@
             text = self._texts[record.key]
             self.assertEqual(text, record.get_bytes_as('fulltext'))
         self.assertEqual([('key1',), ('key4',)], result_order)
+
+    def test__check_rebuild_no_changes(self):
+        entries, block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(block)
+        # Request all the keys, which ensures that we won't rebuild
+        self.add_key_to_manager(('key1',), entries, block, manager)
+        self.add_key_to_manager(('key2',), entries, block, manager)
+        self.add_key_to_manager(('key3',), entries, block, manager)
+        self.add_key_to_manager(('key4',), entries, block, manager)
+        manager._check_rebuild_block()
+        self.assertIs(block, manager._block)
+
+    def test__check_rebuild_only_one(self):
+        entries, block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(block)
+        # Request just the first key, which should trigger a 'strip' action
+        self.add_key_to_manager(('key1',), entries, block, manager)
+        manager._check_rebuild_block()
+        self.assertIsNot(block, manager._block)
+        self.assertTrue(block._content_length > manager._block._content_length)
+        # We should be able to still get the content out of this block, though
+        # it should only have 1 entry
+        for record in manager.get_record_stream():
+            self.assertEqual(('key1',), record.key)
+            self.assertEqual(self._texts[record.key],
+                             record.get_bytes_as('fulltext'))
+
+    def test__check_rebuild_middle(self):
+        entries, block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(block)
+        # Requesting a small key in the middle should trigger a 'rebuild'
+        self.add_key_to_manager(('key4',), entries, block, manager)
+        manager._check_rebuild_block()
+        self.assertIsNot(block, manager._block)
+        self.assertTrue(block._content_length > manager._block._content_length)
+        for record in manager.get_record_stream():
+            self.assertEqual(('key4',), record.key)
+            self.assertEqual(self._texts[record.key],
+                             record.get_bytes_as('fulltext'))
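
For reference, the usage pattern these tests exercise looks roughly like this
(a sketch; the add_factory(key, parents, start, end) signature is implied by
the _LazyGroupCompressFactory constructor call above, and 'start'/'end' stand
for the key's byte offsets within the block):

    manager = groupcompress._LazyGroupContentManager(block)
    # Register only the keys a client actually requested.
    manager.add_factory(('key1',), (), start, end)
    # Shrink the block (trim or rebuild) before putting it on the wire.
    manager._check_rebuild_block()
    for record in manager.get_record_stream():
        text = record.get_bytes_as('fulltext')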