Rev 6017: (mbp) fix a todo about test_transport using addCleanup, in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Sat Jul 9 21:43:56 UTC 2011


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 6017 [merge]
revision-id: pqm at pqm.ubuntu.com-20110709214353-wwne4f7832x0722w
parent: pqm at pqm.ubuntu.com-20110708235815-518j7ziokxp4vbr1
parent: mbp at canonical.com-20110705025200-qu1r4t85k0z92wna
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Sat 2011-07-09 21:43:53 +0000
message:
  (mbp) fix a todo about test_transport using addCleanup,
   and also some pep8 nits (Martin Pool)
modified:
  bzrlib/tests/test_transport.py testtransport.py-20050718175618-e5cdb99f4555ddce
=== modified file 'bzrlib/tests/test_transport.py'
--- a/bzrlib/tests/test_transport.py	2011-05-13 12:51:05 +0000
+++ b/bzrlib/tests/test_transport.py	2011-07-05 02:52:00 +0000
@@ -48,78 +48,67 @@
 class TestTransport(tests.TestCase):
     """Test the non transport-concrete class functionality."""
 
-    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
-    # of try/finally -- vila 20100205
-
     def test__get_set_protocol_handlers(self):
         handlers = transport._get_protocol_handlers()
-        self.assertNotEqual([], handlers.keys( ))
-        try:
-            transport._clear_protocol_handlers()
-            self.assertEqual([], transport._get_protocol_handlers().keys())
-        finally:
-            transport._set_protocol_handlers(handlers)
+        self.assertNotEqual([], handlers.keys())
+        transport._clear_protocol_handlers()
+        self.addCleanup(transport._set_protocol_handlers, handlers)
+        self.assertEqual([], transport._get_protocol_handlers().keys())
 
     def test_get_transport_modules(self):
         handlers = transport._get_protocol_handlers()
+        self.addCleanup(transport._set_protocol_handlers, handlers)
         # don't pollute the current handlers
         transport._clear_protocol_handlers()
+
         class SampleHandler(object):
             """I exist, isnt that enough?"""
-        try:
-            transport._clear_protocol_handlers()
-            transport.register_transport_proto('foo')
-            transport.register_lazy_transport('foo',
-                                              'bzrlib.tests.test_transport',
-                                              'TestTransport.SampleHandler')
-            transport.register_transport_proto('bar')
-            transport.register_lazy_transport('bar',
-                                              'bzrlib.tests.test_transport',
-                                              'TestTransport.SampleHandler')
-            self.assertEqual([SampleHandler.__module__,
-                              'bzrlib.transport.chroot',
-                              'bzrlib.transport.pathfilter'],
-                             transport._get_transport_modules())
-        finally:
-            transport._set_protocol_handlers(handlers)
+        transport._clear_protocol_handlers()
+        transport.register_transport_proto('foo')
+        transport.register_lazy_transport('foo',
+                                            'bzrlib.tests.test_transport',
+                                            'TestTransport.SampleHandler')
+        transport.register_transport_proto('bar')
+        transport.register_lazy_transport('bar',
+                                            'bzrlib.tests.test_transport',
+                                            'TestTransport.SampleHandler')
+        self.assertEqual([SampleHandler.__module__,
+                            'bzrlib.transport.chroot',
+                            'bzrlib.transport.pathfilter'],
+                            transport._get_transport_modules())
 
     def test_transport_dependency(self):
         """Transport with missing dependency causes no error"""
         saved_handlers = transport._get_protocol_handlers()
+        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
         # don't pollute the current handlers
         transport._clear_protocol_handlers()
