Rev 3812: Change _iter_nodes into a generator. in http://bzr.arbash-meinel.com/branches/bzr/brisbane/remap

John Arbash Meinel john at arbash-meinel.com
Wed Jan 7 18:06:13 GMT 2009


At http://bzr.arbash-meinel.com/branches/bzr/brisbane/remap

------------------------------------------------------------
revno: 3812
revision-id: john at arbash-meinel.com-20090107180553-5sr2e5lfxcan84u5
parent: john at arbash-meinel.com-20090107175432-hw8ozprv36irblz0
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: remap
timestamp: Wed 2009-01-07 12:05:53 -0600
message:
  Change _iter_nodes into a generator.
  
  This dramatically simplifies _check_remap, because all of the code to
  handle paging in new nodes is already present.
  
  All we needed to add was the ability to 'batch' requests for the
  get_record_stream(), instead of reading in everything at once.
-------------- next part --------------
=== modified file 'bzrlib/chk_map.py'
--- a/bzrlib/chk_map.py	2009-01-07 17:52:52 +0000
+++ b/bzrlib/chk_map.py	2009-01-07 18:05:53 +0000
@@ -688,22 +688,25 @@
             for item in node.iteritems(store, key_filter=key_filter):
                 yield item
 
-    def _iter_nodes(self, store, key_filter=None):
+    def _iter_nodes(self, store, key_filter=None, batch_size=None):
         """Iterate over node objects which match key_filter.
 
         :param store: A store to use for accessing content.
         :param key_filter: A key filter to filter nodes. Only nodes that might
             contain a key in key_filter will be returned.
-        :return: An iterable of nodes.
+        :param batch_size: If not None, then we will return the nodes that had
+            to be read using get_record_stream in batches, rather than reading
+            them all at once.
+        :return: An iterable of nodes. This function does not have to be fully
+            consumed.  (There will be no pending I/O when items are being returned.)
         """
-        nodes = []
         keys = {}
         if key_filter is None:
             for prefix, node in self._items.iteritems():
                 if type(node) == tuple:
                     keys[node] = prefix
                 else:
-                    nodes.append(node)
+                    yield node
         else:
             # XXX defaultdict ?
             length_filters = {}
@@ -719,7 +722,7 @@
                         if type(node) == tuple:
                             keys[node] = prefix
                         else:
-                            nodes.append(node)
+                            yield node
                         break
         if keys:
             # Look in the page cache for some more bytes
@@ -731,21 +734,31 @@
                     continue
                 else:
                     node = _deserialise(bytes, key)
-                    nodes.append(node)
                     self._items[keys[key]] = node
                     found_keys.add(key)
+                    yield node
             for key in found_keys:
                 del keys[key]
         if keys:
             # demand load some pages.
-            stream = store.get_record_stream(keys, 'unordered', True)
-            for record in stream:
-                bytes = record.get_bytes_as('fulltext')
-                node = _deserialise(bytes, record.key)
-                nodes.append(node)
-                self._items[keys[record.key]] = node
-                _page_cache.add(record.key, bytes)
-        return nodes
+            if batch_size is None:
+                # Read all the keys in
+                batch_size = len(keys)
+            key_order = list(keys)
+            for batch_start in range(0, len(key_order), batch_size):
+                batch = key_order[batch_start:batch_start + batch_size]
+                # We have to fully consume the stream so there is no pending
+                # I/O, so we buffer the nodes for now.
+                stream = store.get_record_stream(batch, 'unordered', True)
+                nodes = []
+                for record in stream:
+                    bytes = record.get_bytes_as('fulltext')
+                    node = _deserialise(bytes, record.key)
+                    nodes.append(node)
+                    self._items[keys[record.key]] = node
+                    _page_cache.add(record.key, bytes)
+                for node in nodes:
+                    yield node
 
     def map(self, store, key, value):
         """Map key to value."""
@@ -764,7 +777,7 @@
             new_parent.add_node(self._prefix[:len(new_prefix)+1], self)
             assert new_parent._node_width == len(new_parent._prefix) + 1
             return new_parent.map(store, key, value)
-        children = self._iter_nodes(store, key_filter=[key])
+        children = list(self._iter_nodes(store, key_filter=[key]))
         if children:
             child = children[0]
             # if isinstance(child, InternalNode):
@@ -911,7 +924,7 @@
         """Remove key from this node and it's children."""
         if not len(self._items):
             raise AssertionError("cant unmap in an empty InternalNode.")
