Rev 5421: (gz) Cleanup and test selftest thread leak detection (Martin [gz]) in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Mon Sep 13 12:11:45 BST 2010


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 5421 [merge]
revision-id: pqm at pqm.ubuntu.com-20100913111142-4d29z7oy3z5o7vrd
parent: pqm at pqm.ubuntu.com-20100913095203-w7z5eyx3fzdb24wi
parent: gzlist at googlemail.com-20100913082328-dzdil0p7x8mnwize
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Mon 2010-09-13 12:11:42 +0100
message:
  (gz) Cleanup and test selftest thread leak detection (Martin [gz])
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/tests/__init__.py       selftest.py-20050531073622-8d0e3c8845c97a64
  bzrlib/tests/test_selftest.py  test_selftest.py-20051202044319-c110a115d8c0456a
=== modified file 'NEWS'
--- a/NEWS	2010-09-13 06:36:59 +0000
+++ b/NEWS	2010-09-13 11:11:42 +0000
@@ -289,6 +289,9 @@
 * HTTP test servers will leak less threads (and sockets) and will not hang on
   AIX anymore. (Vincent Ladeuil, #405745)
 
+* Rearrange thread leak detection code to eliminate global state and make it
+  possible to extend the reporting. (Marting [gz], #633462)
+
 * The test suite now simply holds log files in memory, rather than writing them
   out to disk and then reading them back in and deleting them.
   (Andrew Bennetts)

=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2010-09-10 09:46:15 +0000
+++ b/bzrlib/tests/__init__.py	2010-09-13 11:11:42 +0000
@@ -38,6 +38,7 @@
 import logging
 import math
 import os
+import platform
 import pprint
 import random
 import re
@@ -194,6 +195,8 @@
         self.count = 0
         self._overall_start_time = time.time()
         self._strict = strict
+        self._first_thread_leaker_id = None
+        self._tests_leaking_threads_count = 0
 
     def stopTestRun(self):
         run = self.testsRun
@@ -238,15 +241,15 @@
             ok = self.wasStrictlySuccessful()
         else:
             ok = self.wasSuccessful()
-        if TestCase._first_thread_leaker_id:
+        if self._first_thread_leaker_id:
             self.stream.write(
                 '%s is leaking threads among %d leaking tests.\n' % (
-                TestCase._first_thread_leaker_id,
-                TestCase._leaking_threads_tests))
+                self._first_thread_leaker_id,
+                self._tests_leaking_threads_count))
             # We don't report the main thread as an active one.
             self.stream.write(
                 '%d non-main threads were left active in the end.\n'
-                % (TestCase._active_threads - 1))
+                % (len(self._active_threads) - 1))
 
     def getDescription(self, test):
         return test.id()
@@ -283,28 +286,34 @@
         super(ExtendedTestResult, self).startTest(test)
         if self.count == 0:
             self.startTests()
+        self.count += 1
         self.report_test_start(test)
         test.number = self.count
         self._recordTestStartTime()
+        # Only check for thread leaks if the test case supports cleanups
+        addCleanup = getattr(test, "addCleanup", None)
+        if addCleanup is not None:
+            addCleanup(self._check_leaked_threads, test)
 
     def startTests(self):
