Rev 4781: Checkpoint.... I think I've cut off another 50MB or so peak mem. in http://bazaar.launchpad.net/~jameinel/bzr/2.1-static-tuple-chk-map

John Arbash Meinel john at arbash-meinel.com
Fri Oct 23 16:24:56 BST 2009


At http://bazaar.launchpad.net/~jameinel/bzr/2.1-static-tuple-chk-map

------------------------------------------------------------
revno: 4781
revision-id: john at arbash-meinel.com-20091023152444-pomqe6xmbyaowpom
parent: john at arbash-meinel.com-20091021212719-05zh4t7oo5kaird3
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.1-static-tuple-chk-map
timestamp: Fri 2009-10-23 10:24:44 -0500
message:
  Checkpoint.... I think I've cut off another 50MB or so peak mem.
  
  But I've got a bunch of cruft in this patch. So I'm checkpointing so that
  I can get back to this point, and then reverting a bunch of the changes.
-------------- next part --------------
=== modified file 'bzrlib/chk_map.py'
--- a/bzrlib/chk_map.py	2009-10-21 21:19:41 +0000
+++ b/bzrlib/chk_map.py	2009-10-23 15:24:44 +0000
@@ -1460,6 +1460,7 @@
         #       that just meanst that we will have *both* a StaticTuple and a
         #       tuple() in memory, referring to the same object. (so a net
         #       increase in memory, not a decrease.)
+        trace.debug_memory('starting a new CHKMapDifference')
         self._store = store
         self._new_root_keys = new_root_keys
         self._old_root_keys = old_root_keys
@@ -1489,6 +1490,7 @@
         # this code. (We may want to evaluate saving the raw bytes into the
         # page cache, which would allow a working tree update after the fetch
         # to not have to read the bytes again.)
+        as_st = StaticTuple.from_sequence
         stream = self._store.get_record_stream(keys, 'unordered', True)
         for record in stream:
             if self._pb is not None:
@@ -1501,11 +1503,15 @@
             if type(node) is InternalNode:
                 # Note we don't have to do node.refs() because we know that
                 # there are no children that have been pushed into this node
-                prefix_refs = node._items.items()
+                # TODO: We might want to intern these StaticTuple instances, we
+                #       need to check how much redundancy there is
+                prefix_refs = [as_st(item) for item in node._items.iteritems()]
                 items = []
             else:
                 prefix_refs = []
-                items = node._items.items()
+                # TODO: We might want to intern these StaticTuple instances, we
+                #       need to check how much redundancy there is
+                items = [as_st(item) for item in node._items.iteritems()]
             yield record, node, prefix_refs, items
 
     def _read_old_roots(self):
@@ -1599,6 +1605,8 @@
         # First pass, flush all interesting items and convert to using direct refs
         all_old_chks = self._all_old_chks
         processed_new_refs = self._processed_new_refs
+        # We will be processing these in this pass
+        processed_new_refs.update(refs)
         all_old_items = self._all_old_items
         new_items = [item for item in self._new_item_queue
                            if item not in all_old_items]
@@ -1606,19 +1614,35 @@
         if new_items:
             yield None, new_items
         refs = refs.difference(all_old_chks)
+        trace.debug_memory('sent %d new_items' % (len(new_items),))
         while refs:
             next_refs = set()
             next_refs_update = next_refs.update
             # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
             # from 1m54s to 1m51s. Consider it.
+            idx = 0
             for record, _, p_refs, items in self._read_nodes_from_store(refs):
+                idx += 1
+                if idx & 0x7fff == 0:
+                    import sys
+                    from meliae import scanner
+                    #scanner.dump_all_objects('memory_dump-%d.json' % (idx,))
+                    # Generally *everything* is reachable from _getframe
+                    count, size = scanner.get_recursive_size(sys._getframe())
+                    trace.debug_memory('read %d records %d objects %.1fMB'
+                                       % (idx, count, size/ 1024. / 1024))
                 items = [item for item in items
                          if item not in all_old_items]
                 yield record, items
                 next_refs_update([p_r[1] for p_r in p_refs])
+            trace.debug_memory('processed %d refs, shrinking %d'
+                               % (len(refs), len(next_refs)))
             next_refs = next_refs.difference(all_old_chks)
             next_refs = next_refs.difference(processed_new_refs)
             processed_new_refs.update(next_refs)
+            trace.debug_memory('processed %d refs (%d total), working on %d'
+                               % (len(refs), len(processed_new_refs),
+                                  len(next_refs)))
             refs = next_refs
 
     def _process_next_old(self):
@@ -1636,13 +1660,19 @@
     def _process_queues(self):
         while self._old_queue:
             self._process_next_old()
