Rev 80: Merge Robert's streaming work, with a bit of cleanup. in http://bazaar.launchpad.net/~meliae-dev/meliae/trunk

John Arbash Meinel john at arbash-meinel.com
Fri Sep 11 18:34:50 BST 2009


At http://bazaar.launchpad.net/~meliae-dev/meliae/trunk

------------------------------------------------------------
revno: 80 [merge]
revision-id: john at arbash-meinel.com-20090911173433-q2v9r3nsp16c7ra4
parent: john at arbash-meinel.com-20090911170711-a08s2wm2ckk3u02n
parent: robertc at robertcollins.net-20090911043437-8dgywbyq3jceiuv7
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: trunk
timestamp: Fri 2009-09-11 12:34:33 -0500
message:
  Merge Robert's streaming work, with a bit of cleanup.
  
  Quite a few cleanups to get the test suite passing on py25 on windows.
added:
  meliae/files.py                files.py-20090910232446-ezz8k1lp07nl7ihv-1
  remove_expensive_references.py remove_expensive_ref-20090911040736-asexqy0v5sbyggt2-1
modified:
  Makefile                       makefile-20090910210635-0d8q79mv0x0lcqf7-1
  meliae/_loader.pyx             _loader.pyx-20090403200456-0nehu52g0iroknbx-1
  meliae/loader.py               loader.py-20090402195228-cw8lxf847wp00s90-1
  meliae/tests/test_loader.py    test_loader.py-20090402195228-cw8lxf847wp00s90-2
  strip_duplicates.py            strip_duplicates.py-20090427214004-igir4atheen5v7qn-1
-------------- next part --------------
=== modified file 'Makefile'
--- a/Makefile	2009-09-10 21:14:03 +0000
+++ b/Makefile	2009-09-11 03:38:35 +0000
@@ -1,6 +1,6 @@
 all: build_inplace
 
-check:
+check: all
 	python run_tests.py
 
 build_inplace:

=== modified file 'meliae/_loader.pyx'
--- a/meliae/_loader.pyx	2009-09-08 17:06:14 +0000
+++ b/meliae/_loader.pyx	2009-09-11 03:38:35 +0000
@@ -216,3 +216,27 @@
 
     def _intern_from_cache(self, cache):
         self.type_str = cache.setdefault(self.type_str, self.type_str)
+
+    def to_json(self):
+        """Convert this MemObject to json."""
+        refs = []
+        for ref in sorted(self.ref_list):
+            refs.append(str(ref))
+        if self.length != -1:
+            length = '"len": %d, ' % self.length
+        else:
+            length = ''
+        if self.value is not None:
+            if self.type_str == 'int':
+                value = '"value": %s, ' % self.value
+            else:
+                value = '"value": "%s", ' % self.value
+        else:
+            value = ''
+        if self.name:
+            name = '"name": "%s", ' % self.name
+        else:
+            name = ''
+        return '{"address": %d, "type": "%s", "size": %d, %s%s%s"refs": [%s]}' % (
+            self.address, self.type_str, self.size, name, length, value,
+            ', '.join(refs))

