Rev 2171: Completely rework chrooted transports. in sftp://bazaar.launchpad.net/%7Ebzr/bzr/hpss/
Andrew Bennetts
andrew.bennetts at canonical.com
Tue Mar 27 10:48:05 BST 2007
At sftp://bazaar.launchpad.net/%7Ebzr/bzr/hpss/
------------------------------------------------------------
revno: 2171
revision-id: andrew.bennetts at canonical.com-20070327094624-jeh2g92906trijk3
parent: andrew.bennetts at canonical.com-20070327064226-80ympk0j9aoc81oa
committer: Andrew Bennetts <andrew.bennetts at canonical.com>
branch nick: hpss
timestamp: Tue 2007-03-27 19:46:24 +1000
message:
Completely rework chrooted transports.
modified:
bzrlib/smart/server.py server.py-20061110062051-chzu10y32vx8gvur-1
bzrlib/tests/branch_implementations/test_http.py test_http.py-20060731224648-2eef7ae5yja95rya-1
bzrlib/tests/test_transport.py testtransport.py-20050718175618-e5cdb99f4555ddce
bzrlib/tests/test_transport_implementations.py test_transport_implementations.py-20051227111451-f97c5c7d5c49fce7
bzrlib/tests/test_wsgi.py test_wsgi.py-20061005091552-rz8pva0olkxv0sd8-1
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
bzrlib/transport/chroot.py chroot.py-20061011104729-0us9mgm97z378vnt-1
bzrlib/transport/http/wsgi.py wsgi.py-20061005091552-rz8pva0olkxv0sd8-2
=== modified file 'bzrlib/smart/server.py'
--- a/bzrlib/smart/server.py 2007-03-13 05:52:01 +0000
+++ b/bzrlib/smart/server.py 2007-03-27 09:46:24 +0000
@@ -113,18 +113,24 @@
def get_backing_transport(self, backing_transport_server):
"""Get a backing transport from a server we are decorating."""
- return transport.get_transport('chroot+' + backing_transport_server.get_url())
+ return transport.get_transport(backing_transport_server.get_url())
def setUp(self, backing_transport_server=None):
"""Set up server for testing"""
+ from bzrlib.transport.chroot import ChrootServer
if backing_transport_server is None:
from bzrlib.transport.local import LocalURLServer
backing_transport_server = LocalURLServer()
- self.backing_transport = self.get_backing_transport(backing_transport_server)
+ self.chroot_server = ChrootServer(
+ self.get_backing_transport(backing_transport_server))
+ self.chroot_server.setUp()
+ self.backing_transport = transport.get_transport(
+ self.chroot_server.get_url())
self.start_background_thread()
def tearDown(self):
self.stop_background_thread()
+ self.chroot_server.tearDown()
def get_url(self):
"""Return the url of the server"""
@@ -141,6 +147,6 @@
def get_backing_transport(self, backing_transport_server):
"""Get a backing transport from a server we are decorating."""
- url = 'chroot+readonly+' + backing_transport_server.get_url()
+ url = 'readonly+' + backing_transport_server.get_url()
return transport.get_transport(url)
=== modified file 'bzrlib/tests/branch_implementations/test_http.py'
--- a/bzrlib/tests/branch_implementations/test_http.py 2007-03-21 05:42:14 +0000
+++ b/bzrlib/tests/branch_implementations/test_http.py 2007-03-27 09:46:24 +0000
@@ -22,7 +22,7 @@
from bzrlib.tests.branch_implementations.test_branch import TestCaseWithBranch
from bzrlib.tests.HttpServer import HttpServer
from bzrlib.transport.local import LocalURLServer
-from bzrlib.transport.chroot import ChrootServer
+from bzrlib.transport.chroot import TestingChrootServer
class InaccessibleParentTests(TestCaseWithBranch):
@@ -37,7 +37,7 @@
def setUp(self):
super(InaccessibleParentTests, self).setUp()
if self.vfs_transport_factory is LocalURLServer:
- self.vfs_transport_factory = ChrootServer
+ self.vfs_transport_factory = TestingChrootServer
def get_branch_with_invalid_parent(self):
"""Get a branch whose get_parent will raise InaccessibleParent."""
=== modified file 'bzrlib/tests/test_transport.py'
--- a/bzrlib/tests/test_transport.py 2007-02-12 05:09:45 +0000
+++ b/bzrlib/tests/test_transport.py 2007-03-27 09:46:24 +0000
@@ -26,6 +26,7 @@
ConnectionError,
DependencyNotPresent,
FileExists,
+ InvalidURLJoin,
NoSuchFile,
PathNotChild,
TransportNotPossible,
@@ -36,10 +37,12 @@
_get_protocol_handlers,
_get_transport_modules,
get_transport,
+ _protocol_handlers,
register_lazy_transport,
_set_protocol_handlers,
Transport,
)
+from bzrlib.transport.chroot import ChrootServer
from bzrlib.transport.memory import MemoryTransport
from bzrlib.transport.local import (LocalTransport,
EmulatedWin32LocalTransport)
@@ -291,85 +294,89 @@
class ChrootDecoratorTransportTest(TestCase):
"""Chroot decoration specific tests."""
- def test_construct(self):
- from bzrlib.transport import chroot
- transport = chroot.ChrootTransportDecorator('chroot+memory:///pathA/')
- self.assertEqual('memory:///pathA/', transport.chroot_url)
- self.assertEqual('/', transport.chroot_relative)
- transport = chroot.ChrootTransportDecorator('chroot+memory:///pathA')
- self.assertEqual('memory:///pathA/', transport.chroot_url)
- self.assertEqual('/', transport.chroot_relative)
- transport = chroot.ChrootTransportDecorator(
- 'chroot+memory:///path/B', chroot='memory:///path/')
- self.assertEqual('memory:///path/', transport.chroot_url)
- self.assertEqual('/B/', transport.chroot_relative)
-
def test_abspath(self):
# The abspath is always relative to the chroot_url.
- transport = get_transport('chroot+memory:///foo/bar/')
- self.assertEqual('chroot+memory:///foo/bar/', transport.abspath('/'))
+ server = ChrootServer(get_transport('memory:///foo/bar/'))
+ server.setUp()
+ transport = get_transport(server.get_url())
+ self.assertEqual(server.get_url(), transport.abspath('/'))
subdir_transport = transport.clone('subdir')
- self.assertEqual(
- 'chroot+memory:///foo/bar/', subdir_transport.abspath('/'))
-
- def test_abspath_invalid(self):
- # You cannot have a url like "scheme:///chroot_root/..", which tries to
- # reference a location above chroot_url.
- transport = get_transport('chroot+memory:///foo/bar/')
-
- self.assertRaises(PathNotChild, transport.abspath, '..')
- self.assertRaises(PathNotChild, transport.abspath, '../..')
- self.assertRaises(PathNotChild, transport.abspath, 'foo/../..')
- self.assertRaises(PathNotChild, transport.abspath, '/..')
- self.assertRaises(PathNotChild, transport.abspath, '/foo/../..')
+ self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
+ server.tearDown()
def test_clone(self):
- transport = get_transport('chroot+memory:///foo/bar')
+ server = ChrootServer(get_transport('memory:///foo/bar/'))
+ server.setUp()
+ transport = get_transport(server.get_url())
# relpath from root and root path are the same
relpath_cloned = transport.clone('foo')
abspath_cloned = transport.clone('/foo')
- self.assertEqual(relpath_cloned.base, abspath_cloned.base)
- self.assertEqual(relpath_cloned.chroot_url, abspath_cloned.chroot_url)
- self.assertEqual(relpath_cloned.chroot_relative,
- abspath_cloned.chroot_relative)
- transport = transport.clone('subdir')
- # clone preserves chroot_url and adjusts chroot_relative
- self.assertEqual('memory:///foo/bar/', transport.chroot_url)
- self.assertEqual('/subdir/', transport.chroot_relative)
- transport = transport.clone('/otherdir')
- # clone preserves chroot_url and adjusts chroot_relative
- self.assertEqual('memory:///foo/bar/', transport.chroot_url)
- self.assertEqual('/otherdir/', transport.chroot_relative)
+ self.assertEqual(server, relpath_cloned.server)
+ self.assertEqual(server, abspath_cloned.server)
+ server.tearDown()
- def test_clone_to_root(self):
- # cloning to "/" (and similarly any offset beginning with "/") goes to
- # the chroot_url, not to root of the decorated transport.
- transport = get_transport('chroot+memory:///foo/bar/baz/')
- transport.clone('subdir')
- # now clone to "/" will take us back to the initial location, not to
- # "chroot_memory:///".
- transport.clone('/')
- self.assertEqual('chroot+memory:///foo/bar/baz/', transport.base)
-
- def test_clone_offset(self):
- # transport.clone('some offset') should call clone('some offset') on the
- # decorated transport, not some surprising variation like
- # ('/some offset').
- from bzrlib.transport import chroot
- decorated_transport = FakeTransport()
- transport = chroot.ChrootTransportDecorator(
- 'chroot+fake:///', _decorated=decorated_transport)
- transport.clone('foo/bar')
- self.assertEqual([('clone', 'foo/bar')] , decorated_transport.calls)
-
- def test_clone_multiple_levels(self):
- url='chroot+memory:///hello/'
- transport = get_transport(url)
- transport = transport.clone("subdir1")
- transport = transport.clone("subdir2")
- self.assertEqual(
- 'chroot+memory:///hello/subdir1/subdir2/', transport.base)
+ def test_chroot_url_preserves_chroot(self):
+ """Calling get_transport on a chroot transport's base should produce a
+ transport with exactly the same behaviour as the original chroot
+ transport.
+
+ This is so that it is not possible to escape a chroot by doing::
+ url = chroot_transport.base
+ parent_url = urlutils.join(url, '..')
+ new_transport = get_transport(parent_url)
+ """
+ server = ChrootServer(get_transport('memory:///path/subpath'))
+ server.setUp()
+ transport = get_transport(server.get_url())
+ new_transport = get_transport(transport.base)
+ self.assertEqual(transport.server, new_transport.server)
+ self.assertEqual(transport.base, new_transport.base)
+ server.tearDown()
+
+ def test_urljoin_preserves_chroot(self):
+ """Using urlutils.join(url, '..') on a chroot URL should not produce a
+ URL that escapes the intended chroot.
+
+ This is so that it is not possible to escape a chroot by doing::
+ url = chroot_transport.base
+ parent_url = urlutils.join(url, '..')
+ new_transport = get_transport(parent_url)
+ """
+ server = ChrootServer(get_transport('memory:///path/'))
+ server.setUp()
+ transport = get_transport(server.get_url())
+ self.assertRaises(
+ InvalidURLJoin, urlutils.join, transport.base, '..')
+ server.tearDown()
+
+
+class ChrootServerTest(TestCase):
+
+ def test_construct(self):
+ backing_transport = MemoryTransport()
+ server = ChrootServer(backing_transport)
+ self.assertEqual(backing_transport, server.backing_transport)
+
+ def test_setUp(self):
+ backing_transport = MemoryTransport()
+ server = ChrootServer(backing_transport)
+ server.setUp()
+ self.assertTrue(server.scheme in _protocol_handlers.keys())
+
+ def test_tearDown(self):
+ backing_transport = MemoryTransport()
+ server = ChrootServer(backing_transport)
+ server.setUp()
+ server.tearDown()
+ self.assertFalse(server.scheme in _protocol_handlers.keys())
+
+ def test_get_url(self):
+ backing_transport = MemoryTransport()
+ server = ChrootServer(backing_transport)
+ server.setUp()
+ self.assertEqual('chroot-%d:///' % id(server), server.get_url())
+ server.tearDown()
class FakeTransport(object):
@@ -534,11 +541,8 @@
super(TestTransportImplementation, self).setUp()
self._server = self.transport_server()
self._server.setUp()
+ self.addCleanup(self._server.tearDown)
- def tearDown(self):
- super(TestTransportImplementation, self).tearDown()
- self._server.tearDown()
-
def get_transport(self):
"""Return a connected transport to the local directory."""
base_url = self._server.get_url()
=== modified file 'bzrlib/tests/test_transport_implementations.py'
--- a/bzrlib/tests/test_transport_implementations.py 2007-03-27 06:42:26 +0000
+++ b/bzrlib/tests/test_transport_implementations.py 2007-03-27 09:46:24 +0000
@@ -81,8 +81,6 @@
def test_has_root_works(self):
current_transport = self.get_transport()
- if isinstance(current_transport, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
self.assertTrue(current_transport.has('/'))
root = current_transport.clone('/')
self.assertTrue(root.has(''))
@@ -1013,8 +1011,6 @@
def test_clone(self):
# TODO: Test that clone moves up and down the filesystem
t1 = self.get_transport()
- if isinstance(t1, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
@@ -1047,8 +1043,6 @@
def test_clone_to_root(self):
orig_transport = self.get_transport()
- if isinstance(orig_transport, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
# Repeatedly go up to a parent directory until we're at the root
# directory of this transport
root_transport = orig_transport
@@ -1077,8 +1071,6 @@
def test_clone_from_root(self):
"""At the root, cloning to a simple dir should just do string append."""
orig_transport = self.get_transport()
- if isinstance(orig_transport, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('/')")
root_transport = orig_transport.clone('/')
self.assertEqual(root_transport.base + '.bzr/',
root_transport.clone('.bzr').base)
@@ -1099,8 +1091,6 @@
def test_relpath_at_root(self):
t = self.get_transport()
- if isinstance(t, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
# clone all the way to the top
new_transport = t.clone('..')
while new_transport.base != t.base:
@@ -1116,8 +1106,6 @@
# that have aliasing problems like symlinks should go in backend
# specific test cases.
transport = self.get_transport()
- if isinstance(transport, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
self.assertEqual(transport.base + 'relpath',
transport.abspath('relpath'))
@@ -1142,8 +1130,6 @@
def test_abspath_at_root(self):
t = self.get_transport()
- if isinstance(t, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
# clone all the way to the top
new_transport = t.clone('..')
while new_transport.base != t.base:
@@ -1248,8 +1234,6 @@
# check that our server (whatever it is) is accessable reliably
# via get_transport and multiple connections share content.
transport = self.get_transport()
- if isinstance(transport, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
if transport.is_readonly():
return
transport.put_bytes('foo', 'bar')
=== modified file 'bzrlib/tests/test_wsgi.py'
--- a/bzrlib/tests/test_wsgi.py 2007-02-05 05:16:09 +0000
+++ b/bzrlib/tests/test_wsgi.py 2007-03-27 09:46:24 +0000
@@ -71,7 +71,7 @@
def test_construct(self):
app = wsgi.SmartWSGIApp(FakeTransport())
self.assertIsInstance(
- app.backing_transport, chroot.ChrootTransportDecorator)
+ app.backing_transport, chroot.ChrootTransport)
def test_http_get_rejected(self):
# GET requests are rejected.
@@ -83,11 +83,13 @@
self.assertTrue(('Allow', 'POST') in self.headers)
def test_smart_wsgi_app_uses_given_relpath(self):
+ # XXX XXX XXX update comment
# The SmartWSGIApp should use the "bzrlib.relpath" field from the
# WSGI environ to construct the transport for this request, by cloning
# its base transport with the given relpath.
transport = FakeTransport()
wsgi_app = wsgi.SmartWSGIApp(transport)
+ wsgi_app.backing_transport = transport
def make_request(transport, write_func):
request = FakeRequest(transport, write_func)
self.request = request
@@ -175,7 +177,10 @@
path_var='a path_var')
self.assertIsInstance(app, wsgi.RelpathSetter)
self.assertIsInstance(app.app, wsgi.SmartWSGIApp)
- self.assertEndsWith(app.app.backing_transport.base, 'a%20root/')
+ self.assertStartsWith(app.app.backing_transport.base, 'chroot-')
+ backing_transport = app.app.backing_transport
+ chroot_backing_transport = backing_transport.server.backing_transport
+ self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
self.assertEqual(app.prefix, 'a prefix')
self.assertEqual(app.path_var, 'a path_var')
@@ -200,29 +205,6 @@
self.assertEqual('200 OK', self.status)
self.assertEqual('error\x01incomplete request\n', response)
- def test_chrooting(self):
- # Show that requests that try to access things outside of the base
- # really will get intercepted by the ChrootTransportDecorator.
- transport = memory.MemoryTransport()
- transport.mkdir('foo')
- transport.put_bytes('foo/bar', 'this is foo/bar')
- wsgi_app = wsgi.SmartWSGIApp(transport.clone('foo'))
-
- smart_request = StringIO('mkdir\x01../bad file\x01\n0\ndone\n')
- environ = self.build_environ({
- 'REQUEST_METHOD': 'POST',
- 'CONTENT_LENGTH': len(smart_request.getvalue()),
- 'wsgi.input': smart_request,
- 'bzrlib.relpath': '.',
- })
- iterable = wsgi_app(environ, self.start_response)
- response = self.read_response(iterable)
- self.assertEqual('200 OK', self.status)
- self.assertEqual(
- "error\x01Path '../bad file' is not a child of "
- "path 'memory:///foo/'\n",
- response)
-
class FakeRequest(object):
=== modified file 'bzrlib/transport/__init__.py'
--- a/bzrlib/transport/__init__.py 2007-02-12 05:09:45 +0000
+++ b/bzrlib/transport/__init__.py 2007-03-27 09:46:24 +0000
@@ -107,6 +107,13 @@
register_transport(scheme, _loader)
+def unregister_transport(scheme, factory):
+ """Unregister a transport."""
+ _protocol_handlers[scheme].remove(factory)
+ if _protocol_handlers[scheme] == []:
+ del _protocol_handlers[scheme]
+
+
def _get_protocol_handlers():
"""Return a dictionary of {urlprefix: [factory]}"""
return _protocol_handlers
@@ -137,6 +144,8 @@
modules.add(factory.module)
else:
modules.add(factory.__module__)
+ # Add chroot directly, because there is no handler registered for it.
+ modules.add('bzrlib.transport.chroot')
result = list(modules)
result.sort()
return result
@@ -1089,7 +1098,11 @@
raise NotImplementedError
def get_bogus_url(self):
- """Return a url for this protocol, that will fail to connect."""
+ """Return a url for this protocol, that will fail to connect.
+
+ This may raise NotImplementedError to indicate that this server cannot
+ provide bogus urls.
+ """
raise NotImplementedError
@@ -1182,8 +1195,6 @@
register_lazy_transport('ftp://', 'bzrlib.transport.ftp', 'FtpTransport')
register_lazy_transport('aftp://', 'bzrlib.transport.ftp', 'FtpTransport')
register_lazy_transport('memory://', 'bzrlib.transport.memory', 'MemoryTransport')
-register_lazy_transport('chroot+', 'bzrlib.transport.chroot',
- 'ChrootTransportDecorator')
register_lazy_transport('readonly+', 'bzrlib.transport.readonly', 'ReadonlyTransportDecorator')
register_lazy_transport('fakenfs+', 'bzrlib.transport.fakenfs', 'FakeNFSTransportDecorator')
register_lazy_transport('vfat+',
=== modified file 'bzrlib/transport/chroot.py'
--- a/bzrlib/transport/chroot.py 2007-02-02 05:31:06 +0000
+++ b/bzrlib/transport/chroot.py 2007-03-27 09:46:24 +0000
@@ -20,141 +20,119 @@
from urlparse import urlparse
from bzrlib import errors, urlutils
+from bzrlib.transport import (
+ get_transport,
+ register_transport,
+ Server,
+ Transport,
+ unregister_transport,
+ )
from bzrlib.transport.decorator import TransportDecorator, DecoratorServer
-
-
-class ChrootTransportDecorator(TransportDecorator):
- """A decorator that can convert any transport to be chrooted.
-
- This is requested via the 'chroot+' prefix to get_transport().
-
- :ivar chroot_url: the root of this chroot
- :ivar chroot_relative: this transport's location relative to the chroot
- root. e.g. A chroot_relative of '/' means this location is the same as
- chroot_url.
- """
-
- def __init__(self, url, _decorated=None, chroot=None):
- super(ChrootTransportDecorator, self).__init__(url,
- _decorated=_decorated)
- if chroot is None:
- self.chroot_url = self._decorated.base
- else:
- self.chroot_url = chroot
- self.chroot_relative = '/' + self._decorated.base[len(self.chroot_url):]
-
- @classmethod
- def _get_url_prefix(self):
- """Chroot transport decorators are invoked via 'chroot+'"""
- return 'chroot+'
-
- def _ensure_relpath_is_child(self, relpath):
- abspath = self.abspath(relpath)
- chroot_base = self._get_url_prefix() + self.chroot_url
- real_relpath = urlutils.relative_url(chroot_base, abspath)
- if real_relpath == '..' or real_relpath.startswith('../'):
- raise errors.PathNotChild(relpath, self.chroot_url)
-
- # decorated methods
+from bzrlib.transport.memory import MemoryTransport
+
+
+class ChrootServer(Server):
+ """Server for chroot transports."""
+
+ def __init__(self, backing_transport):
+ self.backing_transport = backing_transport
+
+ def _factory(self, url):
+ assert url.startswith(self.scheme)
+ return ChrootTransport(self, url)
+
+ def get_url(self):
+ return self.scheme
+
+ def setUp(self):
+ self.scheme = 'chroot-%d:///' % id(self)
+ register_transport(self.scheme, self._factory)
+
+ def tearDown(self):
+ unregister_transport(self.scheme, self._factory)
+
+
+class ChrootTransport(Transport):
+
+ def __init__(self, server, base):
+ self.server = server
+ if not base.endswith('/'):
+ base += '/'
+ Transport.__init__(self, base)
+ self.base_path = self.base[len(self.server.scheme)-1:]
+ self.scheme = self.server.scheme
+
+ def _call(self, methodname, relpath, *args):
+ method = getattr(self.server.backing_transport, methodname)
+ return method(self._safe_relpath(relpath), *args)
+
+ def _safe_relpath(self, relpath):
+ safe_relpath = self._combine_paths(self.base_path, relpath)
+ assert safe_relpath.startswith('/')
+ return safe_relpath[1:]
+
+ # Transport methods
def abspath(self, relpath):
- try:
- url = urlutils.join('fake:///', relpath)
- except errors.InvalidURLJoin:
- raise errors.PathNotChild(relpath, self.chroot_url)
- normalised_abspath = url[len('fake:///'):]
- return self._get_url_prefix() + self.chroot_url + normalised_abspath[1:]
+ return self.scheme + self._safe_relpath(relpath)
def append_file(self, relpath, f, mode=None):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.append_file(self, relpath, f, mode=mode)
-
- def append_bytes(self, relpath, bytes, mode=None):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.append_bytes(self, relpath, bytes, mode=mode)
-
- def clone(self, offset=None):
- if offset is None: return self
- # the new URL we want to clone to is
- # self.chroot_url + an adjusted self.chroot_relative, with the leading
- # / removed.
- new_relpath = urlutils.joinpath(self.chroot_relative, offset)
- assert new_relpath.startswith('/')
- new_url = self.chroot_url + new_relpath[1:]
- # Clone the decorated transport according to this new path.
- assert new_url.startswith(self.chroot_url), (
- 'new_url (%r) does not start with %r'
- % (new_url, self._decorated.base))
- path = urlutils.relative_url(self._decorated.base, new_url)
- decorated_clone = self._decorated.clone(path)
- return ChrootTransportDecorator(self._get_url_prefix() + new_url,
- decorated_clone, self.chroot_url)
+ return self._call('append_file', relpath, f, mode)
+
+ def clone(self, relpath):
+ return ChrootTransport(self.server, self.abspath(relpath))
def delete(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.delete(self, relpath)
+ return self._call('delete', relpath)
def delete_tree(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.delete_tree(self, relpath)
+ return self._call('delete_tree', relpath)
def get(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.get(self, relpath)
-
- def get_bytes(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.get_bytes(self, relpath)
+ return self._call('get', relpath)
def has(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.has(self, relpath)
+ return self._call('has', relpath)
+
+ def iter_files_recursive(self):
+ backing_transport = self.server.backing_transport.clone(
+ self._safe_relpath('.'))
+ return backing_transport.iter_files_recursive()
+
+ def listable(self):
+ return self.server.backing_transport.listable()
def list_dir(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.list_dir(self, relpath)
+ return self._call('list_dir', relpath)
def lock_read(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.lock_read(self, relpath)
+ return self._call('lock_read', relpath)
def lock_write(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.lock_write(self, relpath)
+ return self._call('lock_write', relpath)
def mkdir(self, relpath, mode=None):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.mkdir(self, relpath, mode=mode)
-
- def put_bytes(self, relpath, bytes, mode=None):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.put_bytes(self, relpath, bytes, mode=mode)
+ return self._call('mkdir', relpath, mode)
def put_file(self, relpath, f, mode=None):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.put_file(self, relpath, f, mode=mode)
+ return self._call('put_file', relpath, f, mode)
def rename(self, rel_from, rel_to):
- self._ensure_relpath_is_child(rel_from)
- self._ensure_relpath_is_child(rel_to)
- return TransportDecorator.rename(self, rel_from, rel_to)
+ return self._call('rename', rel_from, self._safe_relpath(rel_to))
def rmdir(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.rmdir(self, relpath)
+ return self._call('rmdir', relpath)
def stat(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.stat(self, relpath)
-
-
-class ChrootServer(DecoratorServer):
- """Server for the ReadonlyTransportDecorator for testing with."""
-
- def get_decorator_class(self):
- return ChrootTransportDecorator
+ return self._call('stat', relpath)
+
+
+class TestingChrootServer(ChrootServer):
+
+ def __init__(self):
+ ChrootServer.__init__(self, get_transport('.'))
def get_test_permutations():
"""Return the permutations to be used in testing."""
- return [(ChrootTransportDecorator, ChrootServer),
+ return [(ChrootTransport, TestingChrootServer),
]
=== modified file 'bzrlib/transport/http/wsgi.py'
--- a/bzrlib/transport/http/wsgi.py 2006-12-21 15:50:42 +0000
+++ b/bzrlib/transport/http/wsgi.py 2007-03-27 09:46:24 +0000
@@ -95,8 +95,14 @@
# accidentally let people access locations they shouldn't.
# e.g. consider a smart server request for "get /etc/passwd" or
# something.
- self.backing_transport = chroot.ChrootTransportDecorator(
- 'chroot+' + backing_transport.base, _decorated=backing_transport)
+ self.chroot_server = chroot.ChrootServer(backing_transport)
+ self.chroot_server.setUp()
+ self.backing_transport = get_transport(self.chroot_server.get_url())
+ # XXX: explain why not:
+ #self.chroot_server.tearDown()
+
+ #self.backing_transport = chroot.ChrootTransportDecorator(
+ # 'chroot+' + backing_transport.base, _decorated=backing_transport)
def __call__(self, environ, start_response):
"""WSGI application callable."""
More information about the bazaar-commits
mailing list