-        import platform
-        if getattr(sys, 'frozen', None) is None:
-            bzr_path = osutils.realpath(sys.argv[0])
-        else:
-            bzr_path = sys.executable
-        self.stream.write(
-            'bzr selftest: %s\n' % (bzr_path,))
-        self.stream.write(
-            '   %s\n' % (
-                    bzrlib.__path__[0],))
-        self.stream.write(
-            '   bzr-%s python-%s %s\n' % (
-                    bzrlib.version_string,
-                    bzrlib._format_version_tuple(sys.version_info),
-                    platform.platform(aliased=1),
-                    ))
-        self.stream.write('\n')
+        self.report_tests_starting()
+        self._active_threads = threading.enumerate()
+
+    def _check_leaked_threads(self, test):
+        """See if any threads have leaked since last call
+
+        A sample of live threads is stored in the _active_threads attribute,
+        when this method runs it compares the current live threads and any not
+        in the previous sample are treated as having leaked.
+        """
+        now_active_threads = set(threading.enumerate())
+        threads_leaked = now_active_threads.difference(self._active_threads)
+        if threads_leaked:
+            self._report_thread_leak(test, threads_leaked, now_active_threads)
+            self._tests_leaking_threads_count += 1
+            if self._first_thread_leaker_id is None:
+                self._first_thread_leaker_id = test.id()
+            self._active_threads = now_active_threads
 
     def _recordTestStartTime(self):
         """Record that a test has started."""
@@ -401,8 +410,37 @@
         else:
             raise errors.BzrError("Unknown whence %r" % whence)
 
-    def report_cleaning_up(self):
-        pass
+    def report_tests_starting(self):
+        """Display information before the test run begins"""
+        if getattr(sys, 'frozen', None) is None:
+            bzr_path = osutils.realpath(sys.argv[0])
+        else:
+            bzr_path = sys.executable
+        self.stream.write(
+            'bzr selftest: %s\n' % (bzr_path,))
+        self.stream.write(
+            '   %s\n' % (
+                    bzrlib.__path__[0],))
+        self.stream.write(
+            '   bzr-%s python-%s %s\n' % (
+                    bzrlib.version_string,
+                    bzrlib._format_version_tuple(sys.version_info),
+                    platform.platform(aliased=1),
+                    ))
+        self.stream.write('\n')
+
+    def report_test_start(self, test):
+        """Display information on the test just about to be run"""
+
+    def _report_thread_leak(self, test, leaked_threads, active_threads):
+        """Display information on a test that leaked one or more threads"""
+        # GZ 2010-09-09: A leak summary reported separately from the general
+        #                thread debugging would be nice. Tests under subunit
+        #                need something not using stream, perhaps adding a
+        #                testtools details object would be fitting.
+        if 'threads' in selftest_debug_flags:
+            self.stream.write('%s is leaking, active is now %d\n' %
+                (test.id(), len(active_threads)))
 
     def startTestRun(self):
         self.startTime = time.time()
@@ -481,7 +519,6 @@
         return a
 
     def report_test_start(self, test):