+            trace.debug_memory('processed next old')
+        trace.debug_memory('preparing to flush new queue %d'
+                           % (len(self._new_queue),))
         return self._flush_new_queue()
 
     def process(self):
         for record in self._read_all_roots():
             yield record, []
+        trace.debug_memory('after reading all roots (%d old to proc)'
+                           % (len(self._old_queue)))
         for record, items in self._process_queues():
             yield record, items
+        trace.debug_memory('processed queues')
 
 
 def iter_interesting_nodes(store, interesting_root_keys,

=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-10-19 15:45:10 +0000
+++ b/bzrlib/groupcompress.py	2009-10-23 15:24:44 +0000
@@ -1832,6 +1832,9 @@
         self.has_graph = parents
         self._is_locked = is_locked
         self._inconsistency_fatal = inconsistency_fatal
+        # GroupCompress records tend to have the same 'group' start + offset
+        # repeated over and over, this creates a surplus of ints
+        self._int_cache = {}
         if track_external_parent_refs:
             self._key_dependencies = knit._KeyRefs(
                 track_new_keys=track_new_keys)
@@ -2013,11 +2016,17 @@
         """Convert an index value to position details."""
         bits = node[2].split(' ')
         # It would be nice not to read the entire gzip.
+        # Note, start and stop would be good objects to 'intern', they are
+        # likely to be duplicated many times in a given index
         start = int(bits[0])
+        start = self._int_cache.setdefault(start, start)
         stop = int(bits[1])
+        stop = self._int_cache.setdefault(stop, stop)
         basis_end = int(bits[2])
         delta_end = int(bits[3])
-        return node[0], start, stop, basis_end, delta_end
+        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
+        # instance...
+        return (node[0], start, stop, basis_end, delta_end)
 
     def scan_unvalidated_index(self, graph_index):
         """Inform this _GCGraphIndex that there is an unvalidated index.

=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py	2009-10-21 21:19:41 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py	2009-10-23 15:24:44 +0000
@@ -598,6 +598,7 @@
         #   - and any present parent inventories have their chk root
         #     entries too.
         # And all this should be independent of any fallback repository.
+        trace.debug_memory('starting _check_new_inventories')
         problems = []
         key_deps = self.repo.revisions._index._key_dependencies
         new_revisions_keys = key_deps.get_new_keys()
@@ -639,6 +640,7 @@
                 % (sorted(missing_chk_roots),))
             # Don't bother checking any further.
             return problems
+        trace.debug_memory('after looking for missing_chk_roots')
         # Find all interesting chk_bytes records, and make sure they are
         # present, as well as the text keys they reference.
         chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
@@ -669,6 +671,7 @@
         if missing_text_keys:
             problems.append("missing text keys: %r"
                 % (sorted(missing_text_keys),))
+        trace.debug_memory('after checking for missing text keys')
         return problems
 
     def _execute_pack_operations(self, pack_operations,
@@ -1108,10 +1111,14 @@
         for stream_info in self._fetch_revision_texts(revision_ids):
             yield stream_info
         self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+        trace.debug_memory('Before clearing revisions cache')
         self.from_repository.revisions.clear_cache()
         self.from_repository.signatures.clear_cache()
+        trace.debug_memory('after clearing revisions cache')
         yield self._get_inventory_stream(self._revision_keys)
+        trace.debug_memory('before clearing inventories cache')
         self.from_repository.inventories.clear_cache()
+        trace.debug_memory('after clearing inventories cache')
         # TODO: The keys to exclude might be part of the search recipe
         # For now, exclude all parents that are at the edge of ancestry, for
         # which we have inventories
@@ -1119,10 +1126,15 @@
         parent_keys = from_repo._find_parent_keys_of_revisions(
                         self._revision_keys)
         for stream_info in self._get_filtered_chk_streams(parent_keys):
+            trace.debug_memory('during _get_filtered_chk_streams')
             yield stream_info
+        trace.debug_memory('before clearing chk_bytes')
         self.from_repository.chk_bytes.clear_cache()
+        trace.debug_memory('after clearing chk_bytes')
         yield self._get_text_stream()
+        trace.debug_memory('after text stream')
         self.from_repository.texts.clear_cache()
+        trace.debug_memory('after clearing text cache')
 
     def get_stream_for_missing_keys(self, missing_keys):
         # missing keys can only occur when we are byte copying and not
@@ -1192,7 +1204,9 @@
             # are always rich-root, so there are no synthesised root records to
             # ignore.
             _, file_id, revision_id = bytes_to_info(bytes)
-            text_keys.add((file_id, revision_id))
+            file_id = intern(file_id)
+            revision_id = intern(revision_id)
+            text_keys.add(StaticTuple(file_id, revision_id).intern())
         yield record
 
 



More information about the bazaar-commits mailing list