Rev 4621: Start hacking on balancing parallel selftest. in file:///home/vila/src/bzr/bugs/selftest/
Vincent Ladeuil
v.ladeuil+lp at free.fr
Tue Aug 18 14:16:48 BST 2009
At file:///home/vila/src/bzr/bugs/selftest/
------------------------------------------------------------
revno: 4621
revision-id: v.ladeuil+lp at free.fr-20090818131648-wh0zg7jck90n76pd
parent: v.ladeuil+lp at free.fr-20090817170424-axb52smg54movnon
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: selftest
timestamp: Tue 2009-08-18 15:16:48 +0200
message:
Start hacking on balancing parallel selftest.
* bzrlib/tests/__init__.py:
(fork_for_tests): Start balancing forked selftest. This version
does not work.
-------------- next part --------------
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py 2009-08-17 17:04:24 +0000
+++ b/bzrlib/tests/__init__.py 2009-08-18 13:16:48 +0000
@@ -3032,49 +3032,104 @@
run(result) called on them to feed tests to result.
"""
concurrency = osutils.local_concurrency()
+ concurrency = 2
result = []
+
from subunit import TestProtocolClient, ProtocolTestCase
try:
from subunit.test_results import AutoTimingTestResultDecorator
except ImportError:
AutoTimingTestResultDecorator = lambda x:x
- class TestInOtherProcess(ProtocolTestCase):
+
+ all_tests = list(iter_suite_tests(suite))
+ nb_tests = len(all_tests)
+ shared_cur_test = [0]
+
+ class TestInOtherProcess(object):
# Should be in subunit, I think. RBC.
- def __init__(self, stream, pid):
- ProtocolTestCase.__init__(self, stream)
+ def __init__(self, feedback, control, pid):
+ self.feedback = feedback
+ self.control = control
self.pid = pid
def run(self, result):
try:
- ProtocolTestCase.run(self, result)
+ print 'start monitoring %d' % (self.pid,)
+ cur_test = shared_cur_test[0]
+ while cur_test < nb_tests:
+ # The slice size should be a balance between the overhead
+ # of processing a slice, and the ability to feed as many
+ # children as possible for the longest possible time (in
+ # other words: the last standing child should run alone
+ # for the shortest possible time). So we start by assuming
+ # that each child will handle as many slices as there are
+ # children, and reduce the slice size from there.
+ slice_size = ((nb_tests - cur_test)
+ / (concurrency * concurrency)) + 1
+ print '%d/%d -> %d' % (cur_test, nb_tests, slice_size)
+ # give a slice to first free child
+ first, last = cur_test, cur_test + slice_size
+ shared_cur_test[0] = cur_test + slice_size
+ self.control.write('%d %d\n' % (first, last))
+ print 'Sent: [%d,%d[ to %d\n' % (first, last, self.pid)
+ if cur_test > nb_tests:
+ print 'stop monitoring %d' % (self.pid,)
+ break
+ print 'receiving results from %d' % (self.pid,)
+ test = ProtocolTestCase(self.feedback)
+ test.run(result)
+ print 'results received from %d' % (self.pid,)
+ cur_test = shared_cur_test[0]
+ self.control.close()
finally:
os.waitpid(self.pid, os.WNOHANG)
- test_blocks = partition_tests(suite, concurrency)
- for process_tests in test_blocks:
- process_suite = TestSuite()
- process_suite.addTests(process_tests)
- c2pread, c2pwrite = os.pipe()
+
+# test_blocks = partition_tests(suite, concurrency)
+ for proc in range(0, concurrency):
+ (c_read, c_write) = os.pipe()
+ (f_read, f_write) = os.pipe()
pid = os.fork()
if pid == 0:
+ pid = os.getpid()
try:
- os.close(c2pread)
+ control = os.fdopen(c_read, 'rb', 1)
+ os.close(c_write)
# Leave stderr and stdout open so we can see test noise
# Close stdin so that the child goes away if it decides to
# read from stdin (otherwise its a roulette to see what
# child actually gets keystrokes for pdb etc).
sys.stdin.close()
sys.stdin = None
- stream = os.fdopen(c2pwrite, 'wb', 1)
- subunit_result = AutoTimingTestResultDecorator(
- TestProtocolClient(stream))
- process_suite.run(subunit_result)
+ feedback = os.fdopen(f_write, 'wb', 1)
+ os.close(f_read)
+ line = control.readline()
+ while line:
+ subunit_result = AutoTimingTestResultDecorator(
+ TestProtocolClient(feedback))
+ print '[%d] Read: [%s]' % (pid, line,)
+ first, last = line.split()
+ first, last = int(first), int(last)
+ print '[%d] Received: [%d,%d[' % (pid, first, last)
+ process_suite = TestSuite()
+ process_suite.addTests(all_tests[first:last])
+ print '[%d] process_suite created' % (pid,)
+ process_suite.run(subunit_result)
+ print '[%d] process_suite ran' % (pid,)
+ subunit_result.done()
+ print '[%d] about to read control' % (pid,)
+ line = control.readline()
+ print '[%d] Closing control' % (pid,)
+ control.close()
+ feedback.close()
finally:
os._exit(0)
else:
- os.close(c2pwrite)
- stream = os.fdopen(c2pread, 'rb', 1)
- test = TestInOtherProcess(stream, pid)
+ control = os.fdopen(c_write, 'wb', 1)
+ os.close(c_read)
+ feedback = os.fdopen(f_read, 'rb', 1)
+ os.close(f_write)
+ test = TestInOtherProcess(feedback, control, pid)
result.append(test)
return result
More information about the bazaar-commits
mailing list