Rev 4207: Create fork and reinvoke parallel testing support. in http://people.ubuntu.com/~robertc/baz2.0/pending/tests.parallel

Robert Collins robertc at robertcollins.net
Fri Mar 27 09:31:54 GMT 2009


At http://people.ubuntu.com/~robertc/baz2.0/pending/tests.parallel

------------------------------------------------------------
revno: 4207
revision-id: robertc at robertcollins.net-20090327093143-gli7efprtwutx0p4
parent: robertc at robertcollins.net-20090326121545-t30g0qejzx7tknxl
committer: Robert Collins <robertc at robertcollins.net>
branch nick: tests.parallel
timestamp: Fri 2009-03-27 20:31:43 +1100
message:
  Create fork and reinvoke parallel testing support.
=== modified file 'bzrlib/builtins.py'
--- a/bzrlib/builtins.py	2009-03-24 12:15:01 +0000
+++ b/bzrlib/builtins.py	2009-03-27 09:31:43 +0000
@@ -3155,6 +3155,11 @@
                             ),
                      Option('list-only',
                             help='List the tests instead of running them.'),
+                     RegistryOption('parallel',
+                        help="Run the test suite in parallel",
+                        lazy_registry=('bzrlib.tests', 'parallel_registry'),
+                        value_switches=False,
+                        ),
                      Option('randomize', type=str, argname="SEED",
                             help='Randomize the order of tests using the given'
                                  ' seed or "now" for the current time.'),
@@ -3186,7 +3191,8 @@
             lsprof_timed=None, cache_dir=None,
             first=False, list_only=False,
             randomize=None, exclude=None, strict=False,
-            load_list=None, debugflag=None, starting_with=None, subunit=False):
+            load_list=None, debugflag=None, starting_with=None, subunit=False,
+            parallel=None):
         from bzrlib.tests import selftest
         import bzrlib.benchmarks as benchmarks
         from bzrlib.benchmarks import tree_creator
@@ -3215,6 +3221,9 @@
                 raise errors.BzrCommandError("subunit not available. subunit "
                     "needs to be installed to use --subunit.")
             self.additional_selftest_args['runner_class'] = SubUnitBzrRunner
+        if parallel:
+            self.additional_selftest_args.setdefault(
+                'suite_decorators', []).append(parallel)
         if benchmark:
             test_suite_factory = benchmarks.test_suite
             # Unless user explicitly asks for quiet, be verbose in benchmarks

=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2009-03-26 12:15:45 +0000
+++ b/bzrlib/tests/__init__.py	2009-03-27 09:31:43 +0000
@@ -33,13 +33,14 @@
 import doctest
 import errno
 import logging
+import math
 import os
 from pprint import pformat
 import random
 import re
 import shlex
 import stat
-from subprocess import Popen, PIPE
+from subprocess import Popen, PIPE, STDOUT
 import sys
 import tempfile
 import threading
@@ -543,7 +544,15 @@
                 run += 1
             actionTaken = "Listed"
         else:
-            test.run(result)
+            try:
+                from testtools import ThreadsafeForwardingResult
+            except ImportError:
+                test.run(result)
+            else:
+                if type(result) == ThreadsafeForwardingResult:
+                    test.run(BZRTransformingResult(result))
+                else:
+                    test.run(result)
             run = result.testsRun
             actionTaken = "Ran"
         stopTime = time.time()
