Rev 4099: Sketch out parallelisation for test suite execution. in http://people.ubuntu.com/~robertc/baz2.0/tests.parallel

Robert Collins robertc at robertcollins.net
Mon Mar 9 23:40:26 GMT 2009


At http://people.ubuntu.com/~robertc/baz2.0/tests.parallel

------------------------------------------------------------
revno: 4099
revision-id: robertc at robertcollins.net-20090309234018-2mvku884zbhxs7lm
parent: pqm at pqm.ubuntu.com-20090309084556-9i2m12qlud2qcrtw
committer: Robert Collins <robertc at robertcollins.net>
branch nick: tests.parallel
timestamp: Tue 2009-03-10 10:40:18 +1100
message:
  Sketch out parallelisation for test suite execution.
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2009-03-09 08:45:56 +0000
+++ b/bzrlib/tests/__init__.py	2009-03-09 23:40:18 +0000
@@ -33,6 +33,7 @@
 import doctest
 import errno
 import logging
+import math
 import os
 from pprint import pformat
 import random
@@ -2616,15 +2617,6 @@
         verbosity = 2
     else:
         verbosity = 1
-    if runner_class is None:
-        runner_class = TextTestRunner
-    runner = runner_class(stream=sys.stdout,
-                            descriptions=0,
-                            verbosity=verbosity,
-                            bench_history=bench_history,
-                            list_only=list_only,
-                            )
-    runner.stop_on_failure=stop_on_failure
     # Initialise the random number generator and display the seed used.
     # We convert the seed to a long to make it reuseable across invocations.
     random_order = False
@@ -2638,7 +2630,7 @@
                 random_seed = long(random_seed)
             except:
                 pass
-        runner.stream.writeln("Randomizing test order using seed %s\n" %
+        sys.stdout.writeln("Randomizing test order using seed %s\n" %
             (random_seed))
         random.seed(random_seed)
     # Customise the list of tests if requested
@@ -2655,6 +2647,25 @@
         else:
             suite = order_changer(filter_suite_by_re(suite, pattern))
 
+    # One-machine, forked, parallelism.
+    try:
+        prefix = 'processor'
+        for line in file('/proc/cpuinfo', 'rb').readlines():
+            if line.startswith(prefix):
+                concurrency = int(line[line.find(':')+1:-1]) + 1
+    except Exception, e:
+        concurrency = 1
+    if concurrency != 1:
+        suite = ConcurrentTestSuite(suite, concurrency)
+    if runner_class is None:
+        runner_class = TextTestRunner
+    runner = runner_class(stream=sys.stdout,
+        descriptions=0,
+        verbosity=verbosity,
+        bench_history=bench_history,
+        list_only=list_only,
+        )
+    runner.stop_on_failure=stop_on_failure
     result = runner.run(suite)
 
     if strict:
@@ -2663,6 +2674,131 @@
     return result.wasSuccessful()
 
 
+class ConcurrentTestSuite(object):
+    """A deliberate subset of TestSuite; in fact, it's more a larger TestCase.
+
+    Runs in forked processes. That is all.
+    """
+
+    def __init__(self, suite, concurrency):
+        """Create a ConcurrentTestSuite to execute suite.
+
+        :param concurrency: The number of processes to fork.
+        """
+        self._suite = suite
+        self.concurrency = concurrency
+
+    def countTestCases(self):
+        return self._suite.countTestCases()
+
+    def debug(self):
+        return self._suite.debug()
+
+    def run(self, result):
+        """Run the wrapped tests in forked children, reporting to result."""
+        # XXX: implicit result support, I guess.
+        from subunit import ProtocolTestCase, TestProtocolClient
+        import Queue
+        tests = list(iter_suite_tests(self._suite))
+        tests_per_process = int(math.ceil(float(len(tests)) / self.concurrency))
+        processes = {}
+        # Shared by all forwarders so calls on result are never interleaved.
+        result_semaphore = threading.Semaphore(1)
+        for block in range(self.concurrency):
+            low_test = block * tests_per_process
+            high_test = low_test + tests_per_process
+            process_suite = TestSuite()
+            process_suite.addTests(tests[low_test:high_test])
+            c2pread, c2pwrite = os.pipe()
+            pid = os.fork()
+            if pid == 0:
+                try:
+                    os.close(c2pread)
+                    # Leave stderr and stdout open so we can see test noise.
+                    # Close stdin so that the child goes away if it decides to
+                    # read from stdin (otherwise it's a roulette to see what
+                    # child actually gets keystrokes for pdb etc.).
+                    sys.stdin.close()
+                    sys.stdin = None
+                    stream = os.fdopen(c2pwrite, 'wb', 0)
+                    subunit_result = TestProtocolClient(stream)
+                    process_suite.run(subunit_result)
+                finally:
+                    os._exit(0)
+            else:
+                os.close(c2pwrite)
+                stream = os.fdopen(c2pread, 'rb', 0)
+                test = ProtocolTestCase(stream)
+                process_result = ThreadsafeForwardingResult(result,
+                    result_semaphore)
+                processes[pid] = (test, process_result)
+        threads = {}
+        queue = Queue.Queue()
+        for pid, (test, process_result) in processes.iteritems():
+            reader_thread = threading.Thread(target=self._run_process,
+                args=(test, process_result, pid, queue))
+            threads[pid] = reader_thread
+            reader_thread.start()
+        while threads:
+            finished_process = queue.get()
+            threads.pop(finished_process).join()
+            # Block rather than WNOHANG, so a child still exiting is reaped.
+            os.waitpid(finished_process, 0)
+            print "pid %d finished" % finished_process
+
+    def _run_process(self, test, process_result, pid, queue):
+        """Feed one child's stream to process_result, then queue its pid."""
+        print "start"
+        test.run(process_result)
+        print "done"
+        queue.put(pid)
+
+
+class ThreadsafeForwardingResult(unittest.TestResult):
+    """A TestResult which ensures the target does not receive mixed up calls.
+
+    Each outcome is replayed on the target as one startTest / add* / stopTest
+    sequence while the semaphore is held.
+    """
+
+    def __init__(self, target, semaphore):
+        """Forward outcomes to target, serialised by semaphore."""
+        unittest.TestResult.__init__(self)
+        self.result = target
+        self.semaphore = semaphore
+
+    def _forward(self, outcome, test, *details):
+        """Report test to the target while holding the semaphore.
+
+        :param outcome: Name of the target method recording the outcome.
+        :param test: The test being reported.
+        :param details: Extra positional arguments for that method.
+        """
+        self.semaphore.acquire()
+        try:
+            self.result.startTest(test)
+            getattr(self.result, outcome)(test, *details)
+            self.result.stopTest(test)
+        finally:
+            self.semaphore.release()
+
+    def addError(self, test, err):
+        """Forward an error for test to the target."""
+        self._forward('addError', test, err)
+
+    def addFailure(self, test, err):
+        """Forward a failure for test to the target."""
+        self._forward('addFailure', test, err)
+
+    def addSkip(self, test, reason):
+        """Forward a skip of test, with its reason, to the target."""
+        self._forward('addSkip', test, reason)
+
+    def addSuccess(self, test):
+        """Forward a success for test to the target."""
+        self._forward('addSuccess', test)
+
+
 # Controlled by "bzr selftest -E=..." option
 selftest_debug_flags = set()
 




More information about the bazaar-commits mailing list