Rev 4621: Start hacking on balancing parallel selftest. in file:///home/vila/src/bzr/bugs/selftest/

Vincent Ladeuil v.ladeuil+lp at free.fr
Tue Aug 18 14:16:48 BST 2009


At file:///home/vila/src/bzr/bugs/selftest/

------------------------------------------------------------
revno: 4621
revision-id: v.ladeuil+lp at free.fr-20090818131648-wh0zg7jck90n76pd
parent: v.ladeuil+lp at free.fr-20090817170424-axb52smg54movnon
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: selftest
timestamp: Tue 2009-08-18 15:16:48 +0200
message:
  Start hacking on balancing parallel selftest.
  
  * bzrlib/tests/__init__.py:
  (fork_for_tests): Start balancing forked selftest. This version
  does not work.
-------------- next part --------------
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2009-08-17 17:04:24 +0000
+++ b/bzrlib/tests/__init__.py	2009-08-18 13:16:48 +0000
@@ -3032,49 +3032,104 @@
         run(result) called on them to feed tests to result.
     """
     concurrency = osutils.local_concurrency()
+    concurrency = 2
     result = []
+
     from subunit import TestProtocolClient, ProtocolTestCase
     try:
         from subunit.test_results import AutoTimingTestResultDecorator
     except ImportError:
         AutoTimingTestResultDecorator = lambda x:x
-    class TestInOtherProcess(ProtocolTestCase):
+
+    all_tests = list(iter_suite_tests(suite))
+    nb_tests = len(all_tests)
+    shared_cur_test = [0]
+
+    class TestInOtherProcess(object):
         # Should be in subunit, I think. RBC.
-        def __init__(self, stream, pid):
-            ProtocolTestCase.__init__(self, stream)
+        def __init__(self, feedback, control, pid):
+            self.feedback = feedback
+            self.control = control
             self.pid = pid
 
         def run(self, result):
             try:
-                ProtocolTestCase.run(self, result)
+                print 'start monitoring %d' % (self.pid,)
+                cur_test = shared_cur_test[0]
+                while cur_test < nb_tests:
+                    # The slice size should be a balance between the overhead
+                    # of processing a slice, and the ability to feed as many
+                    # children as possible for the longest possible time (or
+                    # said otherwise: the last standing child should run alone
+                    # for the shortest possible time). So we start by saying
+                    # that each child will handle as many slices as there are
+                    # children and reducing the slice size from there.
+                    slice_size = ((nb_tests - cur_test)
+                                  / (concurrency * concurrency)) + 1
+                    print '%d/%d -> %d' % (cur_test, nb_tests, slice_size)
+                    # give a slice to first free child
+                    first, last = cur_test, cur_test + slice_size
+                    shared_cur_test[0] = cur_test + slice_size
+                    self.control.write('%d %d\n' % (first, last))
+                    print 'Sent: [%d,%d[ to %d\n' % (first, last, self.pid)
+                    if cur_test > nb_tests:
+                        print 'stop monitoring %d' % (self.pid,)
+                        break
+                    print 'receiving results from %d' %  (self.pid,)
+                    test = ProtocolTestCase(self.feedback)
+                    test.run(result)
+                    print 'results received from %d' %  (self.pid,)
+                    cur_test = shared_cur_test[0]
+                self.control.close()
             finally:
                 os.waitpid(self.pid, os.WNOHANG)
 
-    test_blocks = partition_tests(suite, concurrency)
-    for process_tests in test_blocks:
-        process_suite = TestSuite()
-        process_suite.addTests(process_tests)
-        c2pread, c2pwrite = os.pipe()
+
+#    test_blocks = partition_tests(suite, concurrency)
+    for proc in range(0, concurrency):
+        (c_read, c_write) = os.pipe()
+        (f_read, f_write) = os.pipe()
         pid = os.fork()
         if pid == 0:
+            pid = os.getpid()
             try:
-                os.close(c2pread)
+                control = os.fdopen(c_read, 'rb', 1)
+                os.close(c_write)
                 # Leave stderr and stdout open so we can see test noise
                 # Close stdin so that the child goes away if it decides to
                 # read from stdin (otherwise its a roulette to see what
                 # child actually gets keystrokes for pdb etc).
                 sys.stdin.close()
                 sys.stdin = None
-                stream = os.fdopen(c2pwrite, 'wb', 1)
-                subunit_result = AutoTimingTestResultDecorator(
-                    TestProtocolClient(stream))
-                process_suite.run(subunit_result)
+                feedback = os.fdopen(f_write, 'wb', 1)
+                os.close(f_read)
+                line = control.readline()
+                while line:
+                    subunit_result = AutoTimingTestResultDecorator(
+                        TestProtocolClient(feedback))
+                    print '[%d] Read: [%s]' % (pid, line,)
+                    first, last = line.split()
+                    first, last = int(first), int(last)
+                    print '[%d] Received: [%d,%d[' % (pid, first, last)
+                    process_suite = TestSuite()
+                    process_suite.addTests(all_tests[first:last])
+                    print '[%d] process_suite created' % (pid,)
+                    process_suite.run(subunit_result)
+                    print '[%d] process_suite ran' % (pid,)
+                    subunit_result.done()
+                    print '[%d] about to read control' % (pid,)
+                    line = control.readline()
+                print '[%d] Closing control' % (pid,)
+                control.close()
+                feedback.close()
             finally:
                 os._exit(0)
         else:
-            os.close(c2pwrite)
-            stream = os.fdopen(c2pread, 'rb', 1)
-            test = TestInOtherProcess(stream, pid)
+            control = os.fdopen(c_write, 'wb', 1)
+            os.close(c_read)
+            feedback = os.fdopen(f_read, 'rb', 1)
+            os.close(f_write)
+            test = TestInOtherProcess(feedback, control, pid)
             result.append(test)
     return result
 



More information about the bazaar-commits mailing list