@@ -2227,7 +2236,8 @@
         For TestCaseInTempDir we create a temporary directory based on the test
         name and then create two subdirs - test and home under it.
         """
-        name_prefix = osutils.pathjoin(self.TEST_ROOT, self._getTestDirPrefix())
+        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
+            self._getTestDirPrefix())
         name = name_prefix
         for i in range(100):
             if os.path.exists(name):
@@ -2251,7 +2261,7 @@
         self.addCleanup(self.deleteTestDir)
 
     def deleteTestDir(self):
-        os.chdir(self.TEST_ROOT)
+        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
         _rmtree_temp_dir(self.test_base_dir)
 
     def build_tree(self, shape, line_endings='binary', transport=None):
@@ -2671,6 +2681,24 @@
         return result.wasSuccessful()
 
 
+# A registry where get() returns a suite decorator; it backs the selftest 'parallel' option.
+parallel_registry = registry.Registry()
+def fork_decorator(suite):
+    concurrency = local_concurrency()
+    if concurrency == 1:  # one CPU detected (or detection failed): leave the suite undecorated
+        return suite
+    from testtools import ConcurrentTestSuite  # imported lazily, only once parallelism is wanted
+    return ConcurrentTestSuite(suite, fork_for_tests)  # fork_for_tests splits the suite across forked children
+parallel_registry.register('fork', fork_decorator)
+def subprocess_decorator(suite):
+    concurrency = local_concurrency()
+    if concurrency == 1:  # one CPU detected (or detection failed): leave the suite undecorated
+        return suite
+    from testtools import ConcurrentTestSuite  # imported lazily, only once parallelism is wanted
+    return ConcurrentTestSuite(suite, reinvoke_for_tests)  # reinvoke_for_tests runs each block in a new bzr process
+parallel_registry.register('subprocess', subprocess_decorator)
+
+
 def exclude_tests(exclude_pattern):
     """Return a test suite decorator that excludes tests."""
     if exclude_pattern is None:
@@ -2836,6 +2864,194 @@
         return iter(self._tests)
 
 
+def partition_tests(suite, count):
+    """Partition suite into count lists of consecutive tests; trailing lists may be short or empty."""
+    result = []
+    tests = list(iter_suite_tests(suite))
+    tests_per_process = int(math.ceil(float(len(tests)) / count))  # round up so count blocks cover every test
+    for block in range(count):
+        low_test = block * tests_per_process
+        high_test = low_test + tests_per_process
+        process_tests = tests[low_test:high_test]  # slicing past the end just gives a shorter (or empty) list
+        result.append(process_tests)
+    return result
+
+
+def fork_for_tests(suite):
+    """Take suite and start up one runner per CPU using os.fork().
+
+    :return: An iterable of TestCase-like objects which can each have
+        run(result) called on them to feed tests to result, and
+        cleanup() called on them to stop them/kill children/end threads.
+    """
+    concurrency = local_concurrency()
+    result = []
+    from subunit import TestProtocolClient, ProtocolTestCase
+    class TestInOtherProcess(ProtocolTestCase):
+        # Should be in subunit, I think. RBC.
+        def __init__(self, stream, pid):
+            ProtocolTestCase.__init__(self, stream)
+            self.pid = pid
+
+        def run(self, result):
+            try:
+                ProtocolTestCase.run(self, result)
+            finally:
+                os.waitpid(self.pid, os.WNOHANG)  # NOTE(review): WNOHANG never blocks, so a still-running child is not reaped here - confirm
+            # print "pid %d finished" % finished_process
+
+    test_blocks = partition_tests(suite, concurrency)
+    for process_tests in test_blocks:
+        process_suite = TestSuite()
+        process_suite.addTests(process_tests)
+        c2pread, c2pwrite = os.pipe()  # child-to-parent pipe carrying the subunit stream
+        pid = os.fork()
+        if pid == 0:
+            try:
+                os.close(c2pread)
+                # Leave stderr and stdout open so we can see test noise.
+                # Close stdin so that the child goes away if it decides to
+                # read from stdin (otherwise it's a roulette to see which
+                # child actually gets keystrokes for pdb etc.).
+                sys.stdin.close()
+                sys.stdin = None
+                stream = os.fdopen(c2pwrite, 'wb', 0)  # bufsize 0: unbuffered
+                subunit_result = TestProtocolClient(stream)
+                process_suite.run(subunit_result)
+            finally:
+                os._exit(0)  # the child always stops here, with status 0 whatever the test results were
+        else:
+            os.close(c2pwrite)
+            stream = os.fdopen(c2pread, 'rb', 1)  # bufsize 1: line buffered
+            test = TestInOtherProcess(stream, pid)
+            result.append(test)
+    return result
+
+
+def reinvoke_for_tests(suite):
+    """Take suite and start up one runner per CPU using subprocess.Popen.
+
+    :return: An iterable of TestCase-like objects which can each have
+        run(result) called on them to feed tests to result, and
+        cleanup() called on them to stop them/kill children/end threads.
+    """
+    concurrency = local_concurrency()
+    result = []
+    from subunit import TestProtocolClient, ProtocolTestCase  # NOTE(review): TestProtocolClient is unused in this function
+    class TestInSubprocess(ProtocolTestCase):
+        def __init__(self, process, name):
+            ProtocolTestCase.__init__(self, process.stdout)
+            self.process = process
+            self.process.stdin.close()  # the child is given no input
+            self.name = name
+
+        def run(self, result):
+            try:
+                ProtocolTestCase.run(self, result)
+            finally:
+                self.process.wait()
+                os.unlink(self.name)  # remove the temporary test-list file
+            # print "pid %d finished" % finished_process
+    test_blocks = partition_tests(suite, concurrency)
+    for process_tests in test_blocks:
+        # Ugly; this currently reimplements rather than reuses TestCase methods.
+        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
+        if not os.path.isfile(bzr_path):
+            # We are probably installed. Assume sys.argv is the right file
+            bzr_path = sys.argv[0]
+        fd, test_list_file_name = tempfile.mkstemp()
+        test_list_file = os.fdopen(fd, 'wb', 1)
+        for test in process_tests:
+            test_list_file.write(test.id() + '\n')  # one test id per line, read back via --load-list
+        test_list_file.close()
+        try:
+            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
+                '--subunit']
+            # Note that without subunit in core - epic FAIL. But we don't really
+            # care anyway because load-list means we don't get plugin tests.
+            if '--no-plugins' in sys.argv:
+                argv.append('--no-plugins')
+            # stderr=STDOUT would be ideal, but until we prevent noise on
+            # stderr it can interrupt the subunit protocol.
+            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,  # NOTE(review): nothing visible here reads stderr - confirm it cannot fill up
+                bufsize=1)
+            test = TestInSubprocess(process, test_list_file_name)
+            result.append(test)
+        except:  # on any failure remove the temporary file, then re-raise
+            os.unlink(test_list_file_name)
+            raise
+    return result
+
+
+def cpucount(content):  # content: text in /proc/cpuinfo format, as read by local_concurrency()
+    lines = content.splitlines()
+    prefix = 'processor'
+    for line in lines:
+        if line.startswith(prefix):
+            concurrency = int(line[line.find(':')+1:]) + 1  # presumably ids are 0-based, so last id + 1 = CPU count
+    return concurrency  # NOTE(review): unbound (UnboundLocalError) when no line starts with 'processor'
+
+
+def local_concurrency():  # returns the count parsed from /proc/cpuinfo, or 1 if that fails
+    try:
+        content = file('/proc/cpuinfo', 'rb').read()
+        concurrency = cpucount(content)
+    except Exception, e:  # any failure (file missing, unparsable content) falls back to 1
+        concurrency = 1
+    return concurrency
+
+
+class BZRTransformingResult(unittest.TestResult):  # forwards to target, mapping remote errors back to bzr-specific outcomes
+
+    def __init__(self, target):
+        unittest.TestResult.__init__(self)
+        self.result = target
+
+    def startTest(self, test):
+        self.result.startTest(test)
+
+    def stopTest(self, test):
+        self.result.stopTest(test)
+
+    def addError(self, test, err):
+        feature = self._error_looks_like('UnavailableFeature: ', err)
+        if feature is not None:
+            self.result.addNotSupported(test, feature)  # report as "not supported" rather than as an error
+        else:
+            self.result.addError(test, err)
+
+    def addFailure(self, test, err):
+        known = self._error_looks_like('KnownFailure: ', err)
+        if known is not None:
+            self.result._addKnownFailure(test, [KnownFailure,  # rebuild an exc_info-style triple, with no traceback
+                                                KnownFailure(known), None])
+        else:
+            self.result.addFailure(test, err)
+
+    def addSkip(self, test, reason):
+        self.result.addSkip(test, reason)
+
+    def addSuccess(self, test):
+        self.result.addSuccess(test)
+
+    def _error_looks_like(self, prefix, err):
+        """Return the text after prefix on the last line of a remote exception, or None."""
+        import subunit
+        value = None
+        typ, exc, _ = err
+        if isinstance(exc, subunit.RemoteException):
+            # Stringifying the exception gives access to the remote traceback.
+            # We search its last line for 'prefix'.
+            lines = str(exc).split('\n')
+            if len(lines) > 1:
+                last = lines[-2] # lines[-1] is empty because of the final \n
+            else:
+                last = lines[-1]
+            if last.startswith(prefix):
+                value = last[len(prefix):]
+        return value
+
+
 # Controlled by "bzr selftest -E=..." option
 selftest_debug_flags = set()
 
@@ -2854,6 +3070,7 @@
              debug_flags=None,
              starting_with=None,
              runner_class=None,
+             suite_decorators=None,
              ):
     """Run the whole test suite under the enhanced runner"""
     # XXX: Very ugly way to do this...
@@ -2891,6 +3108,7 @@
                      exclude_pattern=exclude_pattern,
                      strict=strict,
                      runner_class=runner_class,
+                     suite_decorators=suite_decorators,
                      )
     finally:
         default_transport = old_transport




More information about the bazaar-commits mailing list