=== added file 'meliae/files.py'
--- a/meliae/files.py	1970-01-01 00:00:00 +0000
+++ b/meliae/files.py	2009-09-11 17:34:33 +0000
@@ -0,0 +1,85 @@
+# Copyright (C) 2009 Canonical Ltd
+# 
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License and
+# the GNU Lesser General Public License along with this program.  If
+# not, see <http://www.gnu.org/licenses/>.
+
+"""Work with files on disk."""
+
+import errno
+import gzip
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = None
+import subprocess
+import sys
+
+
+def open_file(filename):
+    """Open a file which might be a regular file or a gzip.
+    
+    :return: An iterator of lines, and a cleanup function (or None).
+    """
+    source = open(filename, 'r')
+    gzip_source = gzip.GzipFile(mode='rb', fileobj=source)
+    try:
+        line = gzip_source.readline()
+    except KeyboardInterrupt:
+        raise
+    except:
+        # probably not a gzip file
+        source.seek(0)
+        return source, None
+    else:
+        gzip_source.close()
+        source.close()
+        # a gzip file
+        # preference - a gzip subprocess
+        if sys.platform == 'win32':
+            close_fds = False # not supported
+        else:
+            close_fds = True
+        try:
+            process = subprocess.Popen(['gunzip', '-c', filename],
+                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE, close_fds=close_fds)
+        except OSError, e:
+            if e.errno == errno.ENOENT:
+                # failing that, use another python process
+                return _open_mprocess(filename)
+        # make reading from stdin, or writing errors, cause immediate aborts
+        process.stdin.close()
+        process.stderr.close()
+        return process.stdout, process.terminate
+
+
+def _open_mprocess(filename):
+    if multiprocessing is None:
+        # can't multiprocess, use inprocess gzip.
+        return gzip.GzipFile(filename, mode='rb'), None
+    parent, child = multiprocessing.Pipe(False)
+    def stream_file(filename, child):
+        gzip_source = gzip.GzipFile(filename, 'rb')
+        for line in gzip_source:
+            child.send(line)
+        child.send(None)
+    process = multiprocessing.Process(target=stream_file, args=(filename, child))
+    process.start()
+    def iter_pipe():
+        while True:
+            line = parent.recv()
+            if line is None:
+                break
+            yield line
+    return iter_pipe(), process.join

=== modified file 'meliae/loader.py'
--- a/meliae/loader.py	2009-09-08 17:19:16 +0000
+++ b/meliae/loader.py	2009-09-11 17:34:33 +0000
@@ -31,6 +31,7 @@
     simplejson = None
 
 from meliae import (
+    files,
     _intset,
     _loader,
     )
@@ -208,7 +209,7 @@
             obj.referrers = referrers.get(obj.address, ())
 
     def remove_expensive_references(self):
-        """Filter out 'expensive' references.
+        """Filter out references that are mere housekeeping links.
 
         module.__dict__ tends to reference lots of other modules, which in turn
         brings in the global reference cycle. Going further
@@ -216,71 +217,15 @@
         the global cycle. Generally these references aren't interesting, simply
         because they end up referring to *everything*.
 
