Rev 4657: (andrew) Fix #402657: 2a fetch over dumb transport reads one group at a time, in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Thu Aug 27 03:27:23 BST 2009


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 4657 [merge]
revision-id: pqm at pqm.ubuntu.com-20090827022719-bl2yoqhpj3fcfczu
parent: pqm at pqm.ubuntu.com-20090827013447-ndjtt02ad7nfdoiy
parent: andrew.bennetts at canonical.com-20090827004122-88bn53xrv8mdh598
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Thu 2009-08-27 03:27:19 +0100
message:
  (andrew) Fix #402657: 2a fetch over dumb transport reads one group at
  	a time.
modified:
  bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-08-19 16:23:39 +0000
+++ b/bzrlib/groupcompress.py	2009-08-26 05:41:58 +0000
@@ -33,7 +33,6 @@
     pack,
     trace,
     )
-from bzrlib.graph import Graph
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
 from bzrlib.tsort import topo_sort
@@ -45,12 +44,15 @@
     VersionedFiles,
     )
 
+# Minimum number of uncompressed bytes to try to fetch at once when retrieving
+# groupcompress blocks.
+BATCH_SIZE = 2**16
+
 _USE_LZMA = False and (pylzma is not None)
 
 # osutils.sha_string('')
 _null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
 
-
 def sort_gc_optimal(parent_map):
     """Sort and group the keys in parent_map into groupcompress order.
 
@@ -974,6 +976,114 @@
     versioned_files.stream.close()
 
 
+class _BatchingBlockFetcher(object):
+    """Fetch group compress blocks in batches.
+    
+    :ivar total_bytes: int of expected number of bytes needed to fetch the
+        currently pending batch.
+    """
+
+    def __init__(self, gcvf, locations):
+        self.gcvf = gcvf
+        self.locations = locations
+        self.keys = []
+        self.batch_memos = {}
+        self.memos_to_get = []
+        self.total_bytes = 0
+        self.last_read_memo = None
+        self.manager = None
+
+    def add_key(self, key):
+        """Add another key to fetch.
+        
+        :return: The estimated number of bytes needed to fetch the batch so
+            far.
+        """
+        self.keys.append(key)
+        index_memo, _, _, _ = self.locations[key]
+        read_memo = index_memo[0:3]
+        # Three possibilities for this read_memo:
+        #  - it's already part of this batch; or
+        #  - it's not yet part of this batch, but is already cached; or
+        #  - it's not yet part of this batch and will need to be fetched.
+        if read_memo in self.batch_memos:
+            # This read memo is already in this batch.
+            return self.total_bytes
+        try:
+            cached_block = self.gcvf._group_cache[read_memo]
+        except KeyError:
+            # This read memo is new to this batch, and the data isn't cached
+            # either.
+            self.batch_memos[read_memo] = None
+            self.memos_to_get.append(read_memo)
+            byte_length = read_memo[2]
+            self.total_bytes += byte_length
+        else:
+            # This read memo is new to this batch, but cached.
+            # Keep a reference to the cached block in batch_memos because it's
+            # certain that we'll use it when this batch is processed, but
+            # there's a risk that it would fall out of _group_cache between now
+            # and then.
+            self.batch_memos[read_memo] = cached_block
+        return self.total_bytes
+        
+    def _flush_manager(self):
+        if self.manager is not None:
+            for factory in self.manager.get_record_stream():
+                yield factory
+            self.manager = None
+            self.last_read_memo = None
+
+    def yield_factories(self, full_flush=False):
+        """Yield factories for keys added since the last yield.  They will be
+        returned in the order they were added via add_key.
+        
+        :param full_flush: by default, some results may not be returned in case
+            they can be part of the next batch.  If full_flush is True, then
+            all results are returned.
+        """
+        if self.manager is None and not self.keys:
+            return
+        # Fetch all memos in this batch.
+        blocks = self.gcvf._get_blocks(self.memos_to_get)
+        # Turn blocks into factories and yield them.
+        memos_to_get_stack = list(self.memos_to_get)
+        memos_to_get_stack.reverse()
+        for key in self.keys:
+            index_memo, _, parents, _ = self.locations[key]
+            read_memo = index_memo[:3]
+            if self.last_read_memo != read_memo:
+                # We are starting a new block. If we have a
+                # manager, we have found everything that fits for
+                # now, so yield records
+                for factory in self._flush_manager():
+                    yield factory
+                # Now start a new manager.
+                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
+                    # The next block from _get_blocks will be the block we
+                    # need.
+                    block_read_memo, block = blocks.next()
+                    if block_read_memo != read_memo:
+                        raise AssertionError(
+                            "block_read_memo out of sync with read_memo"
+                            "(%r != %r)" % (block_read_memo, read_memo))
+                    self.batch_memos[read_memo] = block
+                    memos_to_get_stack.pop()
+                else:
+                    block = self.batch_memos[read_memo]
+                self.manager = _LazyGroupContentManager(block)
+                self.last_read_memo = read_memo
+            start, end = index_memo[3:5]
+            self.manager.add_factory(key, parents, start, end)
+        if full_flush:
+            for factory in self._flush_manager():
+                yield factory
+        del self.keys[:]
+        self.batch_memos.clear()
+        del self.memos_to_get[:]
+        self.total_bytes = 0
+
+
 class GroupCompressVersionedFiles(VersionedFiles):
     """A group-compress based VersionedFiles implementation."""
 
@@ -1137,26 +1247,42 @@
             missing.difference_update(set(new_result))
         return result, source_results
 
-    def _get_block(self, index_memo):
-        read_memo = index_memo[0:3]
-        # get the group:
-        try:
-            block = self._group_cache[read_memo]
-        except KeyError:
-            # read the group
-            zdata = self._access.get_raw_records([read_memo]).next()
-            # decompress - whole thing - this is not a bug, as it
-            # permits caching. We might want to store the partially
-            # decompresed group and decompress object, so that recent
-            # texts are not penalised by big groups.
-            block = GroupCompressBlock.from_bytes(zdata)
-            self._group_cache[read_memo] = block
-        # cheapo debugging:
-        # print len(zdata), len(plain)
-        # parse - requires split_lines, better to have byte offsets
-        # here (but not by much - we only split the region for the
-        # recipe, and we often want to end up with lines anyway.
-        return block
+    def _get_blocks(self, read_memos):
+        """Get GroupCompressBlocks for the given read_memos.
+
+        :returns: a series of (read_memo, block) pairs, in the order they were
+            originally passed.
+        """
+        cached = {}
+        for read_memo in read_memos:
+            try:
+                block = self._group_cache[read_memo]
+            except KeyError:
+                pass
+            else:
+                cached[read_memo] = block
+        not_cached = []
+        not_cached_seen = set()
+        for read_memo in read_memos:
+            if read_memo in cached:
+                # Don't fetch what we already have
+                continue
+            if read_memo in not_cached_seen:
+                # Don't try to fetch the same data twice
+                continue
+            not_cached.append(read_memo)
+            not_cached_seen.add(read_memo)
+        raw_records = self._access.get_raw_records(not_cached)
+        for read_memo in read_memos:
+            try:
+                yield read_memo, cached[read_memo]
+            except KeyError:
+                # Read the block, and cache it.
+                zdata = raw_records.next()
+                block = GroupCompressBlock.from_bytes(zdata)
+                self._group_cache[read_memo] = block
+                cached[read_memo] = block
+                yield read_memo, block
 
     def get_missing_compression_parent_keys(self):
         """Return the keys of missing compression parents.
