Rev 4657: (andrew) Fix #402657: 2a fetch over dumb transport reads one group at a time in file:///home/pqm/archives/thelove/bzr/%2Btrunk/
Canonical.com Patch Queue Manager
pqm at pqm.ubuntu.com
Thu Aug 27 03:27:23 BST 2009
At file:///home/pqm/archives/thelove/bzr/%2Btrunk/
------------------------------------------------------------
revno: 4657 [merge]
revision-id: pqm at pqm.ubuntu.com-20090827022719-bl2yoqhpj3fcfczu
parent: pqm at pqm.ubuntu.com-20090827013447-ndjtt02ad7nfdoiy
parent: andrew.bennetts at canonical.com-20090827004122-88bn53xrv8mdh598
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Thu 2009-08-27 03:27:19 +0100
message:
  (andrew) Fix #402657: 2a fetch over dumb transport reads one group at
  a time.
modified:
  bzrlib/groupcompress.py groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
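
In short, the change batches groupcompress block reads so that a fetch over a
dumb (non-smart) transport no longer issues one request per group: read memos
are accumulated until roughly BATCH_SIZE (2**16) bytes are pending and then
fetched together. A rough, hypothetical sketch of that strategy follows
(fetch_blocks stands in for something like _access.get_raw_records(); the
helper and its names are illustrative, not bzrlib API):

BATCH_SIZE = 2**16  # same threshold the patch introduces

def batched_fetch(read_memos, fetch_blocks):
    # Accumulate read memos until the pending batch exceeds BATCH_SIZE
    # bytes, then issue them together instead of one request per group.
    pending = []
    pending_bytes = 0
    for memo in read_memos:
        pending.append(memo)
        pending_bytes += memo[2]  # third field of a memo is its byte length
        if pending_bytes > BATCH_SIZE:
            for block in fetch_blocks(pending):
                yield block
            pending = []
            pending_bytes = 0
    if pending:
        # Flush whatever is left at the end.
        for block in fetch_blocks(pending):
            yield block
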
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py 2009-08-19 16:23:39 +0000
+++ b/bzrlib/groupcompress.py 2009-08-26 05:41:58 +0000
@@ -33,7 +33,6 @@
     pack,
     trace,
     )
-from bzrlib.graph import Graph
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
 from bzrlib.tsort import topo_sort
@@ -45,12 +44,15 @@
     VersionedFiles,
     )
+# Minimum number of uncompressed bytes to try to fetch at once when
+# retrieving groupcompress blocks.
+BATCH_SIZE = 2**16
+
 _USE_LZMA = False and (pylzma is not None)
 # osutils.sha_string('')
 _null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
-
 def sort_gc_optimal(parent_map):
     """Sort and group the keys in parent_map into groupcompress order.
@@ -974,6 +976,114 @@
     versioned_files.stream.close()
+class _BatchingBlockFetcher(object):
+    """Fetch group compress blocks in batches.
+
+    :ivar total_bytes: int of expected number of bytes needed to fetch the
+        currently pending batch.
+    """
+
+    def __init__(self, gcvf, locations):
+        self.gcvf = gcvf
+        self.locations = locations
+        self.keys = []
+        self.batch_memos = {}
+        self.memos_to_get = []
+        self.total_bytes = 0
+        self.last_read_memo = None
+        self.manager = None
+
+    def add_key(self, key):
+ """Add another to key to fetch.
+
+ :return: The estimated number of bytes needed to fetch the batch so
+ far.
+ """
+        self.keys.append(key)
+        index_memo, _, _, _ = self.locations[key]
+        read_memo = index_memo[0:3]
+        # Three possibilities for this read_memo:
+        # - it's already part of this batch; or
+        # - it's not yet part of this batch, but is already cached; or
+        # - it's not yet part of this batch and will need to be fetched.
+        if read_memo in self.batch_memos:
+            # This read memo is already in this batch.
+            return self.total_bytes
+        try:
+            cached_block = self.gcvf._group_cache[read_memo]
+        except KeyError:
+            # This read memo is new to this batch, and the data isn't cached
+            # either.
+            self.batch_memos[read_memo] = None
+            self.memos_to_get.append(read_memo)
+            byte_length = read_memo[2]
+            self.total_bytes += byte_length
+        else:
+            # This read memo is new to this batch, but cached.
+            # Keep a reference to the cached block in batch_memos because it's
+            # certain that we'll use it when this batch is processed, but
+            # there's a risk that it would fall out of _group_cache between now
+            # and then.
+            self.batch_memos[read_memo] = cached_block
+        return self.total_bytes
+
+    def _flush_manager(self):
+        if self.manager is not None:
+            for factory in self.manager.get_record_stream():
+                yield factory
+            self.manager = None
+            self.last_read_memo = None
+
+    def yield_factories(self, full_flush=False):
+        """Yield factories for keys added since the last yield. They will be
+        returned in the order they were added via add_key.
+
+        :param full_flush: by default, some results may not be returned in case
+            they can be part of the next batch. If full_flush is True, then
+            all results are returned.
+        """
+        if self.manager is None and not self.keys:
+            return
+        # Fetch all memos in this batch.
+        blocks = self.gcvf._get_blocks(self.memos_to_get)
+        # Turn blocks into factories and yield them.
+        memos_to_get_stack = list(self.memos_to_get)
+        memos_to_get_stack.reverse()
+        for key in self.keys:
+            index_memo, _, parents, _ = self.locations[key]
+            read_memo = index_memo[:3]
+            if self.last_read_memo != read_memo:
+                # We are starting a new block. If we have a
+                # manager, we have found everything that fits for
+                # now, so yield records
+                for factory in self._flush_manager():
+                    yield factory
+                # Now start a new manager.
+                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
+                    # The next block from _get_blocks will be the block we
+                    # need.
+                    block_read_memo, block = blocks.next()
+                    if block_read_memo != read_memo:
+                        raise AssertionError(
+                            "block_read_memo out of sync with read_memo"
+                            " (%r != %r)" % (block_read_memo, read_memo))
+                    self.batch_memos[read_memo] = block
+                    memos_to_get_stack.pop()
+                else:
+                    block = self.batch_memos[read_memo]
+                self.manager = _LazyGroupContentManager(block)
+                self.last_read_memo = read_memo
+            start, end = index_memo[3:5]
+            self.manager.add_factory(key, parents, start, end)
+        if full_flush:
+            for factory in self._flush_manager():
+                yield factory
+        del self.keys[:]
+        self.batch_memos.clear()
+        del self.memos_to_get[:]
+        self.total_bytes = 0
+
+
 class GroupCompressVersionedFiles(VersionedFiles):
     """A group-compress based VersionedFiles implementation."""
@@ -1137,26 +1247,42 @@
             missing.difference_update(set(new_result))
         return result, source_results
-    def _get_block(self, index_memo):
-        read_memo = index_memo[0:3]
-        # get the group:
-        try:
-            block = self._group_cache[read_memo]
-        except KeyError:
-            # read the group
-            zdata = self._access.get_raw_records([read_memo]).next()
-            # decompress - whole thing - this is not a bug, as it
-            # permits caching. We might want to store the partially
-            # decompresed group and decompress object, so that recent
-            # texts are not penalised by big groups.
-            block = GroupCompressBlock.from_bytes(zdata)
-            self._group_cache[read_memo] = block
-        # cheapo debugging:
-        # print len(zdata), len(plain)
-        # parse - requires split_lines, better to have byte offsets
-        # here (but not by much - we only split the region for the
-        # recipe, and we often want to end up with lines anyway.
-        return block
+    def _get_blocks(self, read_memos):
+        """Get GroupCompressBlocks for the given read_memos.
+
+        :returns: a series of (read_memo, block) pairs, in the order they were
+            originally passed.
+        """
+        cached = {}
+        for read_memo in read_memos:
+            try:
+                block = self._group_cache[read_memo]
+            except KeyError:
+                pass
+            else:
+                cached[read_memo] = block
+        not_cached = []
+        not_cached_seen = set()
+        for read_memo in read_memos:
+            if read_memo in cached:
+                # Don't fetch what we already have
+                continue
+            if read_memo in not_cached_seen:
+                # Don't try to fetch the same data twice
+                continue
+            not_cached.append(read_memo)
+            not_cached_seen.add(read_memo)
+        raw_records = self._access.get_raw_records(not_cached)
+        for read_memo in read_memos:
+            try:
+                yield read_memo, cached[read_memo]
+            except KeyError:
+                # Read the block, and cache it.
+                zdata = raw_records.next()
+                block = GroupCompressBlock.from_bytes(zdata)
+                self._group_cache[read_memo] = block
+                cached[read_memo] = block
+                yield read_memo, block
     def get_missing_compression_parent_keys(self):
         """Return the keys of missing compression parents.
@@ -1328,55 +1454,35 @@
                 unadded_keys, source_result)
         for key in missing:
             yield AbsentContentFactory(key)
-        manager = None
-        last_read_memo = None
-        # TODO: This works fairly well at batching up existing groups into a
-        #       streamable format, and possibly allowing for taking one big
-        #       group and splitting it when it isn't fully utilized.
-        #       However, it doesn't allow us to find under-utilized groups and
-        #       combine them into a bigger group on the fly.
-        #       (Consider the issue with how chk_map inserts texts
-        #       one-at-a-time.) This could be done at insert_record_stream()
-        #       time, but it probably would decrease the number of
-        #       bytes-on-the-wire for fetch.
+        # Batch up as many keys as we can until either:
+        # - we encounter an unadded ref, or
+        # - we run out of keys, or
+        # - the total bytes to retrieve for this batch > BATCH_SIZE
+        batcher = _BatchingBlockFetcher(self, locations)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
                     if key in self._unadded_refs:
-                        if manager is not None:
-                            for factory in manager.get_record_stream():
-                                yield factory
-                            last_read_memo = manager = None
+                        # Flush batch, then yield unadded ref from
+                        # self._compressor.
+                        for factory in batcher.yield_factories(full_flush=True):
+                            yield factory
                         bytes, sha1 = self._compressor.extract(key)
                         parents = self._unadded_refs[key]
                         yield FulltextContentFactory(key, parents, sha1, bytes)
-                    else:
-                        index_memo, _, parents, (method, _) = locations[key]
-                        read_memo = index_memo[0:3]
-                        if last_read_memo != read_memo:
-                            # We are starting a new block. If we have a
-                            # manager, we have found everything that fits for
-                            # now, so yield records
-                            if manager is not None:
-                                for factory in manager.get_record_stream():
-                                    yield factory
-                            # Now start a new manager
-                            block = self._get_block(index_memo)
-                            manager = _LazyGroupContentManager(block)
-                            last_read_memo = read_memo
-                        start, end = index_memo[3:5]
-                        manager.add_factory(key, parents, start, end)
+                        continue
+                    if batcher.add_key(key) > BATCH_SIZE:
+                        # Ok, this batch is big enough. Yield some results.
+                        for factory in batcher.yield_factories():
+                            yield factory
             else:
-                if manager is not None:
-                    for factory in manager.get_record_stream():
-                        yield factory
-                    last_read_memo = manager = None
+                for factory in batcher.yield_factories(full_flush=True):
+                    yield factory
                 for record in source.get_record_stream(keys, ordering,
                         include_delta_closure):
                     yield record
-        if manager is not None:
-            for factory in manager.get_record_stream():
-                yield factory
+        for factory in batcher.yield_factories(full_flush=True):
+            yield factory
     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""