-        So for now, we filter out any reference to a module.
+        We filter out any reference to modules, frames, types, function globals
+        pointers & LRU sideways references.
         """
-        null_memobj = _loader.MemObject(0, '<ex-reference>', 0, [])
-        self.objs[0] = null_memobj
-        # First pass, find objects we don't want to reference any more
-        noref_objs = _intset.IDSet()
-        lru_objs = _intset.IDSet()
+        source = lambda:self.objs.itervalues()
         total_objs = len(self.objs)
-        total_steps = total_objs * 2
-        for idx, obj in enumerate(self.objs.itervalues()):
-            # 'module's have a single __dict__, which tends to refer to other
-            # modules. As you start tracking into that, you end up getting into
-            # reference cycles, etc, which generally ends up referencing every
-            # object in memory.
-            # 'frame' also tends to be self referential, and a single frame
-            # ends up referencing the entire current state
-            # 'type' generally is self referential through several attributes.
-            # __bases__ means we recurse all the way up to object, and object
-            # has __subclasses__, which means we recurse down into all types.
-            # In general, not helpful for debugging memory consumption
-            if self.show_progress and idx & 0x1ff == 0:
-                sys.stderr.write('finding expensive refs... %8d / %8d    \r'
-                                 % (idx, total_steps))
-            if obj.type_str in ('module', 'frame', 'type'):
-                noref_objs.add(obj.address)
-            if obj.type_str == '_LRUNode':
-                lru_objs.add(obj.address)
-        # Second pass, any object which refers to something in noref_objs will
-        # have that reference removed, and replaced with the null_memobj
-        num_expensive = len(noref_objs)
-        for idx, obj in enumerate(self.objs.itervalues()):
-            if self.show_progress and idx & 0x1ff == 0:
-                sys.stderr.write('removing %d expensive refs... %8d / %8d   \r'
-                                 % (num_expensive, idx + total_objs,
-                                    total_steps))
-            if obj.type_str == 'function':
-                # Functions have a reference to 'globals' which is not very
-                # helpful for having a clear understanding of what is going on
-                # especially since the function itself is in its own globals
-                # XXX: This is probably not a guaranteed order, but currently
-                #       func_traverse returns:
-                #   func_code, func_globals, func_module, func_defaults,
-                #   func_doc, func_name, func_dict, func_closure
-                # We want to remove the reference to globals and module
-                refs = list(obj.ref_list)
-                obj.ref_list = refs[:1] + refs[3:] + [0]
-                continue
-            if obj.type_str == '_LRUNode':
-                # We remove the 'sideways' references
-                obj.ref_list = [ref for ref in obj.ref_list
-                                     if ref not in lru_objs]
-                continue
-            for ref in obj.ref_list:
-                if ref in noref_objs:
-                    break
-            else:
-                # No bad references, keep going
-                continue
-            new_ref_list = [ref for ref in obj.ref_list
-                                 if ref not in noref_objs]
-            new_ref_list.append(0)
-            obj.ref_list = new_ref_list
-        if self.show_progress:
-            sys.stderr.write('removed %d expensive refs from %d objs%s\n'
-                             % (num_expensive, total_objs, ' '*20))
+        for changed, obj in remove_expensive_references(source, total_objs,
+            self.show_progress):
+            if changed:
+                self.objs[obj.address] = obj
 
     def compute_total_size(self):
         """This computes the total bytes referenced from this object."""
@@ -341,18 +286,42 @@
     :param source: If this is a string, we will open it as a file and read all
         objects. For any other type, we will simply iterate and parse objects
         out, so the object should be an iterator of json lines.
+    :param using_json: Use simplejson rather than the regex. This allows
+        arbitrary ordered json dicts to be parsed but still requires per-line
+        layout.
     """
-    tstart = time.time()
+    cleanup = None
     if isinstance(source, str):
-        source = open(source, 'r')
-        input_size = os.fstat(source.fileno()).st_size
+        source, cleanup = files.open_file(source)
+        if isinstance(source, file):
+            input_size = os.fstat(source.fileno()).st_size
+        else:
+            input_size = 0
     elif isinstance(source, (list, tuple)):
         input_size = sum(map(len, source))
     else:
         input_size = 0
+    try:
+        return _load(source, using_json, show_prog, input_size)
+    finally:
+        if cleanup is not None:
+            cleanup()
+
+
+def iter_objs(source, using_json=False, show_prog=False, input_size=0, objs=None):
+    """Iterate MemObjects from json.
+
+    :param source: A line iterator.
+    :param using_json: Use simplejson. See load().
+    :param show_prog: Show progress.
+    :param input_size: The size of the input if known (in bytes) or 0.
+    :param objs: Either None or a dict containing objects by address. If not
+        None, then duplicate objects will not be parsed or output.
+    :return: A generator of MemObjects.
+    """
     # TODO: cStringIO?
+    tstart = time.time()
     input_mb = input_size / 1024. / 1024.
