Rev 4207: Create fork and reinvoke parallel testing support. in http://people.ubuntu.com/~robertc/baz2.0/pending/tests.parallel
Robert Collins
robertc at robertcollins.net
Fri Mar 27 09:31:54 GMT 2009
At http://people.ubuntu.com/~robertc/baz2.0/pending/tests.parallel
------------------------------------------------------------
revno: 4207
revision-id: robertc at robertcollins.net-20090327093143-gli7efprtwutx0p4
parent: robertc at robertcollins.net-20090326121545-t30g0qejzx7tknxl
committer: Robert Collins <robertc at robertcollins.net>
branch nick: tests.parallel
timestamp: Fri 2009-03-27 20:31:43 +1100
message:
Create fork and reinvoke parallel testing support.
=== modified file 'bzrlib/builtins.py'
--- a/bzrlib/builtins.py 2009-03-24 12:15:01 +0000
+++ b/bzrlib/builtins.py 2009-03-27 09:31:43 +0000
@@ -3155,6 +3155,11 @@
),
Option('list-only',
help='List the tests instead of running them.'),
+ RegistryOption('parallel',
+ help="Run the test suite in parallel",
+ lazy_registry=('bzrlib.tests', 'parallel_registry'),
+ value_switches=False,
+ ),
Option('randomize', type=str, argname="SEED",
help='Randomize the order of tests using the given'
' seed or "now" for the current time.'),
@@ -3186,7 +3191,8 @@
lsprof_timed=None, cache_dir=None,
first=False, list_only=False,
randomize=None, exclude=None, strict=False,
- load_list=None, debugflag=None, starting_with=None, subunit=False):
+ load_list=None, debugflag=None, starting_with=None, subunit=False,
+ parallel=None):
from bzrlib.tests import selftest
import bzrlib.benchmarks as benchmarks
from bzrlib.benchmarks import tree_creator
@@ -3215,6 +3221,9 @@
raise errors.BzrCommandError("subunit not available. subunit "
"needs to be installed to use --subunit.")
self.additional_selftest_args['runner_class'] = SubUnitBzrRunner
+ if parallel:
+ self.additional_selftest_args.setdefault(
+ 'suite_decorators', []).append(parallel)
if benchmark:
test_suite_factory = benchmarks.test_suite
# Unless user explicitly asks for quiet, be verbose in benchmarks
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py 2009-03-26 12:15:45 +0000
+++ b/bzrlib/tests/__init__.py 2009-03-27 09:31:43 +0000
@@ -33,13 +33,14 @@
import doctest
import errno
import logging
+import math
import os
from pprint import pformat
import random
import re
import shlex
import stat
-from subprocess import Popen, PIPE
+from subprocess import Popen, PIPE, STDOUT
import sys
import tempfile
import threading
@@ -543,7 +544,15 @@
run += 1
actionTaken = "Listed"
else:
- test.run(result)
+ try:
+ from testtools import ThreadsafeForwardingResult
+ except ImportError:
+ test.run(result)
+ else:
+ if type(result) == ThreadsafeForwardingResult:
+ test.run(BZRTransformingResult(result))
+ else:
+ test.run(result)
run = result.testsRun
actionTaken = "Ran"
stopTime = time.time()
@@ -2227,7 +2236,8 @@
For TestCaseInTempDir we create a temporary directory based on the test
name and then create two subdirs - test and home under it.
"""
- name_prefix = osutils.pathjoin(self.TEST_ROOT, self._getTestDirPrefix())
+ name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
+ self._getTestDirPrefix())
name = name_prefix
for i in range(100):
if os.path.exists(name):
@@ -2251,7 +2261,7 @@
self.addCleanup(self.deleteTestDir)
def deleteTestDir(self):
- os.chdir(self.TEST_ROOT)
+ os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
_rmtree_temp_dir(self.test_base_dir)
def build_tree(self, shape, line_endings='binary', transport=None):
@@ -2671,6 +2681,24 @@
return result.wasSuccessful()
+# A registry where get() returns a suite decorator.
+parallel_registry = registry.Registry()
+def fork_decorator(suite):
+ concurrency = local_concurrency()
+ if concurrency == 1:
+ return suite
+ from testtools import ConcurrentTestSuite
+ return ConcurrentTestSuite(suite, fork_for_tests)
+parallel_registry.register('fork', fork_decorator)
+def subprocess_decorator(suite):
+ concurrency = local_concurrency()
+ if concurrency == 1:
+ return suite
+ from testtools import ConcurrentTestSuite
+ return ConcurrentTestSuite(suite, reinvoke_for_tests)
+parallel_registry.register('subprocess', subprocess_decorator)
+
+
def exclude_tests(exclude_pattern):
"""Return a test suite decorator that excludes tests."""
if exclude_pattern is None:
@@ -2836,6 +2864,194 @@
return iter(self._tests)
+def partition_tests(suite, count):
+ """Partition suite into count lists of tests."""
+ result = []
+ tests = list(iter_suite_tests(suite))
+ tests_per_process = int(math.ceil(float(len(tests)) / count))
+ for block in range(count):
+ low_test = block * tests_per_process
+ high_test = low_test + tests_per_process
+ process_tests = tests[low_test:high_test]
+ result.append(process_tests)
+ return result
+
+
+def fork_for_tests(suite):
+ """Take suite and start up one runner per CPU by forking()
+
+ :return: An iterable of TestCase-like objects which can each have
+ run(result) called on them to feed tests to result, and
+ cleanup() called on them to stop them/kill children/end threads.
+ """
+ concurrency = local_concurrency()
+ result = []
+ from subunit import TestProtocolClient, ProtocolTestCase
+ class TestInOtherProcess(ProtocolTestCase):
+ # Should be in subunit, I think. RBC.
+ def __init__(self, stream, pid):
+ ProtocolTestCase.__init__(self, stream)
+ self.pid = pid
+
+ def run(self, result):
+ try:
+ ProtocolTestCase.run(self, result)
+ finally:
+ os.waitpid(self.pid, os.WNOHANG)
+ # print "pid %d finished" % finished_process
+
+ test_blocks = partition_tests(suite, concurrency)
+ for process_tests in test_blocks:
+ process_suite = TestSuite()
+ process_suite.addTests(process_tests)
+ c2pread, c2pwrite = os.pipe()
+ pid = os.fork()
+ if pid == 0:
+ try:
+ os.close(c2pread)
+ # Leave stderr and stdout open so we can see test noise
+ # Close stdin so that the child goes away if it decides to
+ # read from stdin (otherwise it's a roulette to see which
+ # child actually gets keystrokes for pdb etc.)
+ sys.stdin.close()
+ sys.stdin = None
+ stream = os.fdopen(c2pwrite, 'wb', 0)
+ subunit_result = TestProtocolClient(stream)
+ process_suite.run(subunit_result)
+ finally:
+ os._exit(0)
+ else:
+ os.close(c2pwrite)
+ stream = os.fdopen(c2pread, 'rb', 1)
+ test = TestInOtherProcess(stream, pid)
+ result.append(test)
+ return result
+
+
+def reinvoke_for_tests(suite):
+ """Take suite and start up one runner per CPU using subprocess().
+
+ :return: An iterable of TestCase-like objects which can each have
+ run(result) called on them to feed tests to result, and
+ cleanup() called on them to stop them/kill children/end threads.
+ """
+ concurrency = local_concurrency()
+ result = []
+ from subunit import TestProtocolClient, ProtocolTestCase
+ class TestInSubprocess(ProtocolTestCase):
+ def __init__(self, process, name):
+ ProtocolTestCase.__init__(self, process.stdout)
+ self.process = process
+ self.process.stdin.close()
+ self.name = name
+
+ def run(self, result):
+ try:
+ ProtocolTestCase.run(self, result)
+ finally:
+ self.process.wait()
+ os.unlink(self.name)
+ # print "pid %d finished" % finished_process
+ test_blocks = partition_tests(suite, concurrency)
+ for process_tests in test_blocks:
+ # Ugly: this currently reimplements rather than reuses TestCase methods.
+ bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
+ if not os.path.isfile(bzr_path):
+ # We are probably installed. Assume sys.argv is the right file
+ bzr_path = sys.argv[0]
+ fd, test_list_file_name = tempfile.mkstemp()
+ test_list_file = os.fdopen(fd, 'wb', 1)
+ for test in process_tests:
+ test_list_file.write(test.id() + '\n')
+ test_list_file.close()
+ try:
+ argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
+ '--subunit']
+ # Note that without subunit in core - epic FAIL. But we don't really
+ # care anyway because load-list means we don't get plugin tests.
+ if '--no-plugins' in sys.argv:
+ argv.append('--no-plugins')
+ # stderr=STDOUT would be ideal, but until we prevent noise on
+ # stderr it can interrupt the subunit protocol.
+ process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+ bufsize=1)
+ test = TestInSubprocess(process, test_list_file_name)
+ result.append(test)
+ except:
+ os.unlink(test_list_file_name)
+ raise
+ return result
+
+
+def cpucount(content):
+ lines = content.splitlines()
+ prefix = 'processor'
+ for line in lines:
+ if line.startswith(prefix):
+ concurrency = int(line[line.find(':')+1:]) + 1
+ return concurrency
+
+
+def local_concurrency():
+ try:
+ content = file('/proc/cpuinfo', 'rb').read()
+ concurrency = cpucount(content)
+ except Exception, e:
+ concurrency = 1
+ return concurrency
+
+
+class BZRTransformingResult(unittest.TestResult):
+
+ def __init__(self, target):
+ unittest.TestResult.__init__(self)
+ self.result = target
+
+ def startTest(self, test):
+ self.result.startTest(test)
+
+ def stopTest(self, test):
+ self.result.stopTest(test)
+
+ def addError(self, test, err):
+ feature = self._error_looks_like('UnavailableFeature: ', err)
+ if feature is not None:
+ self.result.addNotSupported(test, feature)
+ else:
+ self.result.addError(test, err)
+
+ def addFailure(self, test, err):
+ known = self._error_looks_like('KnownFailure: ', err)
+ if known is not None:
+ self.result._addKnownFailure(test, [KnownFailure,
+ KnownFailure(known), None])
+ else:
+ self.result.addFailure(test, err)
+
+ def addSkip(self, test, reason):
+ self.result.addSkip(test, reason)
+
+ def addSuccess(self, test):
+ self.result.addSuccess(test)
+
+ def _error_looks_like(self, prefix, err):
+ """Deserialize the exception and return its stringified value."""
+ import subunit
+ value = None
+ typ, exc, _ = err
+ if isinstance(exc, subunit.RemoteException):
+ # Stringifying the exception gives access to the remote traceback.
+ # We search the last line for 'prefix'
+ lines = str(exc).split('\n')
+ if len(lines) > 1:
+ last = lines[-2] # -1 is empty, final \n
+ else:
+ last = lines[-1]
+ if last.startswith(prefix):
+ value = last[len(prefix):]
+ return value
+
+
# Controlled by "bzr selftest -E=..." option
selftest_debug_flags = set()
@@ -2854,6 +3070,7 @@
debug_flags=None,
starting_with=None,
runner_class=None,
+ suite_decorators=None,
):
"""Run the whole test suite under the enhanced runner"""
# XXX: Very ugly way to do this...
@@ -2891,6 +3108,7 @@
exclude_pattern=exclude_pattern,
strict=strict,
runner_class=runner_class,
+ suite_decorators=suite_decorators,
)
finally:
default_transport = old_transport
More information about the bazaar-commits
mailing list