Rev 4622: Implement a balancing scheme to maximize processor utilisation. in file:///home/vila/src/bzr/bugs/selftest/
Vincent Ladeuil
v.ladeuil+lp at free.fr
Tue Aug 18 18:02:01 BST 2009
At file:///home/vila/src/bzr/bugs/selftest/
------------------------------------------------------------
revno: 4622
revision-id: v.ladeuil+lp at free.fr-20090818170201-s4pz2jzh7522e4x0
parent: v.ladeuil+lp at free.fr-20090818131648-wh0zg7jck90n76pd
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: selftest
timestamp: Tue 2009-08-18 19:02:01 +0200
message:
Implement a balancing scheme to maximize processor utilisation.
* bzrlib/tests/__init__.py:
(fork_for_tests): Change the place we fork to better control which
tests are run where.
-------------- next part --------------
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py 2009-08-18 13:16:48 +0000
+++ b/bzrlib/tests/__init__.py 2009-08-18 17:02:01 +0000
@@ -3032,7 +3032,6 @@
run(result) called on them to feed tests to result.
"""
concurrency = osutils.local_concurrency()
- concurrency = 2
result = []
from subunit import TestProtocolClient, ProtocolTestCase
@@ -3045,92 +3044,79 @@
nb_tests = len(all_tests)
shared_cur_test = [0]
+ import threading
class TestInOtherProcess(object):
- # Should be in subunit, I think. RBC.
- def __init__(self, feedback, control, pid):
- self.feedback = feedback
- self.control = control
- self.pid = pid
+
+ def __init__(self, suite_semaphore, rank):
+ self.suite_semaphore = suite_semaphore
+ self.rank = rank
+ self.nb_slices = 0
+ self.nb_tests = 0
def run(self, result):
- try:
- print 'start monitoring %d' % (self.pid,)
+ self.suite_semaphore.acquire()
+ cur_test = shared_cur_test[0]
+ while cur_test < nb_tests:
+ # The slice size should be a balance between the overhead of
+ # processing a slice, and the ability to feed as many children
+ # as possible for the longest possible time (or said otherwise:
+ # the last standing child should run alone for the shortest
+ # possible time). So we start by saying that each child will
+ # handle as many slices as there are children and reducing the
+ # slice size from there.
+ slice_size = ((nb_tests - cur_test)
+ / (concurrency * concurrency)) + 1
+ print '%5d/%5d ------> New slice on %d: %5d' % (
+ cur_test, nb_tests, self.rank, slice_size)
+ # give a slice to first free child
+ first, last = cur_test, cur_test + slice_size
+ shared_cur_test[0] = last
+ self.suite_semaphore.release()
+
+ self.nb_slices += 1
+ self.nb_tests += slice_size
+ self.run_slice(result, all_tests[first:last])
+ if shared_cur_test[0] > nb_tests:
+ break
+ self.suite_semaphore.acquire()
cur_test = shared_cur_test[0]
- while cur_test < nb_tests:
- # The slice size should be a balance between the overhead
- # of processing a slice, and the ability to feed as many
- # children as possible for the longest possible time (or
- # said otherwise: the last standing child should run alone
- # for the shortest possible time). So we start by saying
- # that each child will handle as many slices as there are
- # children and reducing the slice size from there.
- slice_size = ((nb_tests - cur_test)
- / (concurrency * concurrency)) + 1
- print '%d/%d -> %d' % (cur_test, nb_tests, slice_size)
- # give a slice to first free child
- first, last = cur_test, cur_test + slice_size
- shared_cur_test[0] = cur_test + slice_size
- self.control.write('%d %d\n' % (first, last))
- print 'Sent: [%d,%d[ to %d\n' % (first, last, self.pid)
- if cur_test > nb_tests:
- print 'stop monitoring %d' % (self.pid,)
- break
- print 'receiving results from %d' % (self.pid,)
- test = ProtocolTestCase(self.feedback)
- test.run(result)
- print 'results received from %d' % (self.pid,)
- cur_test = shared_cur_test[0]
- self.control.close()
- finally:
- os.waitpid(self.pid, os.WNOHANG)
-
-
-# test_blocks = partition_tests(suite, concurrency)
- for proc in range(0, concurrency):
- (c_read, c_write) = os.pipe()
- (f_read, f_write) = os.pipe()
- pid = os.fork()
- if pid == 0:
- pid = os.getpid()
- try:
- control = os.fdopen(c_read, 'rb', 1)
- os.close(c_write)
- # Leave stderr and stdout open so we can see test noise
- # Close stdin so that the child goes away if it decides to
- # read from stdin (otherwise its a roulette to see what
- # child actually gets keystrokes for pdb etc).
- sys.stdin.close()
- sys.stdin = None
- feedback = os.fdopen(f_write, 'wb', 1)
- os.close(f_read)
- line = control.readline()
- while line:
+ self.suite_semaphore.release()
+ print '%d ran %d test in %d slices' % (
+ self.rank, self.nb_tests, self.nb_slices)
+
+ def run_slice(self, result, tests):
+ (f_read, f_write) = os.pipe()
+ pid = os.fork()
+ if pid == 0:
+ try:
+ # Leave stderr and stdout open so we can see test noise
+ # Close stdin so that the child goes away if it decides
+ # to read from stdin (otherwise its a roulette to see
+ # what child actually gets keystrokes for pdb etc).
+ sys.stdin.close()
+ sys.stdin = None
+ feedback = os.fdopen(f_write, 'wb', 1)
+ os.close(f_read)
subunit_result = AutoTimingTestResultDecorator(
TestProtocolClient(feedback))
- print '[%d] Read: [%s]' % (pid, line,)
- first, last = line.split()
- first, last = int(first), int(last)
- print '[%d] Received: [%d,%d[' % (pid, first, last)
process_suite = TestSuite()
- process_suite.addTests(all_tests[first:last])
- print '[%d] process_suite created' % (pid,)
+ process_suite.addTests(tests)
process_suite.run(subunit_result)
- print '[%d] process_suite ran' % (pid,)
- subunit_result.done()
- print '[%d] about to read control' % (pid,)
- line = control.readline()
- print '[%d] Closing control' % (pid,)
- control.close()
- feedback.close()
- finally:
- os._exit(0)
- else:
- control = os.fdopen(c_write, 'wb', 1)
- os.close(c_read)
- feedback = os.fdopen(f_read, 'rb', 1)
- os.close(f_write)
- test = TestInOtherProcess(feedback, control, pid)
- result.append(test)
+ finally:
+ os._exit(0)
+ else:
+ feedback = os.fdopen(f_read, 'rb', 1)
+ os.close(f_write)
+ try:
+ test = ProtocolTestCase(feedback)
+ test.run(result)
+ finally:
+ os.waitpid(pid, os.WNOHANG)
+
+ suite_semaphore = threading.Semaphore(1)
+ for proc in range(0, concurrency):
+ test = TestInOtherProcess(suite_semaphore, proc)
+ result.append(test)
return result
More information about the bazaar-commits
mailing list