-        children = self._iter_nodes(store, key_filter=[key])
+        children = list(self._iter_nodes(store, key_filter=[key]))
         if children:
             child = children[0]
         else:
@@ -965,96 +978,20 @@
         new_leaf = LeafNode()
         new_leaf.set_maximum_size(self._maximum_size)
         new_leaf._key_width = self._key_width
-        keys = {}
-        # There is some overlap with _iter_nodes here, but not a lot, and it
-        # allows us to do quick evaluation without paging everything in
-        for prefix, node in self._items.iteritems():
-            if type(node) == tuple:
-                keys[node] = prefix
-            else:
-                if isinstance(node, InternalNode):
-                    # Without looking at any leaf nodes, we are sure
-                    def child_is_internal_node(): pass
-                    child_is_internal_node()
+        # A batch_size of 16 was chosen because:
+        #   a) In testing, a 4k page held 14 times. So if we have more than 16
+        #      leaf nodes we are unlikely to hold them in a single new leaf
+        #      node. This still allows for 1 round trip
+        #   b) With 16-way fan out, we can still do a single round trip
+        #   c) With 255-way fan out, we don't want to read all 255 and destroy
+        #      the page cache, just to determine that we really don't need it.
+        for node in self._iter_nodes(store, batch_size=16):
+            if isinstance(node, InternalNode):
+                # Without looking at any leaf nodes, we are sure
+                return self
+            for key, value in node._items.iteritems():
+                if new_leaf._map_no_split(key, value):
                     return self
-                for key, value in node._items.iteritems():
-                    if new_leaf._map_no_split(key, value):
-                        # Adding this key would cause a split, so we know we
-                        # don't need to collapse
-                        def child_causes_split(): pass
-                        child_causes_split()
-                        return self
-        if keys:
-            # Look in the page cache for some more bytes
-            found_keys = set()
-            for chk, prefix in keys.iteritems():
-                try:
-                    bytes = _page_cache[chk]
-                except KeyError:
-                    continue
-                else:
-                    found_keys.add(chk)
-                    node = _deserialise(bytes, chk)
-                    self._items[prefix] = node
-                    if isinstance(node, InternalNode):
-                        # We have done enough to know that we can stop
-                        def page_is_internal(): pass
-                        page_is_internal()
-                        return self
-                    for key, value in node._items.iteritems():
-                        if new_leaf._map_no_split(key, value):
-                            def page_causes_split(): pass
-                            page_causes_split()
-                            return self
-            for chk in found_keys:
-                del keys[chk]
-        # So far, everything fits. Page in the rest of the nodes, and see if it
-        # holds true.
-        if keys:
-            # We loop over a limited number of child nodes at a time. 16 is
-            # arbitrary, but the basic ideas are:
-            #   a) In a 4k page, we generally can only fit 14 items before we
-            #      are too full. We would like to fill up in a single request
-            #      if we have it.
-            #   b) If we have 16-way fan out, we can still read everything in
-            #      one go
-            #   c) But if we have 255-way fan out, we aren't reading all 255
-            #      nodes and overflowing our page cache just to find out we are
-            #      full after the first 10 nodes.
-            #   d) Different page sizes and fan out will have different
-            #      results, but our fan-out is likely to be a near multiple of
-            #      16 anyway.
-            batch_size = 16
-            key_order = list(keys)
-            for batch_start in range(0, len(key_order), batch_size):
-                batch = key_order[batch_start:batch_start + batch_size]
-                stream = store.get_record_stream(batch, 'unordered', True)
-                nodes = []
-                # Fully consume the stream, even if we could determine that we
-                # don't need to continue. We requested the bytes, we may as well
-                # use them
-                for record in stream:
-                    bytes = record.get_bytes_as('fulltext')
-                    node = _deserialise(bytes, record.key)
-                    self._items[keys[record.key]] = node
-                    _page_cache[record.key] = bytes
-                    nodes.append(node)
-                for node in nodes:
-                    if isinstance(node, InternalNode):
-                        # We know we won't fit
-                        def stream_is_internal(): pass
-                        stream_is_internal()
-                        return self
-                    for key, value in node._items.iteritems():
-                        if new_leaf._map_no_split(key, value):
-                            def stream_causes_split(): pass
-                            stream_causes_split()
-                            return self
-
-        # We have gone to every child, and everything fits in a single leaf
-        # node, we no longer need this internal node
-        def check_remap_collapsed(): pass
-        check_remap_collapsed()
         return new_leaf
 
 



More information about the bazaar-commits mailing list