Rev 80: Merge Robert's streaming work, with a bit of cleanup. in http://bazaar.launchpad.net/~meliae-dev/meliae/trunk
John Arbash Meinel
john at arbash-meinel.com
Fri Sep 11 18:34:50 BST 2009
At http://bazaar.launchpad.net/~meliae-dev/meliae/trunk
------------------------------------------------------------
revno: 80 [merge]
revision-id: john at arbash-meinel.com-20090911173433-q2v9r3nsp16c7ra4
parent: john at arbash-meinel.com-20090911170711-a08s2wm2ckk3u02n
parent: robertc at robertcollins.net-20090911043437-8dgywbyq3jceiuv7
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: trunk
timestamp: Fri 2009-09-11 12:34:33 -0500
message:
Merge Robert's streaming work, with a bit of cleanup.
Quite a few cleanups to get the test suite passing on py25 on windows.
added:
meliae/files.py files.py-20090910232446-ezz8k1lp07nl7ihv-1
remove_expensive_references.py remove_expensive_ref-20090911040736-asexqy0v5sbyggt2-1
modified:
Makefile makefile-20090910210635-0d8q79mv0x0lcqf7-1
meliae/_loader.pyx _loader.pyx-20090403200456-0nehu52g0iroknbx-1
meliae/loader.py loader.py-20090402195228-cw8lxf847wp00s90-1
meliae/tests/test_loader.py test_loader.py-20090402195228-cw8lxf847wp00s90-2
strip_duplicates.py strip_duplicates.py-20090427214004-igir4atheen5v7qn-1
-------------- next part --------------
=== modified file 'Makefile'
--- a/Makefile 2009-09-10 21:14:03 +0000
+++ b/Makefile 2009-09-11 03:38:35 +0000
@@ -1,6 +1,6 @@
all: build_inplace
-check:
+check: all
python run_tests.py
build_inplace:
=== modified file 'meliae/_loader.pyx'
--- a/meliae/_loader.pyx 2009-09-08 17:06:14 +0000
+++ b/meliae/_loader.pyx 2009-09-11 03:38:35 +0000
@@ -216,3 +216,27 @@
def _intern_from_cache(self, cache):
self.type_str = cache.setdefault(self.type_str, self.type_str)
+
+ def to_json(self):
+ """Convert this MemObject to json."""
+ refs = []
+ for ref in sorted(self.ref_list):
+ refs.append(str(ref))
+ if self.length != -1:
+ length = '"len": %d, ' % self.length
+ else:
+ length = ''
+ if self.value is not None:
+ if self.type_str == 'int':
+ value = '"value": %s, ' % self.value
+ else:
+ value = '"value": "%s", ' % self.value
+ else:
+ value = ''
+ if self.name:
+ name = '"name": "%s", ' % self.name
+ else:
+ name = ''
+ return '{"address": %d, "type": "%s", "size": %d, %s%s%s"refs": [%s]}' % (
+ self.address, self.type_str, self.size, name, length, value,
+ ', '.join(refs))
=== added file 'meliae/files.py'
--- a/meliae/files.py 1970-01-01 00:00:00 +0000
+++ b/meliae/files.py 2009-09-11 17:34:33 +0000
@@ -0,0 +1,85 @@
+# Copyright (C) 2009 Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License and
+# the GNU Lesser General Public License along with this program. If
+# not, see <http://www.gnu.org/licenses/>.
+
+"""Work with files on disk."""
+
+import errno
+import gzip
+try:
+ import multiprocessing
+except ImportError:
+ multiprocessing = None
+import subprocess
+import sys
+
+
+def open_file(filename):
+ """Open a file which might be a regular file or a gzip.
+
+ :return: An iterator of lines, and a cleanup function.
+ """
+ source = open(filename, 'r')
+ gzip_source = gzip.GzipFile(mode='rb', fileobj=source)
+ try:
+ line = gzip_source.readline()
+ except KeyboardInterrupt:
+ raise
+ except:
+ # probably not a gzip file
+ source.seek(0)
+ return source, None
+ else:
+ gzip_source.close()
+ source.close()
+ # a gzip file
+ # preference - a gzip subprocess
+ if sys.platform == 'win32':
+ close_fds = False # not supported
+ else:
+ close_fds = True
+ try:
+ process = subprocess.Popen(['gunzip', '-c', filename],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, close_fds=close_fds)
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ # failing that, use another python process
+ return _open_mprocess(filename)
+ # make reading from stdin, or writing errors cause immediate aborts
+ process.stdin.close()
+ process.stderr.close()
+ return process.stdout, process.terminate
+
+
+def _open_mprocess(filename):
+ if multiprocessing is None:
+ # can't multiprocess, use inprocess gzip.
+ return gzip.GzipFile(filename, mode='rb'), None
+ parent, child = multiprocessing.Pipe(False)
+ def stream_file(filename, child):
+ gzip_source = gzip.GzipFile(filename, 'rb')
+ for line in gzip_source:
+ child.send(line)
+ child.send(None)
+ process = multiprocessing.Process(target=stream_file, args=(filename, child))
+ process.start()
+ def iter_pipe():
+ while True:
+ line = parent.recv()
+ if line is None:
+ break
+ yield line
+ return iter_pipe(), process.join
=== modified file 'meliae/loader.py'
--- a/meliae/loader.py 2009-09-08 17:19:16 +0000
+++ b/meliae/loader.py 2009-09-11 17:34:33 +0000
@@ -31,6 +31,7 @@
simplejson = None
from meliae import (
+ files,
_intset,
_loader,
)
@@ -208,7 +209,7 @@
obj.referrers = referrers.get(obj.address, ())
def remove_expensive_references(self):
- """Filter out 'expensive' references.
+ """Filter out references that are mere housekeeping links.
module.__dict__ tends to reference lots of other modules, which in turn
brings in the global reference cycle. Going further
@@ -216,71 +217,15 @@
the global cycle. Generally these references aren't interesting, simply
because they end up referring to *everything*.
- So for now, we filter out any reference to a module.
+ We filter out any reference to modules, frames, types, function globals
+ pointers & LRU sideways references.
"""
- null_memobj = _loader.MemObject(0, '<ex-reference>', 0, [])
- self.objs[0] = null_memobj
- # First pass, find objects we don't want to reference any more
- noref_objs = _intset.IDSet()
- lru_objs = _intset.IDSet()
+ source = lambda:self.objs.itervalues()
total_objs = len(self.objs)
- total_steps = total_objs * 2
- for idx, obj in enumerate(self.objs.itervalues()):
- # 'module's have a single __dict__, which tends to refer to other
- # modules. As you start tracking into that, you end up getting into
- # reference cycles, etc, which generally ends up referencing every
- # object in memory.
- # 'frame' also tends to be self referential, and a single frame
- # ends up referencing the entire current state
- # 'type' generally is self referential through several attributes.
- # __bases__ means we recurse all the way up to object, and object
- # has __subclasses__, which means we recurse down into all types.
- # In general, not helpful for debugging memory consumption
- if self.show_progress and idx & 0x1ff == 0:
- sys.stderr.write('finding expensive refs... %8d / %8d \r'
- % (idx, total_steps))
- if obj.type_str in ('module', 'frame', 'type'):
- noref_objs.add(obj.address)
- if obj.type_str == '_LRUNode':
- lru_objs.add(obj.address)
- # Second pass, any object which refers to something in noref_objs will
- # have that reference removed, and replaced with the null_memobj
- num_expensive = len(noref_objs)
- for idx, obj in enumerate(self.objs.itervalues()):
- if self.show_progress and idx & 0x1ff == 0:
- sys.stderr.write('removing %d expensive refs... %8d / %8d \r'
- % (num_expensive, idx + total_objs,
- total_steps))
- if obj.type_str == 'function':
- # Functions have a reference to 'globals' which is not very
- # helpful for having a clear understanding of what is going on
- # especially since the function itself is in its own globals
- # XXX: This is probably not a guaranteed order, but currently
- # func_traverse returns:
- # func_code, func_globals, func_module, func_defaults,
- # func_doc, func_name, func_dict, func_closure
- # We want to remove the reference to globals and module
- refs = list(obj.ref_list)
- obj.ref_list = refs[:1] + refs[3:] + [0]
- continue
- if obj.type_str == '_LRUNode':
- # We remove the 'sideways' references
- obj.ref_list = [ref for ref in obj.ref_list
- if ref not in lru_objs]
- continue
- for ref in obj.ref_list:
- if ref in noref_objs:
- break
- else:
- # No bad references, keep going
- continue
- new_ref_list = [ref for ref in obj.ref_list
- if ref not in noref_objs]
- new_ref_list.append(0)
- obj.ref_list = new_ref_list
- if self.show_progress:
- sys.stderr.write('removed %d expensive refs from %d objs%s\n'
- % (num_expensive, total_objs, ' '*20))
+ for changed, obj in remove_expensive_references(source, total_objs,
+ self.show_progress):
+ if changed:
+ self.objs[obj.address] = obj
def compute_total_size(self):
"""This computes the total bytes referenced from this object."""
@@ -341,18 +286,42 @@
:param source: If this is a string, we will open it as a file and read all
objects. For any other type, we will simply iterate and parse objects
out, so the object should be an iterator of json lines.
+ :param using_json: Use simplejson rather than the regex. This allows
+ arbitrary ordered json dicts to be parsed but still requires per-line
+ layout.
"""
- tstart = time.time()
+ cleanup = None
if isinstance(source, str):
- source = open(source, 'r')
- input_size = os.fstat(source.fileno()).st_size
+ source, cleanup = files.open_file(source)
+ if isinstance(source, file):
+ input_size = os.fstat(source.fileno()).st_size
+ else:
+ input_size = 0
elif isinstance(source, (list, tuple)):
input_size = sum(map(len, source))
else:
input_size = 0
+ try:
+ return _load(source, using_json, show_prog, input_size)
+ finally:
+ if cleanup is not None:
+ cleanup()
+
+
+def iter_objs(source, using_json=False, show_prog=False, input_size=0, objs=None):
+ """Iterate MemObjects from json.
+
+ :param source: A line iterator.
+ :param using_json: Use simplejson. See load().
+ :param show_prog: Show progress.
+ :param input_size: The size of the input if known (in bytes) or 0.
+ :param objs: Either None or a dict containing objects by address. If not
+ None, then duplicate objects will not be parsed or output.
+ :return: A generator of MemObjects.
+ """
# TODO: cStringIO?
+ tstart = time.time()
input_mb = input_size / 1024. / 1024.
- objs = {}
temp_cache = {}
address_re = re.compile(
r'{"address": (?P<address>\d+)'
@@ -360,24 +329,25 @@
bytes_read = count = 0
last = 0
mb_read = 0
-
+ if using_json:
+ decoder = _from_json
+ else:
+ decoder = _from_line
for line_num, line in enumerate(source):
bytes_read += len(line)
if line in ("[\n", "]\n"):
continue
if line.endswith(',\n'):
line = line[:-2]
- m = address_re.match(line)
- if not m:
- continue
- address = int(m.group('address'))
- if address in objs: # Skip duplicate objects
- continue
- if using_json:
- memobj = _from_json(_loader.MemObject, line, temp_cache=temp_cache)
- else:
- memobj = _from_line(_loader.MemObject, line, temp_cache=temp_cache)
- objs[memobj.address] = memobj
+ if objs:
+ # Skip duplicate objects
+ m = address_re.match(line)
+ if not m:
+ continue
+ address = int(m.group('address'))
+ if address in objs:
+ continue
+ yield decoder(_loader.MemObject, line, temp_cache=temp_cache)
if show_prog and (line_num - last > 5000):
last = line_num
mb_read = bytes_read / 1024. / 1024
@@ -391,5 +361,105 @@
sys.stderr.write(
'loaded line %d, %d objs, %5.1f / %5.1f MiB read in %.1fs \n'
% (line_num, len(objs), mb_read, input_mb, tdelta))
+
+
+def _load(source, using_json, show_prog, input_size):
+ objs = {}
+ for memobj in iter_objs(source, using_json, show_prog, input_size, objs):
+ objs[memobj.address] = memobj
# _fill_total_size(objs)
return ObjManager(objs, show_progress=show_prog)
+
+
+def remove_expensive_references(source, total_objs=0, show_progress=False):
+ """Filter out references that are mere housekeeping links.
+
+ module.__dict__ tends to reference lots of other modules, which in turn
+ brings in the global reference cycle. Going further
+ function.__globals__ references module.__dict__, so it *too* ends up in
+ the global cycle. Generally these references aren't interesting, simply
+ because they end up referring to *everything*.
+
+ We filter out any reference to modules, frames, types, function globals
+ pointers & LRU sideways references.
+
+ :param source: A callable that returns an iterator of MemObjects. This
+ will be called twice.
+ :param total_objs: The total objects to be filtered, if known. If
+ show_progress is False or the count of objects is unknown, 0.
+ :return: An iterator of (changed, MemObject) objects with expensive
+ references removed.
+ """
+ # First pass, find objects we don't want to reference any more
+ noref_objs = _intset.IDSet()
+ lru_objs = _intset.IDSet()
+ total_steps = total_objs * 2
+ seen_zero = False
+ for idx, obj in enumerate(source()):
+ # 'module's have a single __dict__, which tends to refer to other
+ # modules. As you start tracking into that, you end up getting into
+ # reference cycles, etc, which generally ends up referencing every
+ # object in memory.
+ # 'frame' also tends to be self referential, and a single frame
+ # ends up referencing the entire current state
+ # 'type' generally is self referential through several attributes.
+ # __bases__ means we recurse all the way up to object, and object
+ # has __subclasses__, which means we recurse down into all types.
+ # In general, not helpful for debugging memory consumption
+ if show_progress and idx & 0x1ff == 0:
+ sys.stderr.write('finding expensive refs... %8d / %8d \r'
+ % (idx, total_steps))
+ if obj.type_str in ('module', 'frame', 'type'):
+ noref_objs.add(obj.address)
+ if obj.type_str == '_LRUNode':
+ lru_objs.add(obj.address)
+ if obj.address == 0:
+ seen_zero = True
+ # Second pass, any object which refers to something in noref_objs will
+ # have that reference removed, and replaced with the null_memobj
+ num_expensive = len(noref_objs)
+ null_memobj = _loader.MemObject(0, '<ex-reference>', 0, [])
+ if not seen_zero:
+ yield (True, null_memobj)
+ if show_progress and total_objs == 0:
+ total_objs = idx
+ total_steps = total_objs * 2
+ for idx, obj in enumerate(source()):
+ if show_progress and idx & 0x1ff == 0:
+ sys.stderr.write('removing %d expensive refs... %8d / %8d \r'
+ % (num_expensive, idx + total_objs,
+ total_steps))
+ if obj.type_str == 'function':
+ # Functions have a reference to 'globals' which is not very
+ # helpful for having a clear understanding of what is going on
+ # especially since the function itself is in its own globals
+ # XXX: This is probably not a guaranteed order, but currently
+ # func_traverse returns:
+ # func_code, func_globals, func_module, func_defaults,
+ # func_doc, func_name, func_dict, func_closure
+ # We want to remove the reference to globals and module
+ refs = list(obj.ref_list)
+ obj.ref_list = refs[:1] + refs[3:] + [0]
+ yield (True, obj)
+ continue
+ elif obj.type_str == '_LRUNode':
+ # We remove the 'sideways' references
+ obj.ref_list = [ref for ref in obj.ref_list
+ if ref not in lru_objs]
+ yield (True, obj)
+ continue
+ for ref in obj.ref_list:
+ if ref in noref_objs:
+ break
+ else:
+ # No bad references, keep going
+ yield (False, obj)
+ continue
+ new_ref_list = [ref for ref in obj.ref_list
+ if ref not in noref_objs]
+ new_ref_list.append(0)
+ obj.ref_list = new_ref_list
+ yield (True, obj)
+ if show_progress:
+ sys.stderr.write('removed %d expensive refs from %d objs%s\n'
+ % (num_expensive, total_objs, ' '*20))
=== modified file 'meliae/tests/test_loader.py'
--- a/meliae/tests/test_loader.py 2009-09-08 17:19:16 +0000
+++ b/meliae/tests/test_loader.py 2009-09-11 17:34:33 +0000
@@ -16,6 +16,8 @@
"""Read back in a dump file and process it"""
+import gzip
+import os
import sys
import tempfile
@@ -41,9 +43,9 @@
'{"address": 3, "type": "list", "size": 44, "len": 3, "refs": [3, 4, 5]}',
'{"address": 5, "type": "int", "size": 12, "value": 1, "refs": []}',
'{"address": 4, "type": "int", "size": 12, "value": 2, "refs": []}',
-'{"address": 2, "type": "dict", "size": 124, "len": 2, "refs": [5, 4, 6, 7]}',
+'{"address": 2, "type": "dict", "size": 124, "len": 2, "refs": [4, 5, 6, 7]}',
'{"address": 7, "type": "tuple", "size": 20, "len": 2, "refs": [4, 5]}',
-'{"address": 6, "type": "str", "size": 29, "len": 5, "value": "a str"'
+'{"address": 6, "type": "str", "size": 29, "name": "bah", "len": 5, "value": "a str"'
', "refs": []}',
]
@@ -77,6 +79,58 @@
def test_load_example(self):
objs = loader.load(_example_dump, show_prog=False)
+ def test_load_compressed(self):
+ # unfortunately NamedTemporaryFile's cannot be re-opened on Windows
+ fd, name = tempfile.mkstemp(prefix='meliae-')
+ f = os.fdopen(fd, 'wb')
+ try:
+ content = gzip.GzipFile(mode='wb', compresslevel=6, fileobj=f)
+ for line in _example_dump:
+ content.write(line + '\n')
+ content.flush()
+ content.close()
+ del content
+ f.close()
+ objs = loader.load(name, show_prog=False).objs
+ objs[1]
+ finally:
+ f.close()
+ os.remove(name)
+
+
+
+class TestRemoveExpensiveReferences(tests.TestCase):
+
+ def test_remove_expensive_references(self):
+ lines = list(_example_dump)
+ lines.append('{"address": 8, "type": "module", "size": 12'
+ ', "name": "mymod", "refs": [9]}')
+ lines.append('{"address": 9, "type": "dict", "size": 124'
+ ', "refs": [10, 11]}')
+ lines.append('{"address": 10, "type": "module", "size": 12'
+ ', "name": "mod2", "refs": [12]}')
+ lines.append('{"address": 11, "type": "str", "size": 27'
+ ', "value": "boo", "refs": []}')
+ lines.append('{"address": 12, "type": "dict", "size": 124'
+ ', "refs": []}')
+ source = lambda:loader.iter_objs(lines)
+ mymod_dict = list(source())[8]
+ self.assertEqual([10, 11], mymod_dict.ref_list)
+ result = list(loader.remove_expensive_references(source))
+ null_obj = result[0][1]
+ self.assertEqual(0, null_obj.address)
+ self.assertEqual('<ex-reference>', null_obj.type_str)
+ self.assertEqual([11, 0], result[9][1].ref_list)
+
+
+class TestMemObj(tests.TestCase):
+
+ def test_to_json(self):
+ objs = list(loader.iter_objs(_example_dump))
+ objs.sort(key=lambda x:x.address)
+ expected = sorted(_example_dump)
+ self.assertEqual(expected, [obj.to_json() for obj in objs])
+
class TestObjManager(tests.TestCase):
=== added file 'remove_expensive_references.py'
--- a/remove_expensive_references.py 1970-01-01 00:00:00 +0000
+++ b/remove_expensive_references.py 2009-09-11 04:34:37 +0000
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+# Copyright (C) 2009 Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License and
+# the GNU Lesser General Public License along with this program. If
+# not, see <http://www.gnu.org/licenses/>.
+
+"""Remove expensive references.
+
+This script takes one or two filenames and filters the first into either
+stdout, or the second filename.
+"""
+
+import os
+import re
+import sys
+import time
+
+from meliae import files, loader
+
+
+def main(args):
+ import optparse
+ p = optparse.OptionParser(
+ '%prog INFILE [OUTFILE]')
+
+ opts, args = p.parse_args(args)
+ if len(args) > 2:
+ sys.stderr.write('We only support 2 filenames, not %d\n' % (len(args),))
+ return -1
+ if len(args) < 1:
+ sys.stderr.write("Must supply INFILE\n")
+ return -1
+
+ def source():
+ infile, cleanup = files.open_file(args[0])
+ for obj in loader.iter_objs(infile):
+ yield obj
+ cleanup()
+ if len(args) == 1:
+ outfile = sys.stdout
+ else:
+ outfile = open(args[1], 'wb')
+ for _, obj in loader.remove_expensive_references(source, show_progress=True):
+ outfile.write(obj.to_json() + '\n')
+ outfile.flush()
+
+
+if __name__ == '__main__':
+ sys.exit(main(sys.argv[1:]))
+
=== modified file 'strip_duplicates.py'
--- a/strip_duplicates.py 2009-09-08 17:06:14 +0000
+++ b/strip_duplicates.py 2009-09-11 04:34:37 +0000
@@ -29,9 +29,11 @@
import sys
import time
+from meliae import files
+
def strip_duplicate(infile, outfile, insize=None):
- from memory_dump import _intset
+ from meliae import _intset
seen = _intset.IntSet()
address_re = re.compile(
@@ -71,22 +73,33 @@
'%prog [INFILE [OUTFILE]]')
opts, args = p.parse_args(args)
-
if len(args) > 2:
sys.stderr.write('We only support 2 filenames, not %d\n' % (len(args),))
return -1
- if len(args) == 0:
- infile = sys.stdin
- insize = None
- outfile = sys.stdout
- else:
- infile = open(args[0], 'rb')
- insize = os.fstat(infile.fileno()).st_size
- if len(args) == 1:
+
+ cleanups = []
+ try:
+ if len(args) == 0:
+ infile = sys.stdin
+ insize = None
outfile = sys.stdout
else:
- outfile = open(args[1], 'wb')
- strip_duplicate(infile, outfile, insize)
+ infile, cleanup = files.open_file(args[0])
+ if cleanup is not None:
+ cleanups.append(cleanup)
+ if isinstance(infile, file):
+ # pipes are files, but 0 isn't useful.
+ insize = os.fstat(infile.fileno()).st_size or None
+ else:
+ insize = None
+ if len(args) == 1:
+ outfile = sys.stdout
+ else:
+ outfile = open(args[1], 'wb')
+ strip_duplicate(infile, outfile, insize)
+ finally:
+ for cleanup in cleanups:
+ cleanup()
if __name__ == '__main__':
More information about the bazaar-commits
mailing list