Rev 5421: (gz) Cleanup and test selftest thread leak detection (Martin [gz]) in file:///home/pqm/archives/thelove/bzr/%2Btrunk/
Canonical.com Patch Queue Manager
pqm at pqm.ubuntu.com
Mon Sep 13 12:11:45 BST 2010
At file:///home/pqm/archives/thelove/bzr/%2Btrunk/
------------------------------------------------------------
revno: 5421 [merge]
revision-id: pqm at pqm.ubuntu.com-20100913111142-4d29z7oy3z5o7vrd
parent: pqm at pqm.ubuntu.com-20100913095203-w7z5eyx3fzdb24wi
parent: gzlist at googlemail.com-20100913082328-dzdil0p7x8mnwize
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Mon 2010-09-13 12:11:42 +0100
message:
(gz) Cleanup and test selftest thread leak detection (Martin [gz])
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/tests/__init__.py selftest.py-20050531073622-8d0e3c8845c97a64
bzrlib/tests/test_selftest.py test_selftest.py-20051202044319-c110a115d8c0456a
=== modified file 'NEWS'
--- a/NEWS 2010-09-13 06:36:59 +0000
+++ b/NEWS 2010-09-13 11:11:42 +0000
@@ -289,6 +289,9 @@
* HTTP test servers will leak less threads (and sockets) and will not hang on
AIX anymore. (Vincent Ladeuil, #405745)
+* Rearrange thread leak detection code to eliminate global state and make it
+ possible to extend the reporting. (Martin [gz], #633462)
+
* The test suite now simply holds log files in memory, rather than writing them
out to disk and then reading them back in and deleting them.
(Andrew Bennetts)
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py 2010-09-10 09:46:15 +0000
+++ b/bzrlib/tests/__init__.py 2010-09-13 11:11:42 +0000
@@ -38,6 +38,7 @@
import logging
import math
import os
+import platform
import pprint
import random
import re
@@ -194,6 +195,8 @@
self.count = 0
self._overall_start_time = time.time()
self._strict = strict
+ self._first_thread_leaker_id = None
+ self._tests_leaking_threads_count = 0
def stopTestRun(self):
run = self.testsRun
@@ -238,15 +241,15 @@
ok = self.wasStrictlySuccessful()
else:
ok = self.wasSuccessful()
- if TestCase._first_thread_leaker_id:
+ if self._first_thread_leaker_id:
self.stream.write(
'%s is leaking threads among %d leaking tests.\n' % (
- TestCase._first_thread_leaker_id,
- TestCase._leaking_threads_tests))
+ self._first_thread_leaker_id,
+ self._tests_leaking_threads_count))
# We don't report the main thread as an active one.
self.stream.write(
'%d non-main threads were left active in the end.\n'
- % (TestCase._active_threads - 1))
+ % (len(self._active_threads) - 1))
def getDescription(self, test):
return test.id()
@@ -283,28 +286,34 @@
super(ExtendedTestResult, self).startTest(test)
if self.count == 0:
self.startTests()
+ self.count += 1
self.report_test_start(test)
test.number = self.count
self._recordTestStartTime()
+ # Only check for thread leaks if the test case supports cleanups
+ addCleanup = getattr(test, "addCleanup", None)
+ if addCleanup is not None:
+ addCleanup(self._check_leaked_threads, test)
def startTests(self):
- import platform
- if getattr(sys, 'frozen', None) is None:
- bzr_path = osutils.realpath(sys.argv[0])
- else:
- bzr_path = sys.executable
- self.stream.write(
- 'bzr selftest: %s\n' % (bzr_path,))
- self.stream.write(
- ' %s\n' % (
- bzrlib.__path__[0],))
- self.stream.write(
- ' bzr-%s python-%s %s\n' % (
- bzrlib.version_string,
- bzrlib._format_version_tuple(sys.version_info),
- platform.platform(aliased=1),
- ))
- self.stream.write('\n')
+ self.report_tests_starting()
+ self._active_threads = threading.enumerate()
+
+ def _check_leaked_threads(self, test):
+ """See if any threads have leaked since last call
+
+ A sample of live threads is stored in the _active_threads attribute,
+ when this method runs it compares the current live threads and any not
+ in the previous sample are treated as having leaked.
+ """
+ now_active_threads = set(threading.enumerate())
+ threads_leaked = now_active_threads.difference(self._active_threads)
+ if threads_leaked:
+ self._report_thread_leak(test, threads_leaked, now_active_threads)
+ self._tests_leaking_threads_count += 1
+ if self._first_thread_leaker_id is None:
+ self._first_thread_leaker_id = test.id()
+ self._active_threads = now_active_threads
def _recordTestStartTime(self):
"""Record that a test has started."""
@@ -401,8 +410,37 @@
else:
raise errors.BzrError("Unknown whence %r" % whence)
- def report_cleaning_up(self):
- pass
+ def report_tests_starting(self):
+ """Display information before the test run begins"""
+ if getattr(sys, 'frozen', None) is None:
+ bzr_path = osutils.realpath(sys.argv[0])
+ else:
+ bzr_path = sys.executable
+ self.stream.write(
+ 'bzr selftest: %s\n' % (bzr_path,))
+ self.stream.write(
+ ' %s\n' % (
+ bzrlib.__path__[0],))
+ self.stream.write(
+ ' bzr-%s python-%s %s\n' % (
+ bzrlib.version_string,
+ bzrlib._format_version_tuple(sys.version_info),
+ platform.platform(aliased=1),
+ ))
+ self.stream.write('\n')
+
+ def report_test_start(self, test):
+ """Display information on the test just about to be run"""
+
+ def _report_thread_leak(self, test, leaked_threads, active_threads):
+ """Display information on a test that leaked one or more threads"""
+ # GZ 2010-09-09: A leak summary reported separately from the general
+ # thread debugging would be nice. Tests under subunit
+ # need something not using stream, perhaps adding a
+ # testtools details object would be fitting.
+ if 'threads' in selftest_debug_flags:
+ self.stream.write('%s is leaking, active is now %d\n' %
+ (test.id(), len(active_threads)))
def startTestRun(self):
self.startTime = time.time()
@@ -481,7 +519,6 @@
return a
def report_test_start(self, test):
- self.count += 1
self.pb.update(
self._progress_prefix_text()
+ ' '
@@ -514,9 +551,6 @@
def report_unsupported(self, test, feature):
"""test cannot be run because feature is missing."""
- def report_cleaning_up(self):
- self.pb.update('Cleaning up')
-
class VerboseTestResult(ExtendedTestResult):
"""Produce long output, with one line per test run plus times"""
@@ -534,7 +568,6 @@
self.stream.write('running %d tests...\n' % self.num_tests)
def report_test_start(self, test):
- self.count += 1
name = self._shortened_test_description(test)
width = osutils.terminal_width()
if width is not None:
@@ -795,9 +828,6 @@
accidentally overlooked.
"""
- _active_threads = None
- _leaking_threads_tests = 0
- _first_thread_leaker_id = None
_log_file = None
# record lsprof data when performing benchmark calls.
_gather_lsprof_in_benchmarks = False
@@ -828,31 +858,12 @@
self._track_transports()
self._track_locks()
self._clear_debug_flags()
- TestCase._active_threads = threading.activeCount()
- self.addCleanup(self._check_leaked_threads)
def debug(self):
# debug a frame up.
import pdb
pdb.Pdb().set_trace(sys._getframe().f_back)
- def _check_leaked_threads(self):
- active = threading.activeCount()
- leaked_threads = active - TestCase._active_threads
- TestCase._active_threads = active
- # If some tests make the number of threads *decrease*, we'll consider
- # that they are just observing old threads dieing, not agressively kill
- # random threads. So we don't report these tests as leaking. The risk
- # is that we have false positives that way (the test see 2 threads
- # going away but leak one) but it seems less likely than the actual
- # false positives (the test see threads going away and does not leak).
- if leaked_threads > 0:
- if 'threads' in selftest_debug_flags:
- print '%s is leaking, active is now %d' % (self.id(), active)
- TestCase._leaking_threads_tests += 1
- if TestCase._first_thread_leaker_id is None:
- TestCase._first_thread_leaker_id = self.id()
-
def _clear_debug_flags(self):
"""Prevent externally set debug flags affecting tests.
=== modified file 'bzrlib/tests/test_selftest.py'
--- a/bzrlib/tests/test_selftest.py 2010-09-02 20:17:03 +0000
+++ b/bzrlib/tests/test_selftest.py 2010-09-09 18:10:38 +0000
@@ -21,6 +21,7 @@
import os
import signal
import sys
+import threading
import time
import unittest
import warnings
@@ -850,8 +851,7 @@
"""A KnownFailure being raised should trigger several result actions."""
class InstrumentedTestResult(tests.ExtendedTestResult):
def stopTestRun(self): pass
- def startTests(self): pass
- def report_test_start(self, test): pass
+ def report_tests_starting(self): pass
def report_known_failure(self, test, err=None, details=None):
self._call = test, 'known failure'
result = InstrumentedTestResult(None, None, None, None)
@@ -907,8 +907,7 @@
"""Test the behaviour of invoking addNotSupported."""
class InstrumentedTestResult(tests.ExtendedTestResult):
def stopTestRun(self): pass
- def startTests(self): pass
- def report_test_start(self, test): pass
+ def report_tests_starting(self): pass
def report_unsupported(self, test, feature):
self._call = test, feature
result = InstrumentedTestResult(None, None, None, None)
@@ -953,8 +952,7 @@
"""An UnavailableFeature being raised should invoke addNotSupported."""
class InstrumentedTestResult(tests.ExtendedTestResult):
def stopTestRun(self): pass
- def startTests(self): pass
- def report_test_start(self, test): pass
+ def report_tests_starting(self): pass
def addNotSupported(self, test, feature):
self._call = test, feature
result = InstrumentedTestResult(None, None, None, None)
@@ -1002,7 +1000,6 @@
class InstrumentedTestResult(tests.ExtendedTestResult):
calls = 0
def startTests(self): self.calls += 1
- def report_test_start(self, test): pass
result = InstrumentedTestResult(None, None, None, None)
def test_function():
pass
@@ -1010,6 +1007,19 @@
test.run(result)
self.assertEquals(1, result.calls)
+ def test_startTests_only_once(self):
+ """With multiple tests startTests should still only be called once"""
+ class InstrumentedTestResult(tests.ExtendedTestResult):
+ calls = 0
+ def startTests(self): self.calls += 1
+ result = InstrumentedTestResult(None, None, None, None)
+ suite = unittest.TestSuite([
+ unittest.FunctionTestCase(lambda: None),
+ unittest.FunctionTestCase(lambda: None)])
+ suite.run(result)
+ self.assertEquals(1, result.calls)
+ self.assertEquals(2, result.count)
+
class TestUnicodeFilenameFeature(tests.TestCase):
@@ -1036,14 +1046,11 @@
because of our use of global state.
"""
old_root = tests.TestCaseInTempDir.TEST_ROOT
- old_leak = tests.TestCase._first_thread_leaker_id
try:
tests.TestCaseInTempDir.TEST_ROOT = None
- tests.TestCase._first_thread_leaker_id = None
return testrunner.run(test)
finally:
tests.TestCaseInTempDir.TEST_ROOT = old_root
- tests.TestCase._first_thread_leaker_id = old_leak
def test_known_failure_failed_run(self):
# run a test that generates a known failure which should be printed in
@@ -2979,6 +2986,94 @@
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
+class TestThreadLeakDetection(tests.TestCase):
+ """Ensure when tests leak threads we detect and report it"""
+
+ class LeakRecordingResult(tests.ExtendedTestResult):
+ def __init__(self):
+ tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
+ self.leaks = []
+ def _report_thread_leak(self, test, leaks, alive):
+ self.leaks.append((test, leaks))
+
+ def test_testcase_without_addCleanups(self):
+ """Check old TestCase instances don't break with leak detection"""
+ class Test(unittest.TestCase):
+ def runTest(self):
+ pass
+ addCleanup = None # for when on Python 2.7 with native addCleanup
+ result = self.LeakRecordingResult()
+ test = Test()
+ self.assertIs(getattr(test, "addCleanup", None), None)
+ result.startTestRun()
+ test.run(result)
+ result.stopTestRun()
+ self.assertEqual(result._tests_leaking_threads_count, 0)
+ self.assertEqual(result.leaks, [])
+
+ def test_thread_leak(self):
+ """Ensure a thread that outlives the running of a test is reported
+
+ Uses a thread that blocks on an event, and is started by the inner
+ test case. As the thread outlives the inner case's run, it should be
+ detected as a leak, but the event is then set so that the thread can
+ be safely joined in cleanup so it's not leaked for real.
+ """
+ event = threading.Event()
+ thread = threading.Thread(name="Leaker", target=event.wait)
+ class Test(tests.TestCase):
+ def test_leak(self):
+ thread.start()
+ result = self.LeakRecordingResult()
+ test = Test("test_leak")
+ self.addCleanup(thread.join)
+ self.addCleanup(event.set)
+ result.startTestRun()
+ test.run(result)
+ result.stopTestRun()
+ self.assertEqual(result._tests_leaking_threads_count, 1)
+ self.assertEqual(result._first_thread_leaker_id, test.id())
+ self.assertEqual(result.leaks, [(test, set([thread]))])
+ self.assertContainsString(result.stream.getvalue(), "leaking threads")
+
+ def test_multiple_leaks(self):
+ """Check multiple leaks are blamed on the test cases at fault
+
+ Same concept as the previous test, but has one inner test method that
+ leaks two threads, and one that doesn't leak at all.
+ """
+ event = threading.Event()
+ thread_a = threading.Thread(name="LeakerA", target=event.wait)
+ thread_b = threading.Thread(name="LeakerB", target=event.wait)
+ thread_c = threading.Thread(name="LeakerC", target=event.wait)
+ class Test(tests.TestCase):
+ def test_first_leak(self):
+ thread_b.start()
+ def test_second_no_leak(self):
+ pass
+ def test_third_leak(self):
+ thread_c.start()
+ thread_a.start()
+ result = self.LeakRecordingResult()
+ first_test = Test("test_first_leak")
+ third_test = Test("test_third_leak")
+ self.addCleanup(thread_a.join)
+ self.addCleanup(thread_b.join)
+ self.addCleanup(thread_c.join)
+ self.addCleanup(event.set)
+ result.startTestRun()
+ unittest.TestSuite(
+ [first_test, Test("test_second_no_leak"), third_test]
+ ).run(result)
+ result.stopTestRun()
+ self.assertEqual(result._tests_leaking_threads_count, 2)
+ self.assertEqual(result._first_thread_leaker_id, first_test.id())
+ self.assertEqual(result.leaks, [
+ (first_test, set([thread_b])),
+ (third_test, set([thread_a, thread_c]))])
+ self.assertContainsString(result.stream.getvalue(), "leaking threads")
+
+
class TestRunSuite(tests.TestCase):
def test_runner_class(self):
More information about the bazaar-commits
mailing list