@@ -1328,55 +1454,35 @@
                 unadded_keys, source_result)
         for key in missing:
             yield AbsentContentFactory(key)
-        manager = None
-        last_read_memo = None
-        # TODO: This works fairly well at batching up existing groups into a
-        #       streamable format, and possibly allowing for taking one big
-        #       group and splitting it when it isn't fully utilized.
-        #       However, it doesn't allow us to find under-utilized groups and
-        #       combine them into a bigger group on the fly.
-        #       (Consider the issue with how chk_map inserts texts
-        #       one-at-a-time.) This could be done at insert_record_stream()
-        #       time, but it probably would decrease the number of
-        #       bytes-on-the-wire for fetch.
+        # Batch up as many keys as we can until either:
+        #  - we encounter an unadded ref, or
+        #  - we run out of keys, or
+        #  - the total bytes to retrieve for this batch > BATCH_SIZE
+        batcher = _BatchingBlockFetcher(self, locations)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
                     if key in self._unadded_refs:
-                        if manager is not None:
-                            for factory in manager.get_record_stream():
-                                yield factory
-                            last_read_memo = manager = None
+                        # Flush batch, then yield unadded ref from
+                        # self._compressor.
+                        for factory in batcher.yield_factories(full_flush=True):
+                            yield factory
                         bytes, sha1 = self._compressor.extract(key)
                         parents = self._unadded_refs[key]
                         yield FulltextContentFactory(key, parents, sha1, bytes)
-                    else:
-                        index_memo, _, parents, (method, _) = locations[key]
-                        read_memo = index_memo[0:3]
-                        if last_read_memo != read_memo:
-                            # We are starting a new block. If we have a
-                            # manager, we have found everything that fits for
-                            # now, so yield records
-                            if manager is not None:
-                                for factory in manager.get_record_stream():
-                                    yield factory
-                            # Now start a new manager
-                            block = self._get_block(index_memo)
-                            manager = _LazyGroupContentManager(block)
-                            last_read_memo = read_memo
-                        start, end = index_memo[3:5]
-                        manager.add_factory(key, parents, start, end)
+                        continue
+                    if batcher.add_key(key) > BATCH_SIZE:
+                        # Ok, this batch is big enough.  Yield some results.
+                        for factory in batcher.yield_factories():
+                            yield factory
             else:
-                if manager is not None:
-                    for factory in manager.get_record_stream():
-                        yield factory
-                    last_read_memo = manager = None
+                for factory in batcher.yield_factories(full_flush=True):
+                    yield factory
                 for record in source.get_record_stream(keys, ordering,
                                                        include_delta_closure):
                     yield record
-        if manager is not None:
-            for factory in manager.get_record_stream():
-                yield factory
+        for factory in batcher.yield_factories(full_flush=True):
+            yield factory
 
     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""




More information about the bazaar-commits mailing list