-    objs = {}
     temp_cache = {}
     address_re = re.compile(
         r'{"address": (?P<address>\d+)'
@@ -360,24 +329,25 @@
     bytes_read = count = 0
     last = 0
     mb_read = 0
-
+    if using_json:
+        decoder = _from_json
+    else:
+        decoder = _from_line
     for line_num, line in enumerate(source):
         bytes_read += len(line)
         if line in ("[\n", "]\n"):
             continue
         if line.endswith(',\n'):
             line = line[:-2]
-        m = address_re.match(line)
-        if not m:
-            continue
-        address = int(m.group('address'))
-        if address in objs: # Skip duplicate objects
-            continue
-        if using_json:
-            memobj = _from_json(_loader.MemObject, line, temp_cache=temp_cache)
-        else:
-            memobj = _from_line(_loader.MemObject, line, temp_cache=temp_cache)
-        objs[memobj.address] = memobj
+        if objs:
+            # Skip duplicate objects
+            m = address_re.match(line)
+            if not m:
+                continue
+            address = int(m.group('address'))
+            if address in objs:
+                continue
+        yield decoder(_loader.MemObject, line, temp_cache=temp_cache)
         if show_prog and (line_num - last > 5000):
             last = line_num
             mb_read = bytes_read / 1024. / 1024
@@ -391,5 +361,105 @@
         sys.stderr.write(
             'loaded line %d, %d objs, %5.1f / %5.1f MiB read in %.1fs        \n'
             % (line_num, len(objs), mb_read, input_mb, tdelta))
+
+
+def _load(source, using_json, show_prog, input_size):
+    objs = {}
+    for memobj in iter_objs(source, using_json, show_prog, input_size, objs):
+        objs[memobj.address] = memobj
     # _fill_total_size(objs)
     return ObjManager(objs, show_progress=show_prog)
+
+
+def remove_expensive_references(source, total_objs=0, show_progress=False):
+    """Filter out references that are mere housekeeping links.
+
+    module.__dict__ tends to reference lots of other modules, which in turn
+    brings in the global reference cycle. Going further
+    function.__globals__ references module.__dict__, so it *too* ends up in
+    the global cycle. Generally these references aren't interesting, simply
+    because they end up referring to *everything*.
+
+    We filter out any reference to modules, frames, types, function globals
+    pointers & LRU sideways references.
+
+    :param source: A callable that returns an iterator of MemObjects. This
+        will be called twice.
+    :param total_objs: The total objects to be filtered, if known. If
+        show_progress is False or the count of objects is unknown, 0.
+    :return: An iterator of (changed, MemObject) objects with expensive
+        references removed.
+    """
+    # First pass, find objects we don't want to reference any more
+    noref_objs = _intset.IDSet()
+    lru_objs = _intset.IDSet()
+    total_steps = total_objs * 2
+    seen_zero = False
+    for idx, obj in enumerate(source()):
+        # 'module's have a single __dict__, which tends to refer to other
+        # modules. As you start tracking into that, you end up getting into
+        # reference cycles, etc, which generally ends up referencing every
+        # object in memory.
+        # 'frame' also tends to be self referential, and a single frame
+        # ends up referencing the entire current state
+        # 'type' generally is self referential through several attributes.
+        # __bases__ means we recurse all the way up to object, and object
+        # has __subclasses__, which means we recurse down into all types.
+        # In general, not helpful for debugging memory consumption
+        if show_progress and idx & 0x1ff == 0:
+            sys.stderr.write('finding expensive refs... %8d / %8d    \r'
+                             % (idx, total_steps))
+        if obj.type_str in ('module', 'frame', 'type'):
+            noref_objs.add(obj.address)
+        if obj.type_str == '_LRUNode':
+            lru_objs.add(obj.address)
+        if obj.address == 0:
+            seen_zero = True
+    # Second pass, any object which refers to something in noref_objs will
+    # have that reference removed, and replaced with the null_memobj
+    num_expensive = len(noref_objs)
+    null_memobj = _loader.MemObject(0, '<ex-reference>', 0, [])
+    if not seen_zero:
+        yield (True, null_memobj)
+    if show_progress and total_objs == 0:
+        total_objs = idx
+        total_steps = total_objs * 2
+    for idx, obj in enumerate(source()):
+        if show_progress and idx & 0x1ff == 0:
+            sys.stderr.write('removing %d expensive refs... %8d / %8d   \r'
+                             % (num_expensive, idx + total_objs,
+                                total_steps))
+        if obj.type_str == 'function':
+            # Functions have a reference to 'globals' which is not very
+            # helpful for having a clear understanding of what is going on
+            # especially since the function itself is in its own globals
+            # XXX: This is probably not a guaranteed order, but currently
+            #       func_traverse returns:
+            #   func_code, func_globals, func_module, func_defaults,
+            #   func_doc, func_name, func_dict, func_closure
+            # We want to remove the reference to globals and module
+            refs = list(obj.ref_list)
+            obj.ref_list = refs[:1] + refs[3:] + [0]
+            yield (True, obj)
+            continue
+        elif obj.type_str == '_LRUNode':
+            # We remove the 'sideways' references
+            obj.ref_list = [ref for ref in obj.ref_list
+                                 if ref not in lru_objs]
+            yield (True, obj)
+            continue
+        for ref in obj.ref_list:
+            if ref in noref_objs:
+                break
+        else:
+            # No bad references, keep going
+            yield (False, obj)
+            continue
+        new_ref_list = [ref for ref in obj.ref_list
+                             if ref not in noref_objs]
+        new_ref_list.append(0)
+        obj.ref_list = new_ref_list
+        yield (True, obj)
+    if show_progress:
+        sys.stderr.write('removed %d expensive refs from %d objs%s\n'
+                         % (num_expensive, total_objs, ' '*20))

=== modified file 'meliae/tests/test_loader.py'
--- a/meliae/tests/test_loader.py	2009-09-08 17:19:16 +0000
+++ b/meliae/tests/test_loader.py	2009-09-11 17:34:33 +0000
@@ -16,6 +16,8 @@
 
 """Read back in a dump file and process it"""
 
+import gzip
+import os
 import sys
 import tempfile
 
@@ -41,9 +43,9 @@
 '{"address": 3, "type": "list", "size": 44, "len": 3, "refs": [3, 4, 5]}',
 '{"address": 5, "type": "int", "size": 12, "value": 1, "refs": []}',
 '{"address": 4, "type": "int", "size": 12, "value": 2, "refs": []}',
-'{"address": 2, "type": "dict", "size": 124, "len": 2, "refs": [5, 4, 6, 7]}',
+'{"address": 2, "type": "dict", "size": 124, "len": 2, "refs": [4, 5, 6, 7]}',
 '{"address": 7, "type": "tuple", "size": 20, "len": 2, "refs": [4, 5]}',
-'{"address": 6, "type": "str", "size": 29, "len": 5, "value": "a str"'
+'{"address": 6, "type": "str", "size": 29, "name": "bah", "len": 5, "value": "a str"'
  ', "refs": []}',
 ]
 
