Rev 3812: Change _iter_nodes into a generator. in http://bzr.arbash-meinel.com/branches/bzr/brisbane/remap
John Arbash Meinel
john at arbash-meinel.com
Wed Jan 7 18:06:13 GMT 2009
At http://bzr.arbash-meinel.com/branches/bzr/brisbane/remap
------------------------------------------------------------
revno: 3812
revision-id: john at arbash-meinel.com-20090107180553-5sr2e5lfxcan84u5
parent: john at arbash-meinel.com-20090107175432-hw8ozprv36irblz0
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: remap
timestamp: Wed 2009-01-07 12:05:53 -0600
message:
Change _iter_nodes into a generator.
This dramatically simplifies _check_remap, because all of the code to
handle paging in new nodes is already present.
All we needed to add was the ability to 'batch' requests to
get_record_stream(), instead of reading in everything at once.
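
The pattern at the heart of this change, reduced to a standalone sketch: a
method that used to accumulate results in a list and return them all at the
end becomes a generator that yields already-cached nodes immediately and
pages the rest in from the store in fixed-size batches. This is illustrative
only, not the committed code; iter_in_batches and its arguments are made-up
names, though get_record_stream(), get_bytes_as('fulltext') and
_deserialise() are the real calls used in the diff below.

    def iter_in_batches(cached_nodes, missing_keys, store, batch_size=None):
        # Nodes already in memory cost no I/O to return.
        for node in cached_nodes:
            yield node
        if batch_size is None:
            # Old behaviour: a single request for everything.
            batch_size = len(missing_keys)
        key_order = list(missing_keys)
        for batch_start in range(0, len(key_order), batch_size):
            batch = key_order[batch_start:batch_start + batch_size]
            stream = store.get_record_stream(batch, 'unordered', True)
            # Drain each batch completely before yielding anything, so a
            # caller that stops iterating early never abandons a stream
            # with pending I/O.
            nodes = [_deserialise(record.get_bytes_as('fulltext'), record.key)
                     for record in stream]
            for node in nodes:
                yield node

A caller that returns after the first interesting node pays for at most one
batch of records, rather than for every child.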
-------------- next part --------------
=== modified file 'bzrlib/chk_map.py'
--- a/bzrlib/chk_map.py 2009-01-07 17:52:52 +0000
+++ b/bzrlib/chk_map.py 2009-01-07 18:05:53 +0000
@@ -688,22 +688,25 @@
for item in node.iteritems(store, key_filter=key_filter):
yield item
- def _iter_nodes(self, store, key_filter=None):
+ def _iter_nodes(self, store, key_filter=None, batch_size=None):
"""Iterate over node objects which match key_filter.
:param store: A store to use for accessing content.
:param key_filter: A key filter to filter nodes. Only nodes that might
contain a key in key_filter will be returned.
- :return: An iterable of nodes.
+ :param batch_size: If not None, then we will return the nodes that had
+ to be read using get_record_stream in batches, rather than reading
+ them all at once.
+ :return: An iterable of nodes. This function does not have to be fully
+ consumed. (There will be no pending I/O when items are being returned.)
"""
- nodes = []
keys = {}
if key_filter is None:
for prefix, node in self._items.iteritems():
if type(node) == tuple:
keys[node] = prefix
else:
- nodes.append(node)
+ yield node
else:
# XXX defaultdict ?
length_filters = {}
@@ -719,7 +722,7 @@
if type(node) == tuple:
keys[node] = prefix
else:
- nodes.append(node)
+ yield node
break
if keys:
# Look in the page cache for some more bytes
@@ -731,21 +734,31 @@
continue
else:
node = _deserialise(bytes, key)
- nodes.append(node)
self._items[keys[key]] = node
found_keys.add(key)
+ yield node
for key in found_keys:
del keys[key]
if keys:
# demand load some pages.
- stream = store.get_record_stream(keys, 'unordered', True)
- for record in stream:
- bytes = record.get_bytes_as('fulltext')
- node = _deserialise(bytes, record.key)
- nodes.append(node)
- self._items[keys[record.key]] = node
- _page_cache.add(record.key, bytes)
- return nodes
+ if batch_size is None:
+ # Read all the keys in
+ batch_size = len(keys)
+ key_order = list(keys)
+ for batch_start in range(0, len(key_order), batch_size):
+ batch = key_order[batch_start:batch_start + batch_size]
+ # We have to fully consume the stream so there is no pending
+ # I/O, so we buffer the nodes for now.
+ stream = store.get_record_stream(batch, 'unordered', True)
+ nodes = []
+ for record in stream:
+ bytes = record.get_bytes_as('fulltext')
+ node = _deserialise(bytes, record.key)
+ nodes.append(node)
+ self._items[keys[record.key]] = node
+ _page_cache.add(record.key, bytes)
+ for node in nodes:
+ yield node
def map(self, store, key, value):
"""Map key to value."""
@@ -764,7 +777,7 @@
new_parent.add_node(self._prefix[:len(new_prefix)+1], self)
assert new_parent._node_width == len(new_parent._prefix) + 1
return new_parent.map(store, key, value)
- children = self._iter_nodes(store, key_filter=[key])
+ children = list(self._iter_nodes(store, key_filter=[key]))
if children:
child = children[0]
# if isinstance(child, InternalNode):
@@ -911,7 +924,7 @@
"""Remove key from this node and it's children."""
if not len(self._items):
raise AssertionError("cant unmap in an empty InternalNode.")
- children = self._iter_nodes(store, key_filter=[key])
+ children = list(self._iter_nodes(store, key_filter=[key]))
if children:
child = children[0]
else:
@@ -965,96 +978,20 @@
new_leaf = LeafNode()
new_leaf.set_maximum_size(self._maximum_size)
new_leaf._key_width = self._key_width
- keys = {}
- # There is some overlap with _iter_nodes here, but not a lot, and it
- # allows us to do quick evaluation without paging everything in
- for prefix, node in self._items.iteritems():
- if type(node) == tuple:
- keys[node] = prefix
- else:
- if isinstance(node, InternalNode):
- # Without looking at any leaf nodes, we are sure
- return self
+ # A batch_size of 16 was chosen because:
+ # a) In testing, a 4k page held 14 items. So if we have more than 16
+ # leaf nodes we are unlikely to hold them in a single new leaf
+ # node. This still allows for 1 round trip
+ # b) With 16-way fan out, we can still do a single round trip
+ # c) With 255-way fan out, we don't want to read all 255 and destroy
+ # the page cache, just to determine that we really don't need it.
+ for node in self._iter_nodes(store, batch_size=16):
+ if isinstance(node, InternalNode):
+ # Without looking at any leaf nodes, we are sure
+ return self
+ for key, value in node._items.iteritems():
+ if new_leaf._map_no_split(key, value):
return self
- for key, value in node._items.iteritems():
- if new_leaf._map_no_split(key, value):
- # Adding this key would cause a split, so we know we
- # don't need to collapse
- return self
- if keys:
- # Look in the page cache for some more bytes
- found_keys = set()
- for chk, prefix in keys.iteritems():
- try:
- bytes = _page_cache[chk]
- except KeyError:
- continue
- else:
- found_keys.add(chk)
- node = _deserialise(bytes, chk)
- self._items[prefix] = node
- if isinstance(node, InternalNode):
- # We have done enough to know that we can stop
- return self
- for key, value in node._items.iteritems():
- if new_leaf._map_no_split(key, value):
- return self
- for chk in found_keys:
- del keys[chk]
- # So far, everything fits. Page in the rest of the nodes, and see if it
- # holds true.
- if keys:
- # We loop over a limited number of child nodes at a time. 16 is
- # arbitrary, but the basic ideas are:
- # a) In a 4k page, we generally can only fit 14 items before we
- # are too full. We would like to fill up in a single request
- # if we have it.
- # b) If we have 16-way fan out, we can still read everything in
- # one go
- # c) But if we have 255-way fan out, we aren't reading all 255
- # nodes and overflowing our page cache just to find out we are
- # full after the first 10 nodes.
- # d) Different page sizes and fan out will have different
- # results, but our fan-out is likely to be a near multiple of
- # 16 anyway.
- batch_size = 16
- key_order = list(keys)
- for batch_start in range(0, len(key_order), batch_size):
- batch = key_order[batch_start:batch_start + batch_size]
- stream = store.get_record_stream(batch, 'unordered', True)
- nodes = []
- # Fully consume the stream, even if we could determine that we
- # don't need to continue. We requested the bytes, we may as well
- # use them
- for record in stream:
- bytes = record.get_bytes_as('fulltext')
- node = _deserialise(bytes, record.key)
- self._items[keys[record.key]] = node
- _page_cache[record.key] = bytes
- nodes.append(node)
- for node in nodes:
- if isinstance(node, InternalNode):
- # We know we won't fit
- return self
- for key, value in node._items.iteritems():
- if new_leaf._map_no_split(key, value):
- return self
-
- # We have gone to every child, and everything fits in a single leaf
- # node, we no longer need this internal node
return new_leaf
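
Since _iter_nodes is now lazy, its two kinds of call sites adapt
differently: map() and unmap() want a concrete child, so they wrap the
generator in list(), while _check_remap() iterates and can bail out the
moment it knows the answer. Roughly (illustrative, condensed from the diff
above):

    # Eager call sites: materialise the (at most one) matching child.
    children = list(self._iter_nodes(store, key_filter=[key]))

    # Lazy call site: with batch_size=16 and a 255-way fan out, returning
    # early means at most 16 records were ever requested, instead of 255.
    for node in self._iter_nodes(store, batch_size=16):
        if isinstance(node, InternalNode):
            # An internal child cannot fit in one leaf; stop immediately.
            return self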