Rev 4334: Fix 2 more lock-related test failures. in file:///home/vila/src/bzr/experimental/failing-lock-tests/
Vincent Ladeuil
v.ladeuil+lp at free.fr
Mon May 11 08:14:59 BST 2009
At file:///home/vila/src/bzr/experimental/failing-lock-tests/
------------------------------------------------------------
revno: 4334
revision-id: v.ladeuil+lp at free.fr-20090511071458-bjghkrcdxbv87193
parent: v.ladeuil+lp at free.fr-20090508163409-83gvsoy3fuy58k2y
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: failing-lock-tests
timestamp: Mon 2009-05-11 09:14:58 +0200
message:
Fix 2 more lock-related test failures.
* tests/test_lockable_files.py:
Replace some try/finally with an addCleanup().
(_TestLockableFiles_mixin.test_leave_in_place): Restore the
lock-clearing behavior on unlock.
-------------- next part --------------
=== modified file 'bzrlib/tests/test_lockable_files.py'
--- a/bzrlib/tests/test_lockable_files.py 2009-04-04 02:50:01 +0000
+++ b/bzrlib/tests/test_lockable_files.py 2009-05-11 07:14:58 +0000
@@ -56,72 +56,68 @@
deprecated_in((1, 5, 0)),
self.lockable.get_utf8, 'foo')
self.lockable.lock_write()
- try:
- unicode_string = u'bar\u1234'
- self.assertEqual(4, len(unicode_string))
- byte_string = unicode_string.encode('utf-8')
- self.assertEqual(6, len(byte_string))
- self.assertRaises(UnicodeEncodeError,
- self.applyDeprecated,
- deprecated_in((1, 6, 0)),
- self.lockable.put, 'foo',
- StringIO(unicode_string))
- self.applyDeprecated(
- deprecated_in((1, 6, 0)),
- self.lockable.put,
- 'foo', StringIO(byte_string))
- byte_stream = self.applyDeprecated(
- deprecated_in((1, 5, 0)),
- self.lockable.get,
- 'foo')
- self.assertEqual(byte_string, byte_stream.read())
- unicode_stream = self.applyDeprecated(
- deprecated_in((1, 5, 0)),
- self.lockable.get_utf8,
- 'foo')
- self.assertEqual(unicode_string,
- unicode_stream.read())
- self.assertRaises(BzrBadParameterNotString,
- self.applyDeprecated,
- deprecated_in((1, 6, 0)),
- self.lockable.put_utf8,
- 'bar',
- StringIO(unicode_string))
- self.applyDeprecated(
- deprecated_in((1, 6, 0)),
- self.lockable.put_utf8,
- 'bar',
- unicode_string)
- unicode_stream = self.applyDeprecated(
- deprecated_in((1, 5, 0)),
- self.lockable.get_utf8,
- 'bar')
- self.assertEqual(unicode_string,
- unicode_stream.read())
- byte_stream = self.applyDeprecated(
- deprecated_in((1, 5, 0)),
- self.lockable.get,
- 'bar')
- self.assertEqual(byte_string, byte_stream.read())
- self.applyDeprecated(
- deprecated_in((1, 6, 0)),
- self.lockable.put_bytes,
- 'raw', 'raw\xffbytes')
- byte_stream = self.applyDeprecated(
- deprecated_in((1, 5, 0)),
- self.lockable.get,
- 'raw')
- self.assertEqual('raw\xffbytes', byte_stream.read())
- finally:
- self.lockable.unlock()
+ self.addCleanup(self.lockable.unlock)
+ unicode_string = u'bar\u1234'
+ self.assertEqual(4, len(unicode_string))
+ byte_string = unicode_string.encode('utf-8')
+ self.assertEqual(6, len(byte_string))
+ self.assertRaises(UnicodeEncodeError,
+ self.applyDeprecated,
+ deprecated_in((1, 6, 0)),
+ self.lockable.put, 'foo',
+ StringIO(unicode_string))
+ self.applyDeprecated(
+ deprecated_in((1, 6, 0)),
+ self.lockable.put,
+ 'foo', StringIO(byte_string))
+ byte_stream = self.applyDeprecated(
+ deprecated_in((1, 5, 0)),
+ self.lockable.get,
+ 'foo')
+ self.assertEqual(byte_string, byte_stream.read())
+ unicode_stream = self.applyDeprecated(
+ deprecated_in((1, 5, 0)),
+ self.lockable.get_utf8,
+ 'foo')
+ self.assertEqual(unicode_string,
+ unicode_stream.read())
+ self.assertRaises(BzrBadParameterNotString,
+ self.applyDeprecated,
+ deprecated_in((1, 6, 0)),
+ self.lockable.put_utf8,
+ 'bar',
+ StringIO(unicode_string))
+ self.applyDeprecated(
+ deprecated_in((1, 6, 0)),
+ self.lockable.put_utf8,
+ 'bar',
+ unicode_string)
+ unicode_stream = self.applyDeprecated(
+ deprecated_in((1, 5, 0)),
+ self.lockable.get_utf8,
+ 'bar')
+ self.assertEqual(unicode_string,
+ unicode_stream.read())
+ byte_stream = self.applyDeprecated(
+ deprecated_in((1, 5, 0)),
+ self.lockable.get,
+ 'bar')
+ self.assertEqual(byte_string, byte_stream.read())
+ self.applyDeprecated(
+ deprecated_in((1, 6, 0)),
+ self.lockable.put_bytes,
+ 'raw', 'raw\xffbytes')
+ byte_stream = self.applyDeprecated(
+ deprecated_in((1, 5, 0)),
+ self.lockable.get,
+ 'raw')
+ self.assertEqual('raw\xffbytes', byte_stream.read())
def test_locks(self):
self.lockable.lock_read()
- try:
- self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
- StringIO('bar\u1234'))
- finally:
- self.lockable.unlock()
+ self.addCleanup(self.lockable.unlock)
+ self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
+ StringIO('bar\u1234'))
def test_transactions(self):
self.assertIs(self.lockable.get_transaction().__class__,
@@ -176,92 +172,80 @@
def test_lock_write_returns_None_refuses_token(self):
token = self.lockable.lock_write()
- try:
- if token is not None:
- # This test does not apply, because this lockable supports
- # tokens.
- raise TestNotApplicable("%r uses tokens" % (self.lockable,))
- self.assertRaises(errors.TokenLockingNotSupported,
- self.lockable.lock_write, token='token')
- finally:
- self.lockable.unlock()
+ self.addCleanup(self.lockable.unlock)
+ if token is not None:
+ # This test does not apply, because this lockable supports
+ # tokens.
+ raise TestNotApplicable("%r uses tokens" % (self.lockable,))
+ self.assertRaises(errors.TokenLockingNotSupported,
+ self.lockable.lock_write, token='token')
def test_lock_write_returns_token_when_given_token(self):
token = self.lockable.lock_write()
- try:
- if token is None:
- # This test does not apply, because this lockable refuses
- # tokens.
- return
- new_lockable = self.get_lockable()
- token_from_new_lockable = new_lockable.lock_write(token=token)
- try:
- self.assertEqual(token, token_from_new_lockable)
- finally:
- new_lockable.unlock()
- finally:
- self.lockable.unlock()
+ self.addCleanup(self.lockable.unlock)
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ new_lockable = self.get_lockable()
+ token_from_new_lockable = new_lockable.lock_write(token=token)
+ self.addCleanup(new_lockable.unlock)
+ self.assertEqual(token, token_from_new_lockable)
def test_lock_write_raises_on_token_mismatch(self):
token = self.lockable.lock_write()
- try:
- if token is None:
- # This test does not apply, because this lockable refuses
- # tokens.
- return
- different_token = token + 'xxx'
- # Re-using the same lockable instance with a different token will
- # raise TokenMismatch.
- self.assertRaises(errors.TokenMismatch,
- self.lockable.lock_write, token=different_token)
- # A separate instance for the same lockable will also raise
- # TokenMismatch.
- # This detects the case where a caller claims to have a lock (via
- # the token) for an external resource, but doesn't (the token is
- # different). Clients need a separate lock object to make sure the
- # external resource is probed, whereas the existing lock object
- # might cache.
- new_lockable = self.get_lockable()
- self.assertRaises(errors.TokenMismatch,
- new_lockable.lock_write, token=different_token)
- finally:
- self.lockable.unlock()
+ self.addCleanup(self.lockable.unlock)
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ different_token = token + 'xxx'
+ # Re-using the same lockable instance with a different token will
+ # raise TokenMismatch.
+ self.assertRaises(errors.TokenMismatch,
+ self.lockable.lock_write, token=different_token)
+ # A separate instance for the same lockable will also raise
+ # TokenMismatch.
+ # This detects the case where a caller claims to have a lock (via
+ # the token) for an external resource, but doesn't (the token is
+ # different). Clients need a separate lock object to make sure the
+ # external resource is probed, whereas the existing lock object
+ # might cache.
+ new_lockable = self.get_lockable()
+ self.assertRaises(errors.TokenMismatch,
+ new_lockable.lock_write, token=different_token)
def test_lock_write_with_matching_token(self):
# If the token matches, so no exception is raised by lock_write.
token = self.lockable.lock_write()
- try:
- if token is None:
- # This test does not apply, because this lockable refuses
- # tokens.
- return
- # The same instance will accept a second lock_write if the specified
- # token matches.
- self.lockable.lock_write(token=token)
- self.lockable.unlock()
- # Calling lock_write on a new instance for the same lockable will
- # also succeed.
- new_lockable = self.get_lockable()
- new_lockable.lock_write(token=token)
- new_lockable.unlock()
- finally:
- self.lockable.unlock()
+ self.addCleanup(self.lockable.unlock)
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ # The same instance will accept a second lock_write if the specified
+ # token matches.
+ self.lockable.lock_write(token=token)
+ self.lockable.unlock()
+ # Calling lock_write on a new instance for the same lockable will
+ # also succeed.
+ new_lockable = self.get_lockable()
+ new_lockable.lock_write(token=token)
+ new_lockable.unlock()
def test_unlock_after_lock_write_with_token(self):
# If lock_write did not physically acquire the lock (because it was
# passed a token), then unlock should not physically release it.
token = self.lockable.lock_write()
- try:
- if token is None:
- # This test does not apply, because this lockable refuses
- # tokens.
- return
- new_lockable = self.get_lockable()
- new_lockable.lock_write(token=token)
- new_lockable.unlock()
- self.assertTrue(self.lockable.get_physical_lock_status())
- finally:
- self.lockable.unlock()
+ self.addCleanup(self.lockable.unlock)
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ new_lockable = self.get_lockable()
+ new_lockable.lock_write(token=token)
+ new_lockable.unlock()
+ self.assertTrue(self.lockable.get_physical_lock_status())
def test_lock_write_with_token_fails_when_unlocked(self):
# Lock and unlock to get a superficially valid token. This mimics a
@@ -332,6 +316,11 @@
# But should be relockable with a token.
self.lockable.lock_write(token=token)
self.lockable.unlock()
+ # Cleanup: we should still be able to get the lock, but we restore the
+ # behavior to clearing the lock when unlocking.
+ self.lockable.lock_write(token=token)
+ self.lockable.dont_leave_in_place()
+ self.lockable.unlock()
def test_dont_leave_in_place(self):
token = self.lockable.lock_write()
More information about the bazaar-commits
mailing list