Rev 81: Cleanups to get the test suite passing with py25 and Windows. in http://bazaar.launchpad.net/~meliae-dev/meliae/trunk

John Arbash Meinel john at arbash-meinel.com
Fri Sep 11 18:36:41 BST 2009


At http://bazaar.launchpad.net/~meliae-dev/meliae/trunk

------------------------------------------------------------
revno: 81
revision-id: john at arbash-meinel.com-20090911173624-o748syrxfya6m4dd
parent: john at arbash-meinel.com-20090911173540-kps21ahtv7vizg7w
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: trunk
timestamp: Fri 2009-09-11 12:36:24 -0500
message:
  Cleanups to get the test suite passing with py25 and Windows.
-------------- next part --------------
=== modified file 'meliae/files.py'
--- a/meliae/files.py	2009-09-11 00:19:07 +0000
+++ b/meliae/files.py	2009-09-11 17:36:24 +0000
@@ -21,8 +21,9 @@
 try:
     import multiprocessing
 except ImportError:
-    nultiprocessing = None
+    multiprocessing = None
 import subprocess
+import sys
 
 
 def open_file(filename):
@@ -41,12 +42,18 @@
         source.seek(0)
         return source, None
     else:
+        gzip_source.close()
+        source.close()
         # a gzip file
         # preference - a gzip subprocess
+        if sys.platform == 'win32':
+            close_fds = False # not supported
+        else:
+            close_fds = True
         try:
             process = subprocess.Popen(['gunzip', '-c', filename],
                 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE, close_fds=True)
+                stderr=subprocess.PIPE, close_fds=close_fds)
         except OSError, e:
             if e.errno == errno.ENOENT:
                 # failing that, use another python process
@@ -56,13 +63,13 @@
         process.stderr.close()
         return process.stdout, process.terminate
 
+
 def _open_mprocess(filename):
     if multiprocessing is None:
         # can't multiprocess, use inprocess gzip.
         return gzip.GzipFile(filename, mode='rb'), None
     parent, child = multiprocessing.Pipe(False)
     def stream_file(filename, child):
-        source = open(filename, 'r')
         gzip_source = gzip.GzipFile(filename, 'rb')
         for line in gzip_source:
             child.send(line)

=== modified file 'meliae/loader.py'
--- a/meliae/loader.py	2009-09-11 04:34:37 +0000
+++ b/meliae/loader.py	2009-09-11 17:36:24 +0000
@@ -19,12 +19,7 @@
 Currently requires simplejson to parse.
 """
 
-import gzip
 import math
-try:
-    import multiprocessing
-except ImportError:
-    nultiprocessing = None
 import os
 import re
 import sys

=== modified file 'meliae/tests/test_loader.py'
--- a/meliae/tests/test_loader.py	2009-09-11 03:59:43 +0000
+++ b/meliae/tests/test_loader.py	2009-09-11 17:36:24 +0000
@@ -17,6 +17,7 @@
 """Read back in a dump file and process it"""
 
 import gzip
+import os
 import sys
 import tempfile
 
@@ -79,15 +80,23 @@
         objs = loader.load(_example_dump, show_prog=False)
 
     def test_load_compressed(self):
-        t = tempfile.NamedTemporaryFile(prefix='meliae-')
-        content = gzip.GzipFile(mode='wb', compresslevel=6, fileobj=t)
-        for line in _example_dump:
-            content.write(line + '\n')
-        content.flush()
-        content.close()
-        t.flush()
-        objs = loader.load(t.name, show_prog=False).objs
-        objs[1]
+        # unfortunately NamedTemporaryFile's cannot be re-opened on Windows
+        fd, name = tempfile.mkstemp(prefix='meliae-')
+        f = os.fdopen(fd, 'wb')
+        try:
+            content = gzip.GzipFile(mode='wb', compresslevel=6, fileobj=f)
+            for line in _example_dump:
+                content.write(line + '\n')
+            content.flush()
+            content.close()
+            del content
+            f.close()
+            objs = loader.load(name, show_prog=False).objs
+            objs[1]
+        finally:
+            f.close()
+            os.remove(name)
+            
 
 
 class TestRemoveExpensiveReferences(tests.TestCase):
@@ -122,6 +131,7 @@
         expected = sorted(_example_dump)
         self.assertEqual(expected, [obj.to_json() for obj in objs])
 
+
 class TestObjManager(tests.TestCase):
 
     def test_compute_referrers(self):



More information about the bazaar-commits mailing list