Rev 4334: Fix 2 more lock-related test failures. in file:///home/vila/src/bzr/experimental/failing-lock-tests/

Vincent Ladeuil v.ladeuil+lp at free.fr
Mon May 11 08:14:59 BST 2009


At file:///home/vila/src/bzr/experimental/failing-lock-tests/

------------------------------------------------------------
revno: 4334
revision-id: v.ladeuil+lp at free.fr-20090511071458-bjghkrcdxbv87193
parent: v.ladeuil+lp at free.fr-20090508163409-83gvsoy3fuy58k2y
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: failing-lock-tests
timestamp: Mon 2009-05-11 09:14:58 +0200
message:
  Fix 2 more lock-related test failures.
  
  * tests/test_lockable_files.py:
  Replace some try/finally with an addCleanup().
  (_TestLockableFiles_mixin.test_leave_in_place): Restore clearing
  lock behavior.
-------------- next part --------------
=== modified file 'bzrlib/tests/test_lockable_files.py'
--- a/bzrlib/tests/test_lockable_files.py	2009-04-04 02:50:01 +0000
+++ b/bzrlib/tests/test_lockable_files.py	2009-05-11 07:14:58 +0000
@@ -56,72 +56,68 @@
             deprecated_in((1, 5, 0)),
             self.lockable.get_utf8, 'foo')
         self.lockable.lock_write()
-        try:
-            unicode_string = u'bar\u1234'
-            self.assertEqual(4, len(unicode_string))
-            byte_string = unicode_string.encode('utf-8')
-            self.assertEqual(6, len(byte_string))
-            self.assertRaises(UnicodeEncodeError,
-                self.applyDeprecated,
-                deprecated_in((1, 6, 0)),
-                self.lockable.put, 'foo',
-                StringIO(unicode_string))
-            self.applyDeprecated(
-                deprecated_in((1, 6, 0)),
-                self.lockable.put,
-                'foo', StringIO(byte_string))
-            byte_stream = self.applyDeprecated(
-                deprecated_in((1, 5, 0)),
-                self.lockable.get,
-                'foo')
-            self.assertEqual(byte_string, byte_stream.read())
-            unicode_stream = self.applyDeprecated(
-                deprecated_in((1, 5, 0)),
-                self.lockable.get_utf8,
-                'foo')
-            self.assertEqual(unicode_string,
-                unicode_stream.read())
-            self.assertRaises(BzrBadParameterNotString,
-                self.applyDeprecated,
-                deprecated_in((1, 6, 0)),
-                self.lockable.put_utf8,
-                'bar',
-                StringIO(unicode_string))
-            self.applyDeprecated(
-                deprecated_in((1, 6, 0)),
-                self.lockable.put_utf8,
-                'bar',
-                unicode_string)
-            unicode_stream = self.applyDeprecated(
-                deprecated_in((1, 5, 0)),
-                self.lockable.get_utf8,
-                'bar')
-            self.assertEqual(unicode_string,
-                unicode_stream.read())
-            byte_stream = self.applyDeprecated(
-                deprecated_in((1, 5, 0)),
-                self.lockable.get,
-                'bar')
-            self.assertEqual(byte_string, byte_stream.read())
-            self.applyDeprecated(
-                deprecated_in((1, 6, 0)),
-                self.lockable.put_bytes,
-                'raw', 'raw\xffbytes')
-            byte_stream = self.applyDeprecated(
-                deprecated_in((1, 5, 0)),
-                self.lockable.get,
-                'raw')
-            self.assertEqual('raw\xffbytes', byte_stream.read())
-        finally:
-            self.lockable.unlock()
+        self.addCleanup(self.lockable.unlock)
+        unicode_string = u'bar\u1234'
+        self.assertEqual(4, len(unicode_string))
+        byte_string = unicode_string.encode('utf-8')
+        self.assertEqual(6, len(byte_string))
+        self.assertRaises(UnicodeEncodeError,
+            self.applyDeprecated,
+            deprecated_in((1, 6, 0)),
+            self.lockable.put, 'foo',
+            StringIO(unicode_string))
+        self.applyDeprecated(
+            deprecated_in((1, 6, 0)),
+            self.lockable.put,
+            'foo', StringIO(byte_string))
+        byte_stream = self.applyDeprecated(
+            deprecated_in((1, 5, 0)),
+            self.lockable.get,
+            'foo')
+        self.assertEqual(byte_string, byte_stream.read())
+        unicode_stream = self.applyDeprecated(
+            deprecated_in((1, 5, 0)),
+            self.lockable.get_utf8,
+            'foo')
+        self.assertEqual(unicode_string,
+            unicode_stream.read())
+        self.assertRaises(BzrBadParameterNotString,
+            self.applyDeprecated,
+            deprecated_in((1, 6, 0)),
+            self.lockable.put_utf8,
+            'bar',
+            StringIO(unicode_string))
+        self.applyDeprecated(
+            deprecated_in((1, 6, 0)),
+            self.lockable.put_utf8,
+            'bar',
+            unicode_string)
+        unicode_stream = self.applyDeprecated(
+            deprecated_in((1, 5, 0)),
+            self.lockable.get_utf8,
+            'bar')
+        self.assertEqual(unicode_string,
+            unicode_stream.read())
+        byte_stream = self.applyDeprecated(
+            deprecated_in((1, 5, 0)),
+            self.lockable.get,
+            'bar')
+        self.assertEqual(byte_string, byte_stream.read())
+        self.applyDeprecated(
+            deprecated_in((1, 6, 0)),
+            self.lockable.put_bytes,
+            'raw', 'raw\xffbytes')
+        byte_stream = self.applyDeprecated(
+            deprecated_in((1, 5, 0)),
+            self.lockable.get,
+            'raw')
+        self.assertEqual('raw\xffbytes', byte_stream.read())
 
     def test_locks(self):
         self.lockable.lock_read()