-        self.count += 1
         self.pb.update(
                 self._progress_prefix_text()
                 + ' '
@@ -514,9 +551,6 @@
     def report_unsupported(self, test, feature):
         """test cannot be run because feature is missing."""
 
-    def report_cleaning_up(self):
-        self.pb.update('Cleaning up')
-
 
 class VerboseTestResult(ExtendedTestResult):
     """Produce long output, with one line per test run plus times"""
@@ -534,7 +568,6 @@
         self.stream.write('running %d tests...\n' % self.num_tests)
 
     def report_test_start(self, test):
-        self.count += 1
         name = self._shortened_test_description(test)
         width = osutils.terminal_width()
         if width is not None:
@@ -795,9 +828,6 @@
     accidentally overlooked.
     """
 
-    _active_threads = None
-    _leaking_threads_tests = 0
-    _first_thread_leaker_id = None
     _log_file = None
     # record lsprof data when performing benchmark calls.
     _gather_lsprof_in_benchmarks = False
@@ -828,31 +858,12 @@
         self._track_transports()
         self._track_locks()
         self._clear_debug_flags()
-        TestCase._active_threads = threading.activeCount()
-        self.addCleanup(self._check_leaked_threads)
 
     def debug(self):
         # debug a frame up.
         import pdb
         pdb.Pdb().set_trace(sys._getframe().f_back)
 
-    def _check_leaked_threads(self):
-        active = threading.activeCount()
-        leaked_threads = active - TestCase._active_threads
-        TestCase._active_threads = active
-        # If some tests make the number of threads *decrease*, we'll consider
-        # that they are just observing old threads dieing, not agressively kill
-        # random threads. So we don't report these tests as leaking. The risk
-        # is that we have false positives that way (the test see 2 threads
-        # going away but leak one) but it seems less likely than the actual
-        # false positives (the test see threads going away and does not leak).
-        if leaked_threads > 0:
-            if 'threads' in selftest_debug_flags:
-                print '%s is leaking, active is now %d' % (self.id(), active)
-            TestCase._leaking_threads_tests += 1
-            if TestCase._first_thread_leaker_id is None:
-                TestCase._first_thread_leaker_id = self.id()
-
     def _clear_debug_flags(self):
         """Prevent externally set debug flags affecting tests.
 

=== modified file 'bzrlib/tests/test_selftest.py'
--- a/bzrlib/tests/test_selftest.py	2010-09-02 20:17:03 +0000
+++ b/bzrlib/tests/test_selftest.py	2010-09-09 18:10:38 +0000
@@ -21,6 +21,7 @@
 import os
 import signal
 import sys
+import threading
 import time
 import unittest
 import warnings
@@ -850,8 +851,7 @@
         """A KnownFailure being raised should trigger several result actions."""
         class InstrumentedTestResult(tests.ExtendedTestResult):
             def stopTestRun(self): pass
-            def startTests(self): pass
-            def report_test_start(self, test): pass
+            def report_tests_starting(self): pass
             def report_known_failure(self, test, err=None, details=None):
                 self._call = test, 'known failure'
         result = InstrumentedTestResult(None, None, None, None)
@@ -907,8 +907,7 @@
         """Test the behaviour of invoking addNotSupported."""
         class InstrumentedTestResult(tests.ExtendedTestResult):
             def stopTestRun(self): pass
-            def startTests(self): pass
-            def report_test_start(self, test): pass
+            def report_tests_starting(self): pass
             def report_unsupported(self, test, feature):
                 self._call = test, feature
         result = InstrumentedTestResult(None, None, None, None)
@@ -953,8 +952,7 @@
         """An UnavailableFeature being raised should invoke addNotSupported."""
         class InstrumentedTestResult(tests.ExtendedTestResult):
             def stopTestRun(self): pass
-            def startTests(self): pass
-            def report_test_start(self, test): pass
+            def report_tests_starting(self): pass
             def addNotSupported(self, test, feature):
                 self._call = test, feature
         result = InstrumentedTestResult(None, None, None, None)
@@ -1002,7 +1000,6 @@
         class InstrumentedTestResult(tests.ExtendedTestResult):
             calls = 0
             def startTests(self): self.calls += 1
-            def report_test_start(self, test): pass
         result = InstrumentedTestResult(None, None, None, None)
         def test_function():
             pass
@@ -1010,6 +1007,19 @@
         test.run(result)
         self.assertEquals(1, result.calls)
 
+    def test_startTests_only_once(self):
+        """With multiple tests startTests should still only be called once"""
+        class InstrumentedTestResult(tests.ExtendedTestResult):
+            calls = 0
+            def startTests(self): self.calls += 1
+        result = InstrumentedTestResult(None, None, None, None)
+        suite = unittest.TestSuite([
+            unittest.FunctionTestCase(lambda: None),
+            unittest.FunctionTestCase(lambda: None)])
+        suite.run(result)
+        self.assertEquals(1, result.calls)
+        self.assertEquals(2, result.count)
+
 
 class TestUnicodeFilenameFeature(tests.TestCase):
 
@@ -1036,14 +1046,11 @@
         because of our use of global state.
         """
         old_root = tests.TestCaseInTempDir.TEST_ROOT
-        old_leak = tests.TestCase._first_thread_leaker_id
         try:
             tests.TestCaseInTempDir.TEST_ROOT = None
-            tests.TestCase._first_thread_leaker_id = None
             return testrunner.run(test)
         finally:
             tests.TestCaseInTempDir.TEST_ROOT = old_root
-            tests.TestCase._first_thread_leaker_id = old_leak
 
     def test_known_failure_failed_run(self):
         # run a test that generates a known failure which should be printed in
@@ -2979,6 +2986,94 @@
         self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
 
 
+class TestThreadLeakDetection(tests.TestCase):
+    """Ensure when tests leak threads we detect and report it"""
+
+    class LeakRecordingResult(tests.ExtendedTestResult):
+        def __init__(self):
+            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
+            self.leaks = []
+        def _report_thread_leak(self, test, leaks, alive):
+            self.leaks.append((test, leaks))
+
+    def test_testcase_without_addCleanups(self):
+        """Check old TestCase instances don't break with leak detection"""
+        class Test(unittest.TestCase):
+            def runTest(self):
+                pass
+            addCleanup = None # for when on Python 2.7 with native addCleanup
+        result = self.LeakRecordingResult()
+        test = Test()
+        self.assertIs(getattr(test, "addCleanup", None), None)
+        result.startTestRun()
+        test.run(result)
+        result.stopTestRun()
+        self.assertEqual(result._tests_leaking_threads_count, 0)
+        self.assertEqual(result.leaks, [])
+        
+    def test_thread_leak(self):
+        """Ensure a thread that outlives the running of a test is reported
+
+        Uses a thread that blocks on an event, and is started by the inner
+        test case. As the thread outlives the inner case's run, it should be
+        detected as a leak, but the event is then set so that the thread can
+        be safely joined in cleanup so it's not leaked for real.
+        """
+        event = threading.Event()
+        thread = threading.Thread(name="Leaker", target=event.wait)
+        class Test(tests.TestCase):
+            def test_leak(self):
+                thread.start()
+        result = self.LeakRecordingResult()
+        test = Test("test_leak")
+        self.addCleanup(thread.join)
+        self.addCleanup(event.set)
+        result.startTestRun()
+        test.run(result)
+        result.stopTestRun()
+        self.assertEqual(result._tests_leaking_threads_count, 1)
+        self.assertEqual(result._first_thread_leaker_id, test.id())
+        self.assertEqual(result.leaks, [(test, set([thread]))])
+        self.assertContainsString(result.stream.getvalue(), "leaking threads")
+
+    def test_multiple_leaks(self):
+        """Check multiple leaks are blamed on the test cases at fault
+
+        Same concept as the previous test, but has one inner test method that
+        leaks two threads, and one that doesn't leak at all.
+        """
+        event = threading.Event()
+        thread_a = threading.Thread(name="LeakerA", target=event.wait)
+        thread_b = threading.Thread(name="LeakerB", target=event.wait)
+        thread_c = threading.Thread(name="LeakerC", target=event.wait)
+        class Test(tests.TestCase):
+            def test_first_leak(self):
+                thread_b.start()
+            def test_second_no_leak(self):
+                pass
+            def test_third_leak(self):
+                thread_c.start()
+                thread_a.start()
+        result = self.LeakRecordingResult()
+        first_test = Test("test_first_leak")
+        third_test = Test("test_third_leak")
+        self.addCleanup(thread_a.join)
+        self.addCleanup(thread_b.join)
+        self.addCleanup(thread_c.join)
+        self.addCleanup(event.set)
+        result.startTestRun()
+        unittest.TestSuite(
+            [first_test, Test("test_second_no_leak"), third_test]
+            ).run(result)
+        result.stopTestRun()
+        self.assertEqual(result._tests_leaking_threads_count, 2)
+        self.assertEqual(result._first_thread_leaker_id, first_test.id())
+        self.assertEqual(result.leaks, [
+            (first_test, set([thread_b])),
+            (third_test, set([thread_a, thread_c]))])
+        self.assertContainsString(result.stream.getvalue(), "leaking threads")
+
+
 class TestRunSuite(tests.TestCase):
 
     def test_runner_class(self):




More information about the bazaar-commits mailing list