Rev 6017: (mbp) fix a todo about test_transport using addCleanup, in file:///home/pqm/archives/thelove/bzr/%2Btrunk/
Canonical.com Patch Queue Manager
pqm at pqm.ubuntu.com
Sat Jul 9 21:43:56 UTC 2011
At file:///home/pqm/archives/thelove/bzr/%2Btrunk/
------------------------------------------------------------
revno: 6017 [merge]
revision-id: pqm at pqm.ubuntu.com-20110709214353-wwne4f7832x0722w
parent: pqm at pqm.ubuntu.com-20110708235815-518j7ziokxp4vbr1
parent: mbp at canonical.com-20110705025200-qu1r4t85k0z92wna
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Sat 2011-07-09 21:43:53 +0000
message:
(mbp) fix a todo about test_transport using addCleanup,
and also some pep8 nits (Martin Pool)
modified:
bzrlib/tests/test_transport.py testtransport.py-20050718175618-e5cdb99f4555ddce
=== modified file 'bzrlib/tests/test_transport.py'
--- a/bzrlib/tests/test_transport.py 2011-05-13 12:51:05 +0000
+++ b/bzrlib/tests/test_transport.py 2011-07-05 02:52:00 +0000
@@ -48,78 +48,67 @@
class TestTransport(tests.TestCase):
"""Test the non transport-concrete class functionality."""
- # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
- # of try/finally -- vila 20100205
-
def test__get_set_protocol_handlers(self):
handlers = transport._get_protocol_handlers()
- self.assertNotEqual([], handlers.keys( ))
- try:
- transport._clear_protocol_handlers()
- self.assertEqual([], transport._get_protocol_handlers().keys())
- finally:
- transport._set_protocol_handlers(handlers)
+ self.assertNotEqual([], handlers.keys())
+ transport._clear_protocol_handlers()
+ self.addCleanup(transport._set_protocol_handlers, handlers)
+ self.assertEqual([], transport._get_protocol_handlers().keys())
def test_get_transport_modules(self):
handlers = transport._get_protocol_handlers()
+ self.addCleanup(transport._set_protocol_handlers, handlers)
# don't pollute the current handlers
transport._clear_protocol_handlers()
+
class SampleHandler(object):
"""I exist, isnt that enough?"""
- try:
- transport._clear_protocol_handlers()
- transport.register_transport_proto('foo')
- transport.register_lazy_transport('foo',
- 'bzrlib.tests.test_transport',
- 'TestTransport.SampleHandler')
- transport.register_transport_proto('bar')
- transport.register_lazy_transport('bar',
- 'bzrlib.tests.test_transport',
- 'TestTransport.SampleHandler')
- self.assertEqual([SampleHandler.__module__,
- 'bzrlib.transport.chroot',
- 'bzrlib.transport.pathfilter'],
- transport._get_transport_modules())
- finally:
- transport._set_protocol_handlers(handlers)
+ transport._clear_protocol_handlers()
+ transport.register_transport_proto('foo')
+ transport.register_lazy_transport('foo',
+ 'bzrlib.tests.test_transport',
+ 'TestTransport.SampleHandler')
+ transport.register_transport_proto('bar')
+ transport.register_lazy_transport('bar',
+ 'bzrlib.tests.test_transport',
+ 'TestTransport.SampleHandler')
+ self.assertEqual([SampleHandler.__module__,
+ 'bzrlib.transport.chroot',
+ 'bzrlib.transport.pathfilter'],
+ transport._get_transport_modules())
def test_transport_dependency(self):
"""Transport with missing dependency causes no error"""
saved_handlers = transport._get_protocol_handlers()
+ self.addCleanup(transport._set_protocol_handlers, saved_handlers)
# don't pollute the current handlers
transport._clear_protocol_handlers()
+ transport.register_transport_proto('foo')
+ transport.register_lazy_transport(
+ 'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
try:
- transport.register_transport_proto('foo')
- transport.register_lazy_transport(
- 'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
- try:
- transport.get_transport('foo://fooserver/foo')
- except errors.UnsupportedProtocol, e:
- e_str = str(e)
- self.assertEquals('Unsupported protocol'
- ' for url "foo://fooserver/foo":'
- ' Unable to import library "some_lib":'
- ' testing missing dependency', str(e))
- else:
- self.fail('Did not raise UnsupportedProtocol')
- finally:
- # restore original values
- transport._set_protocol_handlers(saved_handlers)
+ transport.get_transport('foo://fooserver/foo')
+ except errors.UnsupportedProtocol, e:
+ e_str = str(e)
+ self.assertEquals('Unsupported protocol'
+ ' for url "foo://fooserver/foo":'
+ ' Unable to import library "some_lib":'
+ ' testing missing dependency', str(e))
+ else:
+ self.fail('Did not raise UnsupportedProtocol')
def test_transport_fallback(self):
"""Transport with missing dependency causes no error"""
saved_handlers = transport._get_protocol_handlers()
- try:
- transport._clear_protocol_handlers()
- transport.register_transport_proto('foo')
- transport.register_lazy_transport(
- 'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
- transport.register_lazy_transport(
- 'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
- t = transport.get_transport('foo://fooserver/foo')
- self.assertTrue(isinstance(t, BackupTransportHandler))
- finally:
- transport._set_protocol_handlers(saved_handlers)
+ self.addCleanup(transport._set_protocol_handlers, saved_handlers)
+ transport._clear_protocol_handlers()
+ transport.register_transport_proto('foo')
+ transport.register_lazy_transport(
+ 'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
+ transport.register_lazy_transport(
+ 'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
+ t = transport.get_transport('foo://fooserver/foo')
+ self.assertTrue(isinstance(t, BackupTransportHandler))
def test_ssh_hints(self):
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
@@ -218,33 +207,32 @@
def test_coalesce_fudge(self):
self.check([(10, 30, [(0, 10), (20, 10)]),
- (100, 10, [(0, 10),]),
+ (100, 10, [(0, 10)]),
], [(10, 10), (30, 10), (100, 10)],
- fudge=10
- )
+ fudge=10)
+
def test_coalesce_max_size(self):
self.check([(10, 20, [(0, 10), (10, 10)]),
(30, 50, [(0, 50)]),
# If one range is above max_size, it gets its own coalesced
# offset
- (100, 80, [(0, 80),]),],
+ (100, 80, [(0, 80)]),],
[(10, 10), (20, 10), (30, 50), (100, 80)],
- max_size=50
- )
+ max_size=50)
def test_coalesce_no_max_size(self):
- self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
+ self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
[(10, 10), (20, 10), (30, 50), (80, 100)],
)
def test_coalesce_default_limit(self):
# By default we use a 100MB max size.
- ten_mb = 10*1024*1024
- self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
+ ten_mb = 10 * 1024 * 1024
+ self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
(10*ten_mb, ten_mb, [(0, ten_mb)])],
[(i*ten_mb, ten_mb) for i in range(11)])
- self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
- [(i*ten_mb, ten_mb) for i in range(11)],
+ self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
+ [(i * ten_mb, ten_mb) for i in range(11)],
max_size=1*1024*1024*1024)
@@ -422,7 +410,8 @@
parent_url = urlutils.join(url, '..')
new_t = transport.get_transport(parent_url)
"""
- server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
+ server = chroot.ChrootServer(
+ transport.get_transport('memory:///path/'))
self.start_server(server)
t = transport.get_transport(server.get_url())
self.assertRaises(
@@ -440,11 +429,9 @@
backing_transport = memory.MemoryTransport()
server = chroot.ChrootServer(backing_transport)
server.start_server()
- try:
- self.assertTrue(server.scheme
- in transport._get_protocol_handlers().keys())
- finally:
- server.stop_server()
+ self.addCleanup(server.stop_server)
+ self.assertTrue(server.scheme
+ in transport._get_protocol_handlers().keys())
def test_stop_server(self):
backing_transport = memory.MemoryTransport()
@@ -458,10 +445,8 @@
backing_transport = memory.MemoryTransport()
server = chroot.ChrootServer(backing_transport)
server.start_server()
- try:
- self.assertEqual('chroot-%d:///' % id(server), server.get_url())
- finally:
- server.stop_server()
+ self.addCleanup(server.stop_server)
+ self.assertEqual('chroot-%d:///' % id(server), server.get_url())
class PathFilteringDecoratorTransportTest(tests.TestCase):
@@ -482,7 +467,7 @@
def make_pf_transport(self, filter_func=None):
"""Make a PathFilteringTransport backed by a MemoryTransport.
-
+
:param filter_func: by default this will be a no-op function. Use this
parameter to override it."""
if filter_func is None:
@@ -510,6 +495,7 @@
def test_filter_invocation(self):
filter_log = []
+
def filter(path):
filter_log.append(path)
return path
@@ -776,7 +762,8 @@
def test_relpath(self):
t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
- self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
+ self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
+ 'sub')
self.assertRaises(errors.PathNotChild, t.relpath,
'http://user@host.com/abs/path/sub')
self.assertRaises(errors.PathNotChild, t.relpath,
@@ -855,7 +842,8 @@
def test_get(self):
t = transport.get_transport('trace+memory://')
- self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
+ self.assertIsInstance(
+ t, bzrlib.transport.trace.TransportTraceDecorator)
def test_clone_preserves_activity(self):
t = transport.get_transport('trace+memory://')
@@ -896,7 +884,7 @@
class TestSSHConnections(tests.TestCaseWithTransport):
def test_bzr_connect_to_bzr_ssh(self):
- """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
+ """get_transport of a bzr+ssh:// behaves correctly.
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
"""
@@ -918,6 +906,7 @@
# SSH channel ourselves. Surely this has already been implemented
# elsewhere?
started = []
+
class StubSSHServer(stub_sftp.StubServer):
test = self
@@ -929,8 +918,8 @@
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# XXX: horribly inefficient, not to mention ugly.
- # Start a thread for each of stdin/out/err, and relay bytes from
- # the subprocess to channel and vice versa.
+ # Start a thread for each of stdin/out/err, and relay bytes
+ # from the subprocess to channel and vice versa.
def ferry_bytes(read, write, close):
while True:
bytes = read(1)
More information about the bazaar-commits
mailing list