@@ -77,6 +79,58 @@
     def test_load_example(self):
         objs = loader.load(_example_dump, show_prog=False)
 
+    def test_load_compressed(self):
+        # Unfortunately, NamedTemporaryFiles cannot be re-opened on Windows
+        fd, name = tempfile.mkstemp(prefix='meliae-')
+        f = os.fdopen(fd, 'wb')
+        try:
+            content = gzip.GzipFile(mode='wb', compresslevel=6, fileobj=f)
+            for line in _example_dump:
+                content.write(line + '\n')
+            content.flush()
+            content.close()
+            del content
+            f.close()
+            objs = loader.load(name, show_prog=False).objs
+            objs[1]
+        finally:
+            f.close()
+            os.remove(name)
+            
+
+
+class TestRemoveExpensiveReferences(tests.TestCase):
+
+    def test_remove_expensive_references(self):
+        lines = list(_example_dump)
+        lines.append('{"address": 8, "type": "module", "size": 12'
+                     ', "name": "mymod", "refs": [9]}')
+        lines.append('{"address": 9, "type": "dict", "size": 124'
+                     ', "refs": [10, 11]}')
+        lines.append('{"address": 10, "type": "module", "size": 12'
+                     ', "name": "mod2", "refs": [12]}')
+        lines.append('{"address": 11, "type": "str", "size": 27'
+                     ', "value": "boo", "refs": []}')
+        lines.append('{"address": 12, "type": "dict", "size": 124'
+                     ', "refs": []}')
+        source = lambda:loader.iter_objs(lines)
+        mymod_dict = list(source())[8]
+        self.assertEqual([10, 11], mymod_dict.ref_list)
+        result = list(loader.remove_expensive_references(source))
+        null_obj = result[0][1]
+        self.assertEqual(0, null_obj.address)
+        self.assertEqual('<ex-reference>', null_obj.type_str)
+        self.assertEqual([11, 0], result[9][1].ref_list)
+
+
+class TestMemObj(tests.TestCase):
+
+    def test_to_json(self):
+        objs = list(loader.iter_objs(_example_dump))
+        objs.sort(key=lambda x:x.address)
+        expected = sorted(_example_dump)
+        self.assertEqual(expected, [obj.to_json() for obj in objs])
+
 
 class TestObjManager(tests.TestCase):
 