-        try:
-            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
-                              StringIO('bar\u1234'))
-        finally:
-            self.lockable.unlock()
+        self.addCleanup(self.lockable.unlock)
+        self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
+                          StringIO('bar\u1234'))
 
     def test_transactions(self):
         self.assertIs(self.lockable.get_transaction().__class__,
@@ -176,92 +172,80 @@
 
     def test_lock_write_returns_None_refuses_token(self):
         token = self.lockable.lock_write()
-        try:
-            if token is not None:
-                # This test does not apply, because this lockable supports
-                # tokens.
-                raise TestNotApplicable("%r uses tokens" % (self.lockable,))
-            self.assertRaises(errors.TokenLockingNotSupported,
-                              self.lockable.lock_write, token='token')
-        finally:
-            self.lockable.unlock()
+        self.addCleanup(self.lockable.unlock)
+        if token is not None:
+            # This test does not apply, because this lockable supports
+            # tokens.
+            raise TestNotApplicable("%r uses tokens" % (self.lockable,))
+        self.assertRaises(errors.TokenLockingNotSupported,
+                          self.lockable.lock_write, token='token')
 
     def test_lock_write_returns_token_when_given_token(self):
         token = self.lockable.lock_write()
-        try:
-            if token is None:
-                # This test does not apply, because this lockable refuses
-                # tokens.
-                return
-            new_lockable = self.get_lockable()
-            token_from_new_lockable = new_lockable.lock_write(token=token)
-            try:
-                self.assertEqual(token, token_from_new_lockable)
-            finally:
-                new_lockable.unlock()
-        finally:
-            self.lockable.unlock()
+        self.addCleanup(self.lockable.unlock)
+        if token is None:
+            # This test does not apply, because this lockable refuses
+            # tokens.
+            return
+        new_lockable = self.get_lockable()
+        token_from_new_lockable = new_lockable.lock_write(token=token)
+        self.addCleanup(new_lockable.unlock)
+        self.assertEqual(token, token_from_new_lockable)
 
     def test_lock_write_raises_on_token_mismatch(self):
         token = self.lockable.lock_write()
-        try:
-            if token is None:
-                # This test does not apply, because this lockable refuses
-                # tokens.
-                return
-            different_token = token + 'xxx'
-            # Re-using the same lockable instance with a different token will
-            # raise TokenMismatch.
-            self.assertRaises(errors.TokenMismatch,
-                              self.lockable.lock_write, token=different_token)
-            # A separate instance for the same lockable will also raise
-            # TokenMismatch.
-            # This detects the case where a caller claims to have a lock (via
-            # the token) for an external resource, but doesn't (the token is
-            # different).  Clients need a separate lock object to make sure the
-            # external resource is probed, whereas the existing lock object
-            # might cache.
-            new_lockable = self.get_lockable()
-            self.assertRaises(errors.TokenMismatch,
-                              new_lockable.lock_write, token=different_token)
-        finally:
-            self.lockable.unlock()
+        self.addCleanup(self.lockable.unlock)
+        if token is None:
+            # This test does not apply, because this lockable refuses
+            # tokens.
+            return
+        different_token = token + 'xxx'
+        # Re-using the same lockable instance with a different token will
+        # raise TokenMismatch.
+        self.assertRaises(errors.TokenMismatch,
+                          self.lockable.lock_write, token=different_token)
+        # A separate instance for the same lockable will also raise
+        # TokenMismatch.
+        # This detects the case where a caller claims to have a lock (via
+        # the token) for an external resource, but doesn't (the token is
+        # different).  Clients need a separate lock object to make sure the
+        # external resource is probed, whereas the existing lock object
+        # might cache.
+        new_lockable = self.get_lockable()
+        self.assertRaises(errors.TokenMismatch,
+                          new_lockable.lock_write, token=different_token)
 
     def test_lock_write_with_matching_token(self):
         # If the token matches, so no exception is raised by lock_write.
         token = self.lockable.lock_write()
-        try:
-            if token is None:
-                # This test does not apply, because this lockable refuses
-                # tokens.
-                return
-            # The same instance will accept a second lock_write if the specified
-            # token matches.
-            self.lockable.lock_write(token=token)
-            self.lockable.unlock()
-            # Calling lock_write on a new instance for the same lockable will
-            # also succeed.
-            new_lockable = self.get_lockable()
-            new_lockable.lock_write(token=token)
-            new_lockable.unlock()
-        finally:
-            self.lockable.unlock()
+        self.addCleanup(self.lockable.unlock)
+        if token is None:
+            # This test does not apply, because this lockable refuses
+            # tokens.
+            return
+        # The same instance will accept a second lock_write if the specified
+        # token matches.
+        self.lockable.lock_write(token=token)
+        self.lockable.unlock()
+        # Calling lock_write on a new instance for the same lockable will
+        # also succeed.
+        new_lockable = self.get_lockable()
+        new_lockable.lock_write(token=token)
+        new_lockable.unlock()
 
     def test_unlock_after_lock_write_with_token(self):
         # If lock_write did not physically acquire the lock (because it was
         # passed a token), then unlock should not physically release it.
         token = self.lockable.lock_write()
-        try:
-            if token is None:
-                # This test does not apply, because this lockable refuses
-                # tokens.
-                return
-            new_lockable = self.get_lockable()
-            new_lockable.lock_write(token=token)
-            new_lockable.unlock()
-            self.assertTrue(self.lockable.get_physical_lock_status())
-        finally:
-            self.lockable.unlock()
+        self.addCleanup(self.lockable.unlock)
+        if token is None:
+            # This test does not apply, because this lockable refuses
+            # tokens.
+            return
+        new_lockable = self.get_lockable()
+        new_lockable.lock_write(token=token)
+        new_lockable.unlock()
+        self.assertTrue(self.lockable.get_physical_lock_status())
 
     def test_lock_write_with_token_fails_when_unlocked(self):
         # Lock and unlock to get a superficially valid token.  This mimics a
@@ -332,6 +316,11 @@
         # But should be relockable with a token.
         self.lockable.lock_write(token=token)
         self.lockable.unlock()
+        # Cleanup: the lock can still be obtained with the token, so re-acquire
+        # it and switch back to clearing the lock when unlocking.
+        self.lockable.lock_write(token=token)
+        self.lockable.dont_leave_in_place()
+        self.lockable.unlock()
 
     def test_dont_leave_in_place(self):
         token = self.lockable.lock_write()



More information about the bazaar-commits mailing list