Rev 4781: Checkpoint.... I think I've cut off another 50MB or so peak mem. in http://bazaar.launchpad.net/~jameinel/bzr/2.1-static-tuple-chk-map
John Arbash Meinel
john at arbash-meinel.com
Fri Oct 23 16:24:56 BST 2009
At http://bazaar.launchpad.net/~jameinel/bzr/2.1-static-tuple-chk-map
------------------------------------------------------------
revno: 4781
revision-id: john at arbash-meinel.com-20091023152444-pomqe6xmbyaowpom
parent: john at arbash-meinel.com-20091021212719-05zh4t7oo5kaird3
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.1-static-tuple-chk-map
timestamp: Fri 2009-10-23 10:24:44 -0500
message:
Checkpoint.... I think I've cut off another 50MB or so peak mem.
But I've got a bunch of cruft in this patch. So I'm checkpointing so that
I can get back to this point, and then reverting a bunch of the changes.
-------------- next part --------------
=== modified file 'bzrlib/chk_map.py'
--- a/bzrlib/chk_map.py 2009-10-21 21:19:41 +0000
+++ b/bzrlib/chk_map.py 2009-10-23 15:24:44 +0000
@@ -1460,6 +1460,7 @@
# that just meanst that we will have *both* a StaticTuple and a
# tuple() in memory, referring to the same object. (so a net
# increase in memory, not a decrease.)
+ trace.debug_memory('starting a new CHKMapDifference')
self._store = store
self._new_root_keys = new_root_keys
self._old_root_keys = old_root_keys
@@ -1489,6 +1490,7 @@
# this code. (We may want to evaluate saving the raw bytes into the
# page cache, which would allow a working tree update after the fetch
# to not have to read the bytes again.)
+ as_st = StaticTuple.from_sequence
stream = self._store.get_record_stream(keys, 'unordered', True)
for record in stream:
if self._pb is not None:
@@ -1501,11 +1503,15 @@
if type(node) is InternalNode:
# Note we don't have to do node.refs() because we know that
# there are no children that have been pushed into this node
- prefix_refs = node._items.items()
+ # TODO: We might want to intern these StaticTuple instances, we
+ # need to check how much redundancy there is
+ prefix_refs = [as_st(item) for item in node._items.iteritems()]
items = []
else:
prefix_refs = []
- items = node._items.items()
+ # TODO: We might want to intern these StaticTuple instances, we
+ # need to check how much redundancy there is
+ items = [as_st(item) for item in node._items.iteritems()]
yield record, node, prefix_refs, items
def _read_old_roots(self):
@@ -1599,6 +1605,8 @@
# First pass, flush all interesting items and convert to using direct refs
all_old_chks = self._all_old_chks
processed_new_refs = self._processed_new_refs
+ # We will be processing these in this pass
+ processed_new_refs.update(refs)
all_old_items = self._all_old_items
new_items = [item for item in self._new_item_queue
if item not in all_old_items]
@@ -1606,19 +1614,35 @@
if new_items:
yield None, new_items
refs = refs.difference(all_old_chks)
+ trace.debug_memory('sent %d new_items' % (len(new_items),))
while refs:
next_refs = set()
next_refs_update = next_refs.update
# Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
# from 1m54s to 1m51s. Consider it.
+ idx = 0
for record, _, p_refs, items in self._read_nodes_from_store(refs):
+ idx += 1
+ if idx & 0x7fff == 0:
+ import sys
+ from meliae import scanner
+ #scanner.dump_all_objects('memory_dump-%d.json' % (idx,))
+ # Generally *everything* is reachable from _getframe
+ count, size = scanner.get_recursive_size(sys._getframe())
+ trace.debug_memory('read %d records %d objects %.1fMB'
+ % (idx, count, size/ 1024. / 1024))
items = [item for item in items
if item not in all_old_items]
yield record, items
next_refs_update([p_r[1] for p_r in p_refs])
+ trace.debug_memory('processed %d refs, shrinking %d'
+ % (len(refs), len(next_refs)))
next_refs = next_refs.difference(all_old_chks)
next_refs = next_refs.difference(processed_new_refs)
processed_new_refs.update(next_refs)
+ trace.debug_memory('processed %d refs (%d total), working on %d'
+ % (len(refs), len(processed_new_refs),
+ len(next_refs)))
refs = next_refs
def _process_next_old(self):
@@ -1636,13 +1660,19 @@
def _process_queues(self):
while self._old_queue:
self._process_next_old()
+ trace.debug_memory('processed next old')
+ trace.debug_memory('preparing to flush new queue %d'
+ % (len(self._new_queue),))
return self._flush_new_queue()
def process(self):
for record in self._read_all_roots():
yield record, []
+ trace.debug_memory('after reading all roots (%d old to proc)'
+ % (len(self._old_queue)))
for record, items in self._process_queues():
yield record, items
+ trace.debug_memory('processed queues')
def iter_interesting_nodes(store, interesting_root_keys,
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py 2009-10-19 15:45:10 +0000
+++ b/bzrlib/groupcompress.py 2009-10-23 15:24:44 +0000
@@ -1832,6 +1832,9 @@
self.has_graph = parents
self._is_locked = is_locked
self._inconsistency_fatal = inconsistency_fatal
+ # GroupCompress records tend to have the same 'group' start + offset
+ # repeated over and over, this creates a surplus of ints
+ self._int_cache = {}
if track_external_parent_refs:
self._key_dependencies = knit._KeyRefs(
track_new_keys=track_new_keys)
@@ -2013,11 +2016,17 @@
"""Convert an index value to position details."""
bits = node[2].split(' ')
# It would be nice not to read the entire gzip.
+ # Note, start and stop would be good objects to 'intern', they are
+ # likely to be duplicated many times in a given index
start = int(bits[0])
+ start = self._int_cache.setdefault(start, start)
stop = int(bits[1])
+ stop = self._int_cache.setdefault(stop, stop)
basis_end = int(bits[2])
delta_end = int(bits[3])
- return node[0], start, stop, basis_end, delta_end
+ # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
+ # instance...
+ return (node[0], start, stop, basis_end, delta_end)
def scan_unvalidated_index(self, graph_index):
"""Inform this _GCGraphIndex that there is an unvalidated index.
=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py 2009-10-21 21:19:41 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py 2009-10-23 15:24:44 +0000
@@ -598,6 +598,7 @@
# - and any present parent inventories have their chk root
# entries too.
# And all this should be independent of any fallback repository.
+ trace.debug_memory('starting _check_new_inventories')
problems = []
key_deps = self.repo.revisions._index._key_dependencies
new_revisions_keys = key_deps.get_new_keys()
@@ -639,6 +640,7 @@
% (sorted(missing_chk_roots),))
# Don't bother checking any further.
return problems
+ trace.debug_memory('after looking for missing_chk_roots')
# Find all interesting chk_bytes records, and make sure they are
# present, as well as the text keys they reference.
chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
@@ -669,6 +671,7 @@
if missing_text_keys:
problems.append("missing text keys: %r"
% (sorted(missing_text_keys),))
+ trace.debug_memory('after checking for missing text keys')
return problems
def _execute_pack_operations(self, pack_operations,
@@ -1108,10 +1111,14 @@
for stream_info in self._fetch_revision_texts(revision_ids):
yield stream_info
self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+ trace.debug_memory('Before clearing revisions cache')
self.from_repository.revisions.clear_cache()
self.from_repository.signatures.clear_cache()
+ trace.debug_memory('after clearing revisions cache')
yield self._get_inventory_stream(self._revision_keys)
+ trace.debug_memory('before clearing inventories cache')
self.from_repository.inventories.clear_cache()
+ trace.debug_memory('after clearing inventories cache')
# TODO: The keys to exclude might be part of the search recipe
# For now, exclude all parents that are at the edge of ancestry, for
# which we have inventories
@@ -1119,10 +1126,15 @@
parent_keys = from_repo._find_parent_keys_of_revisions(
self._revision_keys)
for stream_info in self._get_filtered_chk_streams(parent_keys):
+ trace.debug_memory('during _get_filtered_chk_streams')
yield stream_info
+ trace.debug_memory('before clearing chk_bytes')
self.from_repository.chk_bytes.clear_cache()
+ trace.debug_memory('after clearing chk_bytes')
yield self._get_text_stream()
+ trace.debug_memory('after text stream')
self.from_repository.texts.clear_cache()
+ trace.debug_memory('after clearing text cache')
def get_stream_for_missing_keys(self, missing_keys):
# missing keys can only occur when we are byte copying and not
@@ -1192,7 +1204,9 @@
# are always rich-root, so there are no synthesised root records to
# ignore.
_, file_id, revision_id = bytes_to_info(bytes)
- text_keys.add((file_id, revision_id))
+ file_id = intern(file_id)
+ revision_id = intern(revision_id)
+ text_keys.add(StaticTuple(file_id, revision_id).intern())
yield record
More information about the bazaar-commits
mailing list