Rev 4622: Implement a balancing scheme to maximize processor utilisation. in file:///home/vila/src/bzr/bugs/selftest/

Vincent Ladeuil v.ladeuil+lp at free.fr
Tue Aug 18 18:02:01 BST 2009


At file:///home/vila/src/bzr/bugs/selftest/

------------------------------------------------------------
revno: 4622
revision-id: v.ladeuil+lp at free.fr-20090818170201-s4pz2jzh7522e4x0
parent: v.ladeuil+lp at free.fr-20090818131648-wh0zg7jck90n76pd
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: selftest
timestamp: Tue 2009-08-18 19:02:01 +0200
message:
  Implement a balancing scheme to maximize processor utilisation.
  
  * bzrlib/tests/__init__.py:
  (fork_for_tests): Change the palce we fork to better control which
  tests are run where.
-------------- next part --------------
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2009-08-18 13:16:48 +0000
+++ b/bzrlib/tests/__init__.py	2009-08-18 17:02:01 +0000
@@ -3032,7 +3032,6 @@
         run(result) called on them to feed tests to result.
     """
     concurrency = osutils.local_concurrency()
-    concurrency = 2
     result = []
 
     from subunit import TestProtocolClient, ProtocolTestCase
@@ -3045,92 +3044,79 @@
     nb_tests = len(all_tests)
     shared_cur_test = [0]
 
+    import threading
     class TestInOtherProcess(object):
-        # Should be in subunit, I think. RBC.
-        def __init__(self, feedback, control, pid):
-            self.feedback = feedback
-            self.control = control
-            self.pid = pid
+
+        def __init__(self, suite_semaphore, rank):
+            self.suite_semaphore = suite_semaphore
+            self.rank = rank
+            self.nb_slices = 0
+            self.nb_tests = 0
 
         def run(self, result):
-            try:
-                print 'start monitoring %d' % (self.pid,)
+            self.suite_semaphore.acquire()
+            cur_test = shared_cur_test[0]
+            while cur_test < nb_tests:
+                # The slice size should be a balance between the overhead of
+                # processing a slice, and the ability to feed as many children
+                # as possible for the longest possible time (or said otherwise:
+                # the last standing child should run alone for the shortest
+                # possible time). So we start by saying that each child will
+                # handle as many slices as there are children and reducing the
+                # slice size from there.
+                slice_size = ((nb_tests - cur_test)
+                              / (concurrency * concurrency)) + 1
+                print '%5d/%5d ------> New slice on %d: %5d' % (
+                    cur_test, nb_tests, self.rank, slice_size)
+                # give a slice to first free child
+                first, last = cur_test, cur_test + slice_size
+                shared_cur_test[0] = last
+                self.suite_semaphore.release()
+
+                self.nb_slices += 1
+                self.nb_tests += slice_size
+                self.run_slice(result, all_tests[first:last])
+                if shared_cur_test[0] > nb_tests:
+                    break
+                self.suite_semaphore.acquire()
                 cur_test = shared_cur_test[0]
-                while cur_test < nb_tests:
-                    # The slice size should be a balance between the overhead
-                    # of processing a slice, and the ability to feed as many
-                    # children as possible for the longest possible time (or
-                    # said otherwise: the last standing child should run alone
-                    # for the shortest possible time). So we start by saying
-                    # that each child will handle as many slices as there are
-                    # children and reducing the slice size from there.
-                    slice_size = ((nb_tests - cur_test)
-                                  / (concurrency * concurrency)) + 1
-                    print '%d/%d -> %d' % (cur_test, nb_tests, slice_size)
-                    # give a slice to first free child
-                    first, last = cur_test, cur_test + slice_size
-                    shared_cur_test[0] = cur_test + slice_size
-                    self.control.write('%d %d\n' % (first, last))
-                    print 'Sent: [%d,%d[ to %d\n' % (first, last, self.pid)
-                    if cur_test > nb_tests:
-                        print 'stop monitoring %d' % (self.pid,)
-                        break
-                    print 'receiving results from %d' %  (self.pid,)
-                    test = ProtocolTestCase(self.feedback)
-                    test.run(result)
-                    print 'results received from %d' %  (self.pid,)
-                    cur_test = shared_cur_test[0]
-                self.control.close()
-            finally:
-                os.waitpid(self.pid, os.WNOHANG)
-
-
-#    test_blocks = partition_tests(suite, concurrency)
-    for proc in range(0, concurrency):
-        (c_read, c_write) = os.pipe()
-        (f_read, f_write) = os.pipe()
-        pid = os.fork()
-        if pid == 0:
-            pid = os.getpid()
-            try:
-                control = os.fdopen(c_read, 'rb', 1)
-                os.close(c_write)
-                # Leave stderr and stdout open so we can see test noise
-                # Close stdin so that the child goes away if it decides to
-                # read from stdin (otherwise its a roulette to see what
-                # child actually gets keystrokes for pdb etc).
-                sys.stdin.close()
-                sys.stdin = None
-                feedback = os.fdopen(f_write, 'wb', 1)
-                os.close(f_read)
-                line = control.readline()
-                while line:
+            self.suite_semaphore.release()
+            print '%d ran %d test in %d slices' % (
+                self.rank, self.nb_tests, self.nb_slices)
+
+        def run_slice(self, result, tests):
+            (f_read, f_write) = os.pipe()
+            pid = os.fork()
+            if pid == 0:
+                try:
+                    # Leave stderr and stdout open so we can see test noise
+                    # Close stdin so that the child goes away if it decides
+                    # to read from stdin (otherwise its a roulette to see
+                    # what child actually gets keystrokes for pdb etc).
+                    sys.stdin.close()
+                    sys.stdin = None
+                    feedback = os.fdopen(f_write, 'wb', 1)
+                    os.close(f_read)
                     subunit_result = AutoTimingTestResultDecorator(
                         TestProtocolClient(feedback))
-                    print '[%d] Read: [%s]' % (pid, line,)
-                    first, last = line.split()
-                    first, last = int(first), int(last)
-                    print '[%d] Received: [%d,%d[' % (pid, first, last)
                     process_suite = TestSuite()
-                    process_suite.addTests(all_tests[first:last])
-                    print '[%d] process_suite created' % (pid,)
+                    process_suite.addTests(tests)
                     process_suite.run(subunit_result)
-                    print '[%d] process_suite ran' % (pid,)
-                    subunit_result.done()
-                    print '[%d] about to read control' % (pid,)
-                    line = control.readline()
-                print '[%d] Closing control' % (pid,)
-                control.close()
-                feedback.close()
-            finally:
-                os._exit(0)
-        else:
-            control = os.fdopen(c_write, 'wb', 1)
-            os.close(c_read)
-            feedback = os.fdopen(f_read, 'rb', 1)
-            os.close(f_write)
-            test = TestInOtherProcess(feedback, control, pid)
-            result.append(test)
+                finally:
+                    os._exit(0)
+            else:
+                feedback = os.fdopen(f_read, 'rb', 1)
+                os.close(f_write)
+                try:
+                    test = ProtocolTestCase(feedback)
+                    test.run(result)
+                finally:
+                    os.waitpid(pid, os.WNOHANG)
+
+    suite_semaphore = threading.Semaphore(1)
+    for proc in range(0, concurrency):
+        test = TestInOtherProcess(suite_semaphore, proc)
+        result.append(test)
     return result
 
 



More information about the bazaar-commits mailing list