Rev 4099: Sketch out parallelisation for test suite execution. in http://people.ubuntu.com/~robertc/baz2.0/tests.parallel
Robert Collins
robertc at robertcollins.net
Mon Mar 9 23:40:26 GMT 2009
At http://people.ubuntu.com/~robertc/baz2.0/tests.parallel
------------------------------------------------------------
revno: 4099
revision-id: robertc at robertcollins.net-20090309234018-2mvku884zbhxs7lm
parent: pqm at pqm.ubuntu.com-20090309084556-9i2m12qlud2qcrtw
committer: Robert Collins <robertc at robertcollins.net>
branch nick: tests.parallel
timestamp: Tue 2009-03-10 10:40:18 +1100
message:
Sketch out parallelisation for test suite execution.
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py 2009-03-09 08:45:56 +0000
+++ b/bzrlib/tests/__init__.py 2009-03-09 23:40:18 +0000
@@ -33,6 +33,7 @@
import doctest
import errno
import logging
+import math
import os
from pprint import pformat
import random
@@ -2616,15 +2617,6 @@
verbosity = 2
else:
verbosity = 1
- if runner_class is None:
- runner_class = TextTestRunner
- runner = runner_class(stream=sys.stdout,
- descriptions=0,
- verbosity=verbosity,
- bench_history=bench_history,
- list_only=list_only,
- )
- runner.stop_on_failure=stop_on_failure
# Initialise the random number generator and display the seed used.
# We convert the seed to a long to make it reuseable across invocations.
random_order = False
@@ -2638,7 +2630,7 @@
random_seed = long(random_seed)
except:
pass
- runner.stream.writeln("Randomizing test order using seed %s\n" %
+ sys.stdout.writeln("Randomizing test order using seed %s\n" %
(random_seed))
random.seed(random_seed)
# Customise the list of tests if requested
@@ -2655,6 +2647,25 @@
else:
suite = order_changer(filter_suite_by_re(suite, pattern))
+ # One-machine, forked, parallelism.
+ try:
+ prefix = 'processor'
+ for line in file('/proc/cpuinfo', 'rb').readlines():
+ if line.startswith(prefix):
+ concurrency = int(line[line.find(':')+1:-1]) + 1
+ except Exception, e:
+ concurrency = 1
+ if concurrency != 1:
+ suite = ConcurrentTestSuite(suite, concurrency)
+ if runner_class is None:
+ runner_class = TextTestRunner
+ runner = runner_class(stream=sys.stdout,
+ descriptions=0,
+ verbosity=verbosity,
+ bench_history=bench_history,
+ list_only=list_only,
+ )
+ runner.stop_on_failure=stop_on_failure
result = runner.run(suite)
if strict:
@@ -2663,6 +2674,131 @@
return result.wasSuccessful()
+class ConcurrentTestSuite(object):
+ """A deliberate subset of TestSuite, in fact, its more a larger TestCase.
+
+ Runs in forked processes. That is all.
+ """
+
+ def __init__(self, suite, concurrency):
+ """Create a ConcurrentTestSuite to execute suite.
+
+ :param concurrency: The number of processes to fork.
+ """
+ self._suite = suite
+ self.concurrency = concurrency
+
+ def countTestCases(self):
+ return self._suite.countTestCases()
+
+ def debug(self):
+ return self._suite.debug()
+
+ def run(self, result):
+ # XXX: implicit result support, I guess.
+ from subunit import (TestProtocolClient, ProtocolTestCase,
+ TestResultFilter,)
+ import Queue
+ tests = list(iter_suite_tests(self._suite))
+ tests_per_process = int(math.ceil(float(len(tests)) / self.concurrency))
+ processes = {}
+ for block in range(self.concurrency):
+ low_test = block * tests_per_process
+ high_test = low_test + tests_per_process
+ process_tests = tests[low_test:high_test]
+ process_suite = TestSuite()
+ process_suite.addTests(process_tests)
+ c2pread, c2pwrite = os.pipe()
+ result_semaphore = threading.Semaphore(1)
+ pid = os.fork()
+ if pid == 0:
+ try:
+ os.close(c2pread)
+ # Leave stderr and stdout open so we can see test noise
+ # Close stdin so that the child goes away if it decides to
+ # read from stdin (otherwise its a roulette to see what
+ # child actually gets keystrokes for pdb etc.
+ sys.stdin.close()
+ sys.stdin = None
+ stream = os.fdopen(c2pwrite, 'wb', 0)
+ subunit_result = TestProtocolClient(stream)
+ process_suite.run(subunit_result)
+ finally:
+ os._exit(0)
+ else:
+ os.close(c2pwrite)
+ stream = os.fdopen(c2pread, 'rb', 0)
+ test = ProtocolTestCase(stream)
+ process_result = ThreadsafeForwardingResult(result,
+ result_semaphore)
+ processes[pid] = (test, process_result)
+ threads = {}
+ queue = Queue.Queue()
+ for pid, (test, process_result) in processes.iteritems():
+ reader_thread = threading.Thread(target=self._run_process, args=(test,
+ process_result, pid, queue))
+ threads[pid] = reader_thread
+ reader_thread.start()
+ # test.run(process_result)
+ while threads:
+ finished_process = queue.get()
+ threads[finished_process].join()
+ del threads[finished_process]
+ os.waitpid(finished_process, os.WNOHANG)
+ print "pid %d finished" % finished_process
+
+ def _run_process(self, test, process_result, pid, queue):
+ print "start"
+ test.run(process_result)
+ print "done"
+ queue.put(pid)
+
+
+class ThreadsafeForwardingResult(unittest.TestResult):
+ """A TestResult which ensures the target does not receive mixed up calls."""
+
+ def __init__(self, target, semaphore):
+ unittest.TestResult.__init__(self)
+ self.result = target
+ self.semaphore = semaphore
+
+ def addError(self, test, err):
+ self.semaphore.acquire()
+ try:
+ self.result.startTest(test)
+ self.result.addError(test, err)
+ self.result.stopTest(test)
+ finally:
+ self.semaphore.release()
+
+ def addFailure(self, test, err):
+ self.semaphore.acquire()
+ try:
+ self.result.startTest(test)
+ self.result.addFailure(test, err)
+ self.result.stopTest(test)
+ finally:
+ self.semaphore.release()
+
+ def addSkip(self, test, reason):
+ self.semaphore.acquire()
+ try:
+ self.result.startTest(test)
+ self.result.addSkip(test, reason)
+ self.result.stopTest(test)
+ finally:
+ self.semaphore.release()
+
+ def addSuccess(self, test):
+ self.semaphore.acquire()
+ try:
+ self.result.startTest(test)
+ self.result.addSuccess(test)
+ self.result.stopTest(test)
+ finally:
+ self.semaphore.release()
+
+
# Controlled by "bzr selftest -E=..." option
selftest_debug_flags = set()
More information about the bazaar-commits
mailing list