=== added file 'remove_expensive_references.py'
--- a/remove_expensive_references.py	1970-01-01 00:00:00 +0000
+++ b/remove_expensive_references.py	2009-09-11 04:34:37 +0000
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+# Copyright (C) 2009 Canonical Ltd
+# 
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License and
+# the GNU Lesser General Public License along with this program.  If
+# not, see <http://www.gnu.org/licenses/>.
+
+"""Remove expensive references.
+
+This script takes one or two filenames and filters the first into either stdout
+or the second filename.
+"""
+
+import os
+import re
+import sys
+import time
+
+from meliae import files, loader
+
+
+def main(args):
+    import optparse
+    p = optparse.OptionParser(
+        '%prog INFILE [OUTFILE]')
+
+    opts, args = p.parse_args(args)
+    if len(args) > 2:
+        sys.stderr.write('We only support 2 filenames, not %d\n' % (len(args),))
+        return -1
+    if len(args) < 1:
+        sys.stderr.write("Must supply INFILE\n")
+        return -1
+
+    def source():
+        infile, cleanup = files.open_file(args[0])
+        for obj in loader.iter_objs(infile):
+            yield obj
+        cleanup()
+    if len(args) == 1:
+        outfile = sys.stdout
+    else:
+        outfile = open(args[1], 'wb')
+    for _, obj in loader.remove_expensive_references(source, show_progress=True):
+        outfile.write(obj.to_json() + '\n')   
+    outfile.flush()
+
+
+if __name__ == '__main__':
+    sys.exit(main(sys.argv[1:]))
+

=== modified file 'strip_duplicates.py'
--- a/strip_duplicates.py	2009-09-08 17:06:14 +0000
+++ b/strip_duplicates.py	2009-09-11 04:34:37 +0000
@@ -29,9 +29,11 @@
 import sys
 import time
 
+from meliae import files
+
 
 def strip_duplicate(infile, outfile, insize=None):
-    from memory_dump import _intset
+    from meliae import _intset
     seen = _intset.IntSet()
 
     address_re = re.compile(
@@ -71,22 +73,33 @@
         '%prog [INFILE [OUTFILE]]')
 
     opts, args = p.parse_args(args)
-
     if len(args) > 2:
         sys.stderr.write('We only support 2 filenames, not %d\n' % (len(args),))
         return -1
-    if len(args) == 0:
-        infile = sys.stdin
-        insize = None
-        outfile = sys.stdout
-    else:
-        infile = open(args[0], 'rb')
-        insize = os.fstat(infile.fileno()).st_size
-        if len(args) == 1:
+
+    cleanups = []
+    try:
+        if len(args) == 0:
+            infile = sys.stdin
+            insize = None
             outfile = sys.stdout
         else:
-            outfile = open(args[1], 'wb')
-    strip_duplicate(infile, outfile, insize)
+            infile, cleanup = files.open_file(args[0])
+            if cleanup is not None:
+                cleanups.append(cleanup)
+            if isinstance(infile, file):
+                # pipes are files, but 0 isn't useful.
+                insize = os.fstat(infile.fileno()).st_size or None
+            else:
+                insize = None
+            if len(args) == 1:
+                outfile = sys.stdout
+            else:
+                outfile = open(args[1], 'wb')
+        strip_duplicate(infile, outfile, insize)
+    finally:
+        for cleanup in cleanups:
+            cleanup()
 
 
 if __name__ == '__main__':



More information about the bazaar-commits mailing list