+        transport.register_transport_proto('foo')
+        transport.register_lazy_transport(
+            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
         try:
-            transport.register_transport_proto('foo')
-            transport.register_lazy_transport(
-                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
-            try:
-                transport.get_transport('foo://fooserver/foo')
-            except errors.UnsupportedProtocol, e:
-                e_str = str(e)
-                self.assertEquals('Unsupported protocol'
-                                  ' for url "foo://fooserver/foo":'
-                                  ' Unable to import library "some_lib":'
-                                  ' testing missing dependency', str(e))
-            else:
-                self.fail('Did not raise UnsupportedProtocol')
-        finally:
-            # restore original values
-            transport._set_protocol_handlers(saved_handlers)
+            transport.get_transport('foo://fooserver/foo')
+        except errors.UnsupportedProtocol, e:
+            e_str = str(e)
+            self.assertEquals('Unsupported protocol'
+                                ' for url "foo://fooserver/foo":'
+                                ' Unable to import library "some_lib":'
+                                ' testing missing dependency', str(e))
+        else:
+            self.fail('Did not raise UnsupportedProtocol')
 
     def test_transport_fallback(self):
         """Transport with missing dependency causes no error"""
         saved_handlers = transport._get_protocol_handlers()
-        try:
-            transport._clear_protocol_handlers()
-            transport.register_transport_proto('foo')
-            transport.register_lazy_transport(
-                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
-            transport.register_lazy_transport(
-                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
-            t = transport.get_transport('foo://fooserver/foo')
-            self.assertTrue(isinstance(t, BackupTransportHandler))
-        finally:
-            transport._set_protocol_handlers(saved_handlers)
+        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
+        transport._clear_protocol_handlers()
+        transport.register_transport_proto('foo')
+        transport.register_lazy_transport(
+            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
+        transport.register_lazy_transport(
+            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
+        t = transport.get_transport('foo://fooserver/foo')
+        self.assertTrue(isinstance(t, BackupTransportHandler))
 
     def test_ssh_hints(self):
         """Transport ssh:// should raise an error pointing out bzr+ssh://"""
@@ -218,33 +207,32 @@
 
     def test_coalesce_fudge(self):
         self.check([(10, 30, [(0, 10), (20, 10)]),
-                    (100, 10, [(0, 10),]),
+                    (100, 10, [(0, 10)]),
                    ], [(10, 10), (30, 10), (100, 10)],
-                   fudge=10
-                  )
+                   fudge=10)
+
     def test_coalesce_max_size(self):
         self.check([(10, 20, [(0, 10), (10, 10)]),
                     (30, 50, [(0, 50)]),
                     # If one range is above max_size, it gets its own coalesced
                     # offset
-                    (100, 80, [(0, 80),]),],
+                    (100, 80, [(0, 80)]),],
                    [(10, 10), (20, 10), (30, 50), (100, 80)],
-                   max_size=50
-                  )
+                   max_size=50)
 
     def test_coalesce_no_max_size(self):
-        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
+        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
                    [(10, 10), (20, 10), (30, 50), (80, 100)],
                   )
 
     def test_coalesce_default_limit(self):
         # By default we use a 100MB max size.
-        ten_mb = 10*1024*1024
-        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
+        ten_mb = 10 * 1024 * 1024
+        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
                     (10*ten_mb, ten_mb, [(0, ten_mb)])],
                    [(i*ten_mb, ten_mb) for i in range(11)])
-        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
-                   [(i*ten_mb, ten_mb) for i in range(11)],
+        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
+                   [(i * ten_mb, ten_mb) for i in range(11)],
                    max_size=1*1024*1024*1024)
 
 
@@ -422,7 +410,8 @@
             parent_url = urlutils.join(url, '..')
             new_t = transport.get_transport(parent_url)
         """
-        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
+        server = chroot.ChrootServer(
+            transport.get_transport('memory:///path/'))
         self.start_server(server)
         t = transport.get_transport(server.get_url())
         self.assertRaises(
@@ -440,11 +429,9 @@
         backing_transport = memory.MemoryTransport()
         server = chroot.ChrootServer(backing_transport)
         server.start_server()
-        try:
-            self.assertTrue(server.scheme
-                            in transport._get_protocol_handlers().keys())
-        finally:
-            server.stop_server()
+        self.addCleanup(server.stop_server)
+        self.assertTrue(server.scheme
+                        in transport._get_protocol_handlers().keys())
 
     def test_stop_server(self):
         backing_transport = memory.MemoryTransport()
@@ -458,10 +445,8 @@
         backing_transport = memory.MemoryTransport()
         server = chroot.ChrootServer(backing_transport)
         server.start_server()
-        try:
-            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
-        finally:
-            server.stop_server()
+        self.addCleanup(server.stop_server)
+        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
 
 class PathFilteringDecoratorTransportTest(tests.TestCase):
@@ -482,7 +467,7 @@
 
     def make_pf_transport(self, filter_func=None):
         """Make a PathFilteringTransport backed by a MemoryTransport.
-        
+
         :param filter_func: by default this will be a no-op function.  Use this
             parameter to override it."""
         if filter_func is None:
@@ -510,6 +495,7 @@
 
     def test_filter_invocation(self):
         filter_log = []
+
         def filter(path):
             filter_log.append(path)
             return path
@@ -776,7 +762,8 @@
     def test_relpath(self):
         t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
-        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
+        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
+            'sub')
         self.assertRaises(errors.PathNotChild, t.relpath,
                           'http://user@host.com/abs/path/sub')
         self.assertRaises(errors.PathNotChild, t.relpath,
@@ -855,7 +842,8 @@
 
     def test_get(self):
         t = transport.get_transport('trace+memory://')
-        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
+        self.assertIsInstance(
+            t, bzrlib.transport.trace.TransportTraceDecorator)
 
     def test_clone_preserves_activity(self):
         t = transport.get_transport('trace+memory://')
@@ -896,7 +884,7 @@
 class TestSSHConnections(tests.TestCaseWithTransport):
 
     def test_bzr_connect_to_bzr_ssh(self):
-        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
+        """get_transport of a bzr+ssh:// behaves correctly.
 
         bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
         """
@@ -918,6 +906,7 @@
         # SSH channel ourselves.  Surely this has already been implemented
         # elsewhere?
         started = []
+
         class StubSSHServer(stub_sftp.StubServer):
 
             test = self
@@ -929,8 +918,8 @@
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
                 # XXX: horribly inefficient, not to mention ugly.
-                # Start a thread for each of stdin/out/err, and relay bytes from
-                # the subprocess to channel and vice versa.
+                # Start a thread for each of stdin/out/err, and relay bytes
+                # from the subprocess to channel and vice versa.
                 def ferry_bytes(read, write, close):
                     while True:
                         bytes = read(1)




More information about the bazaar-commits mailing list