Rev 3906: Add a _LazyGroupContentManager._check_rebuild_block in http://bzr.arbash-meinel.com/branches/bzr/brisbane/lazy_gc_stream
John Arbash Meinel
john at arbash-meinel.com
Tue Mar 17 19:27:12 GMT 2009
At http://bzr.arbash-meinel.com/branches/bzr/brisbane/lazy_gc_stream
------------------------------------------------------------
revno: 3906
revision-id: john at arbash-meinel.com-20090317192705-8r4ny7purwsx3m0l
parent: john at arbash-meinel.com-20090317183751-uzqw7v9jdlvntbrj
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: lazy_gc_stream
timestamp: Tue 2009-03-17 14:27:05 -0500
message:
Add a _LazyGroupContentManager._check_rebuild_block
This can be called to ensure the content we have generated has been
appropriately shrunk before we put it on the wire, etc.
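As a rough illustration of the intended call pattern (a hedged sketch:
get_block and the exact add_factory arguments are hypothetical; only
_LazyGroupContentManager, _check_rebuild_block and _wire_bytes come from
this revision):

    from bzrlib import groupcompress

    block = get_block()  # hypothetical: an existing GroupCompressBlock
    manager = groupcompress._LazyGroupContentManager(block)
    # Register just the records the caller actually asked for (the
    # start/end byte offsets within the block are made up here).
    manager.add_factory(('key1',), (), 0, 1234)
    # Trim or rebuild the block if most of it is unreferenced, then
    # serialize it for transmission.
    manager._check_rebuild_block()
    wire_bytes = manager._wire_bytes()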
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py 2009-03-17 18:29:06 +0000
+++ b/bzrlib/groupcompress.py 2009-03-17 19:27:05 +0000
@@ -19,6 +19,7 @@
 from itertools import izip
 from cStringIO import StringIO
 import struct
+import time
 import zlib
 try:
     import pylzma
@@ -34,6 +35,7 @@
     osutils,
     pack,
     patiencediff,
+    trace,
     )
 from bzrlib.graph import Graph
 from bzrlib.knit import _DirectPackAccess
@@ -463,11 +465,11 @@
         return ''.join(chunks)
 
 
-class LazyGroupCompressFactory(object):
+class _LazyGroupCompressFactory(object):
     """Yield content from a GroupCompressBlock on demand."""
 
     def __init__(self, key, parents, manager, start, end, first):
-        """Create a LazyGroupCompressFactory
+        """Create a _LazyGroupCompressFactory
 
         :param key: The key of just this record
         :param parents: The parents of this key (possibly None)
@@ -517,7 +519,7 @@
 
 
 class _LazyGroupContentManager(object):
-    """This manages a group of LazyGroupCompressFactory objects."""
+    """This manages a group of _LazyGroupCompressFactory objects."""
 
     def __init__(self, block):
         self._block = block
@@ -530,7 +532,7 @@
         else:
             first = False
         # Note that this creates a reference cycle....
-        factory = LazyGroupCompressFactory(key, parents, self,
+        factory = _LazyGroupCompressFactory(key, parents, self,
             start, end, first=first)
         self._factories.append(factory)
 
@@ -541,6 +543,74 @@
         # TODO: Consider setting self._factories = None after the above loop,
         # as it will break the reference cycle
 
+    def _trim_block(self, last_byte):
+        """Create a new GroupCompressBlock, with just some of the content."""
+        # None of the factories need to be adjusted, because the content
+        # stays at identical offsets; only some of the unreferenced
+        # trailing bytes are stripped.
+        trace.mutter('stripping trailing bytes from groupcompress block'
+                     ' %d => %d', self._block._content_length, last_byte)
+        new_block = GroupCompressBlock()
+        self._block._ensure_content(last_byte)
+        new_block.set_content(self._block._content[:last_byte])
+        self._block = new_block
+
+    def _rebuild_block(self):
+        """Create a new GroupCompressBlock with only the referenced texts."""
+        compressor = GroupCompressor()
+        tstart = time.time()
+        old_length = self._block._content_length
+        cur_endpoint = 0
+        for factory in self._factories:
+            bytes = factory.get_bytes_as('fulltext')
+            (found_sha1, end_point, type,
+             length) = compressor.compress(factory.key, bytes, factory.sha1)
+            # Now update this factory with the new offsets, etc.
+            factory.sha1 = found_sha1
+            factory._start = cur_endpoint
+            factory._end = end_point
+            cur_endpoint = end_point
+        new_block = compressor.flush()
+        # TODO: Should we check that new_block really *is* smaller than the
+        #       old block? It seems hard to come up with a case where it
+        #       would expand, since we do full compression again. Perhaps
+        #       based on a request that ends up poorly ordered?
+        delta = time.time() - tstart
+        self._block = new_block
+        trace.mutter('creating new compressed block on-the-fly in %.3fs'
+                     ' %d bytes => %d bytes', delta, old_length,
+                     self._block._content_length)
+
+    def _check_rebuild_block(self):
+        """Check to see if our block should be repacked."""
+        total_bytes_used = 0
+        last_byte_used = 0
+        for factory in self._factories:
+            total_bytes_used += factory._end - factory._start
+            last_byte_used = max(last_byte_used, factory._end)
+        # If we are using most of the bytes from the block, we have nothing
+        # else to check (currently more than 1/2)
+        if total_bytes_used * 2 >= self._block._content_length:
+            return
+        # Can we just strip off the trailing bytes? If we are going to be
+        # transmitting more than 50% of the front of the content, go ahead
+        if total_bytes_used * 2 > last_byte_used:
+            self._trim_block(last_byte_used)
+            return
+
+        # We are using a small amount of the data, and it isn't packed
+        # nicely at the front, so rebuild the content.
+        # Note: This would be *nicer* as a strip-data-from-group, rather
+        #       than building it up again from scratch.
+        #       It might be reasonable to consider the fulltext sizes for
+        #       different bits when deciding this, too. You may have a small
+        #       fulltext and a trivial delta, and just be trading one
+        #       fulltext for another. If we do a simple 'prune' we may end
+        #       up expanding many deltas into fulltexts, as well.
+        #       If we build a cheap enough 'strip', we could try a strip;
+        #       if that expands the content, we then rebuild.
+        self._rebuild_block()
+
     def _wire_bytes(self):
         """Return a byte stream suitable for transmitting over the wire."""
         # TODO: this might be a really good time to determine that we want to
@@ -811,7 +881,7 @@
         content = ''.join(self.lines)
         self.lines = None
         self._block.set_content(content)
-        return self._block.to_bytes()
+        return self._block
 
     def output_chunks(self, new_chunks):
         """Output some chunks.
@@ -1328,7 +1398,7 @@
         keys_to_add = []
         basis_end = 0
         def flush():
-            bytes = self._compressor.flush()
+            bytes = self._compressor.flush().to_bytes()
            index, start, length = self._access.add_raw_records(
                [(None, len(bytes))], bytes)[0]
            nodes = []
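To make the repack thresholds concrete, here is a minimal standalone
restatement of the _check_rebuild_block decision (the choose_action name
and the byte counts are illustrative only; the authoritative logic is in
the diff above):

    def choose_action(total_bytes_used, last_byte_used, content_length):
        # At least half the block is referenced: leave it alone.
        if total_bytes_used * 2 >= content_length:
            return 'keep'
        # Referenced bytes cluster at the front: cheap to drop the tail.
        if total_bytes_used * 2 > last_byte_used:
            return 'trim'
        # Sparse usage scattered through the block: recompress the texts.
        return 'rebuild'

    # e.g. 10kB referenced of a 100kB block, last referenced byte at 15kB
    # => 'trim'; same 10kB but last referenced byte at 90kB => 'rebuild'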
=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py 2009-03-17 18:29:06 +0000
+++ b/bzrlib/tests/test_groupcompress.py 2009-03-17 19:27:05 +0000
@@ -194,9 +194,12 @@
         start = 0
         for key in sorted(key_to_text):
             compressor.compress(key, key_to_text[key], None)
-        entries = compressor._block._entries
-        raw_bytes = compressor.flush()
-        return entries, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)
+        block = compressor.flush()
+        entries = block._entries
+        # Go through from_bytes(to_bytes()) so that we start with a compressed
+        # content object
+        return entries, groupcompress.GroupCompressBlock.from_bytes(
+            block.to_bytes())
 
     def test_from_empty_bytes(self):
         self.assertRaises(ValueError,
@@ -595,8 +598,9 @@
         start = 0
         for key in sorted(key_to_text):
             compressor.compress(key, key_to_text[key], None)
-        entries = compressor._block._entries
-        raw_bytes = compressor.flush()
+        block = compressor.flush()
+        entries = block._entries
+        raw_bytes = block.to_bytes()
         return entries, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)
 
     def add_key_to_manager(self, key, entries, block, manager):
@@ -692,3 +696,42 @@
             text = self._texts[record.key]
             self.assertEqual(text, record.get_bytes_as('fulltext'))
         self.assertEqual([('key1',), ('key4',)], result_order)
+
+    def test__check_rebuild_no_changes(self):
+        entries, block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(block)
+        # Request all the keys, which ensures that we won't rebuild
+        self.add_key_to_manager(('key1',), entries, block, manager)
+        self.add_key_to_manager(('key2',), entries, block, manager)
+        self.add_key_to_manager(('key3',), entries, block, manager)
+        self.add_key_to_manager(('key4',), entries, block, manager)
+        manager._check_rebuild_block()
+        self.assertIs(block, manager._block)
+
+    def test__check_rebuild_only_one(self):
+        entries, block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(block)
+        # Request just the first key, which should trigger a 'strip' action
+        self.add_key_to_manager(('key1',), entries, block, manager)
+        manager._check_rebuild_block()
+        self.assertIsNot(block, manager._block)
+        self.assertTrue(block._content_length > manager._block._content_length)
+        # We should still be able to get the content out of this block,
+        # though it should have only 1 entry
+        for record in manager.get_record_stream():
+            self.assertEqual(('key1',), record.key)
+            self.assertEqual(self._texts[record.key],
+                             record.get_bytes_as('fulltext'))
+
+    def test__check_rebuild_middle(self):
+        entries, block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(block)
+        # Requesting a small key in the middle should trigger a 'rebuild'
+        self.add_key_to_manager(('key4',), entries, block, manager)
+        manager._check_rebuild_block()
+        self.assertIsNot(block, manager._block)
+        self.assertTrue(block._content_length > manager._block._content_length)
+        for record in manager.get_record_stream():
+            self.assertEqual(('key4',), record.key)
+            self.assertEqual(self._texts[record.key],
+                             record.get_bytes_as('fulltext'))
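Note the knock-on effect of GroupCompressor.flush() now returning the
block rather than its serialized bytes: callers that want raw bytes must
serialize explicitly. A short before/after sketch (the compressor setup
is assumed):

    # Before this revision, flush() returned the serialized bytes:
    #     bytes = compressor.flush()
    # From this revision on, it returns the GroupCompressBlock itself:
    block = compressor.flush()
    bytes = block.to_bytes()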