Rev 4175: Merge Robert ec2 additions. in file:///home/vila/src/bzr/experimental/parallel-selftest/

Vincent Ladeuil v.ladeuil+lp at free.fr
Sun Mar 22 17:43:51 GMT 2009


At file:///home/vila/src/bzr/experimental/parallel-selftest/

------------------------------------------------------------
revno: 4175
revision-id: v.ladeuil+lp at free.fr-20090322174351-r28gl68v332a42dm
parent: v.ladeuil+lp at free.fr-20090321181645-zvumlz7hk9tnxqud
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: lifeless
timestamp: Sun 2009-03-22 18:43:51 +0100
message:
  Merge Robert ec2 additions.
-------------- next part --------------
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2009-03-20 08:04:33 +0000
+++ b/bzrlib/tests/__init__.py	2009-03-22 17:43:51 +0000
@@ -27,6 +27,7 @@
 # new assertFoo() methods.
 
 import atexit
+import bz2
 import codecs
 from cStringIO import StringIO
 import difflib
@@ -2677,6 +2678,9 @@
         concurrency = local_concurrency()
         if concurrency > 1:
             suite = ConcurrentTestSuite(suite, concurrency, reinvoke_for_tests)
+    if strategy.startswith('ec2='):
+        concurrency = int(strategy[4:])
+        suite = ConcurrentTestSuite(suite, concurrency, ec2_for_tests)
     if runner_class is None:
         runner_class = TextTestRunner
     runner = runner_class(stream=sys.stdout,
@@ -2806,12 +2810,186 @@
     return result
 
 
+def ec2_for_tests(suite, concurrency):
+    """Split suite into up to concurrency blocks, each run on an EC2 instance.
+
+    :return: An iterable of TestCase-like objects which can each have
+        run(result) called on them to feed tests to result, and
+        cleanup() called on them to unlock the machine and close ssh.
+    """
+    result = []
+    import boto
+    from subunit import TestProtocolClient, ProtocolTestCase
+    from bzrlib.transport import ssh
+    class TestInEC2(ProtocolTestCase):
+        def __init__(self, tests, instance, lock):
+            ProtocolTestCase.__init__(self, None)
+            self.instance = instance
+            self.process_tests = tests
+            self.connection = None
+            self.lock = lock
+            self.shutdown_instance = False
+        # wait?
+        def run_script(self, username, args):
+            """Run a script on the instance"""
+            vendor = ssh._get_ssh_vendor()
+            # depends on subprocess vendor at the moment
+            connection = vendor.connect_ssh(username, None,
+                self.instance.dns_name, None, ['-oBatchMode=yes'] + args)
+            ir, iw = connection.get_filelike_channels()
+            return ir, connection
+
+        def acquire_machine(self):
+            while self.instance.state != 'running' or not self.instance.dns_name:
+                time.sleep(1)
+                self.instance.update()
+            # allow ssh to start :(
+            time.sleep(10)
+            transport = self.get_transport('root')
+            setup_script = '\n'.join(["set -e",
+                "echo 'deb http://ppa.launchpad.net/bzr/ppa/ubuntu "
+                "intrepid main' >> /etc/apt/sources.list.d/bzr.list",
+                "apt-key adv --recv-keys --keyserver keyserver.ubuntu.com"
+                " 8C6C1EFD 2>&1 > /dev/null",
+                "apt-get update > /dev/null",
+                "apt-get install -y bzr python python2.5-dev "
+                "python-docutils graphviz xdg-utils python-pyrex "
+                "python-medusa 2>&1 > /dev/null",
+                "mkdir -p ~/.bazaar/plugins/ > /dev/null",
+                "bzr branch lp:subunit subunit 2>&1 > /dev/null",
+                "adduser bzrtest --disabled-password < /dev/null",
+                "mkdir /home/bzrtest/.ssh",
+                "cp .ssh/authorized_keys /home/bzrtest/.ssh/",
+                "chown -R bzrtest:bzrtest /home/bzrtest/.ssh",
+                "chmod 0700 /home/bzrtest/.ssh",
+                "chmod 0600 /home/bzrtest/.ssh/authorized_keys",
+                "sudo -u bzrtest bzr branch lp:subunit /home/bzrtest/subunit 2>&1 > /dev/null",
+                ])
+            tries = 0
+            connected = False
+            while not connected and tries < 10:
+                try:
+                    transport.put_bytes_non_atomic("setup_machine.sh", setup_script)
+                except errors.SocketConnectionError, e:
+                    tries += 1
+                else:
+                    connected = True
+            if tries == 10:
+                raise e
+            ir, connection = self.run_script('root', ['bash', 'setup_machine.sh'])
+            ir.read()
+            connection.close()
+            t = get_transport('sftp://bzrtest@%s/~/' %
+                self.instance.dns_name)
+            # And here we do a lockdir dance.
+            self.lock = bzrlib.lockdir.LockDir(t, 'test-machine')
+            self.lock.create()
+            self.lock.attempt_lock()
+
+        def get_transport(self, username):
+            if not self.instance.dns_name:
+                raise AssertionError("not connected?")
+            return get_transport('sftp://%s@%s/~/' % (username,
+                self.instance.dns_name))
+
+        def run(self, result):
+            try:
+                if self.lock is None:
+                    self.acquire_machine()
+                transport = self.lock.transport
+                bzr_root = os.path.dirname(os.path.dirname(bzrlib.__file__))
+                branch = bzrlib.branch.Branch.open(bzr_root)
+                public_location = branch.get_public_branch()
+                test_script = '\n'.join([
+                    "set -e",
+                    "rm -f test_list",
+                    "bunzip2 test_list.bz2 > /dev/null",
+                    "if [ -d test_branch ] ; then",
+                    " cd test_branch",
+                    " bzr pull --overwrite %s > /dev/null" % public_location,
+                    "else",
+                    " bzr branch %s test_branch 2>&1 > /dev/null" % public_location,
+                    " cd test_branch",
+                    "fi",
+                    "make 2>&1 > /dev/null",
+                    "BZR_PARALLEL=forked PYTHONPATH=~/subunit/python ./bzr selftest --subunit --load-list ../test_list ",
+                    ])
+                transport.put_bytes_non_atomic("test.sh", test_script)
+                test_list = '\n'.join(test.id() for test in self.process_tests)
+                test_list_bz2 = bz2.compress(test_list)
+                # TODO: random name ?
+                transport.put_bytes_non_atomic('test_list.bz2', test_list_bz2)
+                ir, connection = self.run_script('bzrtest', ['bash', 'test.sh'])
+                print self.instance.dns_name
+                self._stream = ir
+                self.connection = connection
+                ProtocolTestCase.run(self, result)
+            finally:
+                # NB: ctrl-C does not yet trigger cleanup().
+                self.cleanup()
+
+        def cleanup(self):
+            if self.lock is not None and self.lock.is_held:
+                self.lock.unlock()
+            if self.shutdown_instance and self.instance is not None:
+                self.instance.stop()
+                self.instance = None
+            if self.connection is not None:
+                self.connection.close()
+                self.connection = None
+            # print "instance %d finished" % self.instance
+    test_blocks = partition_tests(suite, concurrency)
+    ec2_conn = boto.connect_ec2(os.environ['AWS_ACCESS_KEY_ID'], os.environ['AWS_SECRET_ACCESS_KEY'])
+    instances = []
+    for reservation in ec2_conn.get_all_instances():
+        for instance in reservation.instances:
+            if len(instances) >= concurrency:
+                break
+            if instance.state == 'running':
+                t = get_transport('sftp://bzrtest@%s/~/' %
+                    instance.dns_name)
+                # And here we do a lockdir dance.
+                lock = bzrlib.lockdir.LockDir(t, 'test-machine')
+                try:
+                    lock.attempt_lock()
+                    instances.append((instance, lock))
+                except errors.LockContention:
+                    note("busy instance %r" % instance)
+                except Exception:
+                    note("unusable instance %r" % instance)
+    # Count the shortfall only once the scan is over, so that the last
+    # instance claimed above is not started a second time below.
+    missing_instances = concurrency - len(instances)
+    if missing_instances:
+        # ami-bdfe19d4 is Eric Hammonds 64-bit Ubuntu image
+        # http://developer.amazonwebservices.com/connect/entry.jspa?externalID=1762&categoryID=101
+        # Only the shortfall is started here: running instances that could
+        # be locked were already claimed by the loop above.
+        image = ec2_conn.get_image('ami-bdfe19d4')
+        os.environ['EC2_KEYPAIR_NAME'] = 'bzr'
+        reservation = image.run(1, missing_instances,
+            os.environ['EC2_KEYPAIR_NAME'], instance_type='c1.xlarge')
+        for instance in reservation.instances:
+            instances.append((instance, None))
+    for process_tests, (instance, lock) in zip(test_blocks, instances):
+        test = TestInEC2(process_tests, instance, lock)
+        result.append(test)
+    return result
+
+
+def cpucount(content):
+    """Return 1 + the index on the last 'processor' line of content, else 1."""
+    concurrency = 1  # fallback when content has no 'processor' line
+    for line in content.splitlines():
+        if line.startswith('processor'):
+            concurrency = int(line[line.find(':')+1:]) + 1
+    return concurrency
+
+
 def local_concurrency():
     try:
-        prefix = 'processor'
-        for line in file('/proc/cpuinfo', 'rb').readlines():
-            if line.startswith(prefix):
-                concurrency = int(line[line.find(':')+1:-1]) + 1
+        content = file('/proc/cpuinfo', 'rb').read()  # mode must be a string
+        concurrency = cpucount(content)
     except Exception, e:
         concurrency = 1
     return concurrency



More information about the bazaar-commits mailing list