Rev 2381: Merge split out hpss chroot fixes which are going to pqm now. in file:///home/robertc/source/baz/hpss-test-correctness/
Robert Collins
robertc at robertcollins.net
Wed Mar 28 07:29:09 BST 2007
At file:///home/robertc/source/baz/hpss-test-correctness/
------------------------------------------------------------
revno: 2381
revision-id: robertc at robertcollins.net-20070328062906-xxq78qbuekise3nc
parent: pqm at pqm.ubuntu.com-20070328022809-40aa40f8edf4e502
parent: robertc at robertcollins.net-20070328051901-x5osh1c7fk75ira8
committer: Robert Collins <robertc at robertcollins.net>
branch nick: hpss-test-correctness
timestamp: Wed 2007-03-28 16:29:06 +1000
message:
Merge split out hpss chroot fixes which are going to pqm now.
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/tests/test_transport.py testtransport.py-20050718175618-e5cdb99f4555ddce
bzrlib/tests/test_transport_implementations.py test_transport_implementations.py-20051227111451-f97c5c7d5c49fce7
bzrlib/tests/test_wsgi.py test_wsgi.py-20061005091552-rz8pva0olkxv0sd8-1
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
bzrlib/transport/chroot.py chroot.py-20061011104729-0us9mgm97z378vnt-1
bzrlib/transport/http/wsgi.py wsgi.py-20061005091552-rz8pva0olkxv0sd8-2
------------------------------------------------------------
revno: 2379.2.3
merged: robertc at robertcollins.net-20070328051901-x5osh1c7fk75ira8
parent: robertc at robertcollins.net-20070328033235-sph663bdtwynteg9
committer: Robert Collins <robertc at robertcollins.net>
branch nick: hpss-chroot
timestamp: Wed 2007-03-28 15:19:01 +1000
message:
Review feedback.
------------------------------------------------------------
revno: 2379.2.2
merged: robertc at robertcollins.net-20070328033235-sph663bdtwynteg9
parent: robertc at robertcollins.net-20070328030232-uunyt91y60b584j9
committer: Robert Collins <robertc at robertcollins.net>
branch nick: hpss-chroot
timestamp: Wed 2007-03-28 13:32:35 +1000
message:
Further test-usability for chroots.
------------------------------------------------------------
revno: 2379.2.1
merged: robertc at robertcollins.net-20070328030232-uunyt91y60b584j9
parent: pqm at pqm.ubuntu.com-20070327081802-271be0d343108f4f
committer: Robert Collins <robertc at robertcollins.net>
branch nick: hpss-chroot
timestamp: Wed 2007-03-28 13:02:32 +1000
message:
Rewritten chroot transport that prevents accidental chroot escapes when
using urlutils against the transport's url.
(Robert Collins, Andrew Bennetts)
=== modified file 'NEWS'
--- a/NEWS 2007-03-27 07:58:34 +0000
+++ b/NEWS 2007-03-28 05:19:01 +0000
@@ -5,6 +5,12 @@
* bzrlib API compatability with 0.8 has been dropped, cleaning up some
code paths. (Robert Collins)
+ * Change the format of chroot urls so that they can be safely manipulated
+ by generic url utilities without causing the resulting urls to have
+ escaped the chroot. A side effect of this is that creating a chroot
+ requires an explicit action using a ChrootServer.
+ (Robert Collins, Andrew Bennetts)
+
TESTING:
* Added ``bzrlib.strace.strace`` which will strace a single callable and
=== modified file 'bzrlib/tests/test_transport.py'
--- a/bzrlib/tests/test_transport.py 2007-02-05 05:19:13 +0000
+++ b/bzrlib/tests/test_transport.py 2007-03-28 03:02:32 +0000
@@ -22,22 +22,26 @@
import bzrlib
from bzrlib import urlutils
-from bzrlib.errors import (NoSuchFile, FileExists,
+from bzrlib.errors import (ConnectionError,
+ DependencyNotPresent,
+ FileExists,
+ InvalidURLJoin,
+ NoSuchFile,
+ PathNotChild,
TransportNotPossible,
- ConnectionError,
- DependencyNotPresent,
UnsupportedProtocol,
- PathNotChild,
)
from bzrlib.tests import TestCase, TestCaseInTempDir
from bzrlib.transport import (_CoalescedOffset,
_get_protocol_handlers,
_get_transport_modules,
get_transport,
+ _protocol_handlers,
register_lazy_transport,
_set_protocol_handlers,
Transport,
)
+from bzrlib.transport.chroot import ChrootServer
from bzrlib.transport.memory import MemoryTransport
from bzrlib.transport.local import (LocalTransport,
EmulatedWin32LocalTransport)
@@ -67,7 +71,7 @@
_set_protocol_handlers(my_handlers)
register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
- self.assertEqual([SampleHandler.__module__],
+ self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
_get_transport_modules())
finally:
_set_protocol_handlers(handlers)
@@ -289,83 +293,89 @@
class ChrootDecoratorTransportTest(TestCase):
"""Chroot decoration specific tests."""
+ def test_abspath(self):
+ # The abspath is always relative to the chroot_url.
+ server = ChrootServer(get_transport('memory:///foo/bar/'))
+ server.setUp()
+ transport = get_transport(server.get_url())
+ self.assertEqual(server.get_url(), transport.abspath('/'))
+
+ subdir_transport = transport.clone('subdir')
+ self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
+ server.tearDown()
+
+ def test_clone(self):
+ server = ChrootServer(get_transport('memory:///foo/bar/'))
+ server.setUp()
+ transport = get_transport(server.get_url())
+ # relpath from root and root path are the same
+ relpath_cloned = transport.clone('foo')
+ abspath_cloned = transport.clone('/foo')
+ self.assertEqual(server, relpath_cloned.server)
+ self.assertEqual(server, abspath_cloned.server)
+ server.tearDown()
+
+ def test_chroot_url_preserves_chroot(self):
+ """Calling get_transport on a chroot transport's base should produce a
+ transport with exactly the same behaviour as the original chroot
+ transport.
+
+ This is so that it is not possible to escape a chroot by doing::
+ url = chroot_transport.base
+ parent_url = urlutils.join(url, '..')
+ new_transport = get_transport(parent_url)
+ """
+ server = ChrootServer(get_transport('memory:///path/subpath'))
+ server.setUp()
+ transport = get_transport(server.get_url())
+ new_transport = get_transport(transport.base)
+ self.assertEqual(transport.server, new_transport.server)
+ self.assertEqual(transport.base, new_transport.base)
+ server.tearDown()
+
+ def test_urljoin_preserves_chroot(self):
+ """Using urlutils.join(url, '..') on a chroot URL should not produce a
+ URL that escapes the intended chroot.
+
+ This is so that it is not possible to escape a chroot by doing::
+ url = chroot_transport.base
+ parent_url = urlutils.join(url, '..')
+ new_transport = get_transport(parent_url)
+ """
+ server = ChrootServer(get_transport('memory:///path/'))
+ server.setUp()
+ transport = get_transport(server.get_url())
+ self.assertRaises(
+ InvalidURLJoin, urlutils.join, transport.base, '..')
+ server.tearDown()
+
+
+class ChrootServerTest(TestCase):
+
def test_construct(self):
- from bzrlib.transport import chroot
- transport = chroot.ChrootTransportDecorator('chroot+memory:///pathA/')
- self.assertEqual('memory:///pathA/', transport.chroot_url)
-
- transport = chroot.ChrootTransportDecorator(
- 'chroot+memory:///path/B', chroot='memory:///path/')
- self.assertEqual('memory:///path/', transport.chroot_url)
-
- def test_append_file(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.append_file, '/foo', None)
-
- def test_append_bytes(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.append_bytes, '/foo', 'bytes')
-
- def test_clone(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.clone, '/foo')
-
- def test_delete(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.delete, '/foo')
-
- def test_delete_tree(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.delete_tree, '/foo')
-
- def test_get(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.get, '/foo')
-
- def test_get_bytes(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.get_bytes, '/foo')
-
- def test_has(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.has, '/foo')
-
- def test_list_dir(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.list_dir, '/foo')
-
- def test_lock_read(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.lock_read, '/foo')
-
- def test_lock_write(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.lock_write, '/foo')
-
- def test_mkdir(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.mkdir, '/foo')
-
- def test_put_bytes(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.put_bytes, '/foo', 'bytes')
-
- def test_put_file(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.put_file, '/foo', None)
-
- def test_rename(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.rename, '/aaa', 'bbb')
- self.assertRaises(PathNotChild, transport.rename, 'ccc', '/d')
-
- def test_rmdir(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.rmdir, '/foo')
-
- def test_stat(self):
- transport = get_transport('chroot+memory:///foo/bar')
- self.assertRaises(PathNotChild, transport.stat, '/foo')
+ backing_transport = MemoryTransport()
+ server = ChrootServer(backing_transport)
+ self.assertEqual(backing_transport, server.backing_transport)
+
+ def test_setUp(self):
+ backing_transport = MemoryTransport()
+ server = ChrootServer(backing_transport)
+ server.setUp()
+ self.assertTrue(server.scheme in _protocol_handlers.keys())
+
+ def test_tearDown(self):
+ backing_transport = MemoryTransport()
+ server = ChrootServer(backing_transport)
+ server.setUp()
+ server.tearDown()
+ self.assertFalse(server.scheme in _protocol_handlers.keys())
+
+ def test_get_url(self):
+ backing_transport = MemoryTransport()
+ server = ChrootServer(backing_transport)
+ server.setUp()
+ self.assertEqual('chroot-%d:///' % id(server), server.get_url())
+ server.tearDown()
class ReadonlyDecoratorTransportTest(TestCase):
@@ -515,11 +525,8 @@
super(TestTransportImplementation, self).setUp()
self._server = self.transport_server()
self._server.setUp()
+ self.addCleanup(self._server.tearDown)
- def tearDown(self):
- super(TestTransportImplementation, self).tearDown()
- self._server.tearDown()
-
def get_transport(self):
"""Return a connected transport to the local directory."""
base_url = self._server.get_url()
=== modified file 'bzrlib/tests/test_transport_implementations.py'
--- a/bzrlib/tests/test_transport_implementations.py 2007-03-13 05:27:12 +0000
+++ b/bzrlib/tests/test_transport_implementations.py 2007-03-28 03:02:32 +0000
@@ -38,7 +38,7 @@
from bzrlib.symbol_versioning import zero_eleven
from bzrlib.tests import TestCaseInTempDir, TestSkipped
from bzrlib.tests.test_transport import TestTransportImplementation
-from bzrlib.transport import memory, smart, chroot
+from bzrlib.transport import memory, smart
import bzrlib.transport
@@ -76,8 +76,6 @@
def test_has_root_works(self):
current_transport = self.get_transport()
- if isinstance(current_transport, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
self.assertTrue(current_transport.has('/'))
root = current_transport.clone('/')
self.assertTrue(root.has(''))
@@ -1008,8 +1006,6 @@
def test_clone(self):
# TODO: Test that clone moves up and down the filesystem
t1 = self.get_transport()
- if isinstance(t1, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
@@ -1042,8 +1038,6 @@
def test_clone_to_root(self):
orig_transport = self.get_transport()
- if isinstance(orig_transport, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
# Repeatedly go up to a parent directory until we're at the root
# directory of this transport
root_transport = orig_transport
@@ -1072,8 +1066,6 @@
def test_clone_from_root(self):
"""At the root, cloning to a simple dir should just do string append."""
orig_transport = self.get_transport()
- if isinstance(orig_transport, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('/')")
root_transport = orig_transport.clone('/')
self.assertEqual(root_transport.base + '.bzr/',
root_transport.clone('.bzr').base)
@@ -1094,8 +1086,6 @@
def test_relpath_at_root(self):
t = self.get_transport()
- if isinstance(t, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
# clone all the way to the top
new_transport = t.clone('..')
while new_transport.base != t.base:
@@ -1111,8 +1101,6 @@
# that have aliasing problems like symlinks should go in backend
# specific test cases.
transport = self.get_transport()
- if isinstance(transport, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
self.assertEqual(transport.base + 'relpath',
transport.abspath('relpath'))
@@ -1137,8 +1125,6 @@
def test_abspath_at_root(self):
t = self.get_transport()
- if isinstance(t, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
# clone all the way to the top
new_transport = t.clone('..')
while new_transport.base != t.base:
@@ -1243,8 +1229,6 @@
# check that our server (whatever it is) is accessable reliably
# via get_transport and multiple connections share content.
transport = self.get_transport()
- if isinstance(transport, chroot.ChrootTransportDecorator):
- raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
if transport.is_readonly():
return
transport.put_bytes('foo', 'bar')
=== modified file 'bzrlib/tests/test_wsgi.py'
--- a/bzrlib/tests/test_wsgi.py 2006-10-13 06:30:29 +0000
+++ b/bzrlib/tests/test_wsgi.py 2007-03-28 03:02:32 +0000
@@ -71,7 +71,7 @@
def test_construct(self):
app = wsgi.SmartWSGIApp(FakeTransport())
self.assertIsInstance(
- app.backing_transport, chroot.ChrootTransportDecorator)
+ app.backing_transport, chroot.ChrootTransport)
def test_http_get_rejected(self):
# GET requests are rejected.
@@ -84,10 +84,11 @@
def test_smart_wsgi_app_uses_given_relpath(self):
# The SmartWSGIApp should use the "bzrlib.relpath" field from the
- # WSGI environ to construct the transport for this request, by cloning
- # its base transport with the given relpath.
+ # WSGI environ to clone from its backing transport to get a specific
+ # transport for this request.
transport = FakeTransport()
wsgi_app = wsgi.SmartWSGIApp(transport)
+ wsgi_app.backing_transport = transport
def make_request(transport, write_func):
request = FakeRequest(transport, write_func)
self.request = request
@@ -175,7 +176,10 @@
path_var='a path_var')
self.assertIsInstance(app, wsgi.RelpathSetter)
self.assertIsInstance(app.app, wsgi.SmartWSGIApp)
- self.assertEndsWith(app.app.backing_transport.base, 'a%20root/')
+ self.assertStartsWith(app.app.backing_transport.base, 'chroot-')
+ backing_transport = app.app.backing_transport
+ chroot_backing_transport = backing_transport.server.backing_transport
+ self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
self.assertEqual(app.prefix, 'a prefix')
self.assertEqual(app.path_var, 'a path_var')
@@ -200,29 +204,6 @@
self.assertEqual('200 OK', self.status)
self.assertEqual('error\x01incomplete request\n', response)
- def test_chrooting(self):
- # Show that requests that try to access things outside of the base
- # really will get intercepted by the ChrootTransportDecorator.
- transport = memory.MemoryTransport()
- transport.mkdir('foo')
- transport.put_bytes('foo/bar', 'this is foo/bar')
- wsgi_app = wsgi.SmartWSGIApp(transport.clone('foo'))
-
- smart_request = StringIO('mkdir\x01/bad file\x01\n0\ndone\n')
- environ = self.build_environ({
- 'REQUEST_METHOD': 'POST',
- 'CONTENT_LENGTH': len(smart_request.getvalue()),
- 'wsgi.input': smart_request,
- 'bzrlib.relpath': '.',
- })
- iterable = wsgi_app(environ, self.start_response)
- response = self.read_response(iterable)
- self.assertEqual('200 OK', self.status)
- self.assertEqual(
- "error\x01Path '/bad file' is not a child of "
- "path 'memory:///foo/'\n",
- response)
-
class FakeRequest(object):
=== modified file 'bzrlib/transport/__init__.py'
--- a/bzrlib/transport/__init__.py 2007-01-26 01:23:38 +0000
+++ b/bzrlib/transport/__init__.py 2007-03-28 03:02:32 +0000
@@ -107,6 +107,13 @@
register_transport(scheme, _loader)
+def unregister_transport(scheme, factory):
+ """Unregister a transport."""
+ _protocol_handlers[scheme].remove(factory)
+ if _protocol_handlers[scheme] == []:
+ del _protocol_handlers[scheme]
+
+
def _get_protocol_handlers():
"""Return a dictionary of {urlprefix: [factory]}"""
return _protocol_handlers
@@ -137,6 +144,8 @@
modules.add(factory.module)
else:
modules.add(factory.__module__)
+ # Add chroot directly, because there is no handler registered for it.
+ modules.add('bzrlib.transport.chroot')
result = list(modules)
result.sort()
return result
@@ -1089,7 +1098,11 @@
raise NotImplementedError
def get_bogus_url(self):
- """Return a url for this protocol, that will fail to connect."""
+ """Return a url for this protocol, that will fail to connect.
+
+ This may raise NotImplementedError to indicate that this server cannot
+ provide bogus urls.
+ """
raise NotImplementedError
=== modified file 'bzrlib/transport/chroot.py'
--- a/bzrlib/transport/chroot.py 2006-12-21 13:37:29 +0000
+++ b/bzrlib/transport/chroot.py 2007-03-28 05:19:01 +0000
@@ -17,116 +17,141 @@
"""Implementation of Transport that prevents access to locations above a set
root.
"""
+from urlparse import urlparse
from bzrlib import errors, urlutils
+from bzrlib.transport import (
+ get_transport,
+ register_transport,
+ Server,
+ Transport,
+ unregister_transport,
+ )
from bzrlib.transport.decorator import TransportDecorator, DecoratorServer
-
-
-class ChrootTransportDecorator(TransportDecorator):
- """A decorator that can convert any transport to be chrooted.
-
- This is requested via the 'chrooted+' prefix to get_transport().
- """
-
- def __init__(self, url, _decorated=None, chroot=None):
- super(ChrootTransportDecorator, self).__init__(url,
- _decorated=_decorated)
- if chroot is None:
- self.chroot_url = self._decorated.base
- else:
- self.chroot_url = chroot
-
- @classmethod
- def _get_url_prefix(self):
- """Chroot transport decorators are invoked via 'chroot+'"""
- return 'chroot+'
-
- def _ensure_relpath_is_child(self, relpath):
- abspath = self.abspath(relpath)
- chroot_base = self._get_url_prefix() + self.chroot_url
- real_relpath = urlutils.relative_url(chroot_base, abspath)
- if real_relpath == '..' or real_relpath.startswith('../'):
- raise errors.PathNotChild(relpath, self.chroot_url)
-
- # decorated methods
+from bzrlib.transport.memory import MemoryTransport
+
+
+class ChrootServer(Server):
+ """User space 'chroot' facility.
+
+ The server's get_url returns the url for a chroot transport mapped to the
+ backing transport. The url is of the form chroot-xxx:/// so parent
+ directories of the backing transport are not visible. The chroot url will
+ not allow '..' sequences to result in requests to the chroot affecting
+ directories outside the backing transport.
+ """
+
+ def __init__(self, backing_transport):
+ self.backing_transport = backing_transport
+
+ def _factory(self, url):
+ assert url.startswith(self.scheme)
+ return ChrootTransport(self, url)
+
+ def get_url(self):
+ return self.scheme
+
+ def setUp(self):
+ self.scheme = 'chroot-%d:///' % id(self)
+ register_transport(self.scheme, self._factory)
+
+ def tearDown(self):
+ unregister_transport(self.scheme, self._factory)
+
+
+class ChrootTransport(Transport):
+ """A ChrootTransport.
+
+ Please see ChrootServer for details.
+ """
+
+ def __init__(self, server, base):
+ self.server = server
+ if not base.endswith('/'):
+ base += '/'
+ Transport.__init__(self, base)
+ self.base_path = self.base[len(self.server.scheme)-1:]
+ self.scheme = self.server.scheme
+
+ def _call(self, methodname, relpath, *args):
+ method = getattr(self.server.backing_transport, methodname)
+ return method(self._safe_relpath(relpath), *args)
+
+ def _safe_relpath(self, relpath):
+ safe_relpath = self._combine_paths(self.base_path, relpath)
+ assert safe_relpath.startswith('/')
+ return safe_relpath[1:]
+
+ # Transport methods
+ def abspath(self, relpath):
+ return self.scheme + self._safe_relpath(relpath)
+
def append_file(self, relpath, f, mode=None):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.append_file(self, relpath, f, mode=mode)
-
- def append_bytes(self, relpath, bytes, mode=None):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.append_bytes(self, relpath, bytes, mode=mode)
-
- def clone(self, offset=None):
- self._ensure_relpath_is_child(offset)
- return TransportDecorator.clone(self, offset)
+ return self._call('append_file', relpath, f, mode)
+
+ def clone(self, relpath):
+ return ChrootTransport(self.server, self.abspath(relpath))
def delete(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.delete(self, relpath)
+ return self._call('delete', relpath)
def delete_tree(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.delete_tree(self, relpath)
+ return self._call('delete_tree', relpath)
def get(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.get(self, relpath)
-
- def get_bytes(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.get_bytes(self, relpath)
+ return self._call('get', relpath)
def has(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.has(self, relpath)
+ return self._call('has', relpath)
+
+ def iter_files_recursive(self):
+ backing_transport = self.server.backing_transport.clone(
+ self._safe_relpath('.'))
+ return backing_transport.iter_files_recursive()
+
+ def listable(self):
+ return self.server.backing_transport.listable()
def list_dir(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.list_dir(self, relpath)
+ return self._call('list_dir', relpath)
def lock_read(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.lock_read(self, relpath)
+ return self._call('lock_read', relpath)
def lock_write(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.lock_write(self, relpath)
+ return self._call('lock_write', relpath)
def mkdir(self, relpath, mode=None):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.mkdir(self, relpath, mode=mode)
-
- def put_bytes(self, relpath, bytes, mode=None):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.put_bytes(self, relpath, bytes, mode=mode)
+ return self._call('mkdir', relpath, mode)
def put_file(self, relpath, f, mode=None):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.put_file(self, relpath, f, mode=mode)
+ return self._call('put_file', relpath, f, mode)
def rename(self, rel_from, rel_to):
- self._ensure_relpath_is_child(rel_from)
- self._ensure_relpath_is_child(rel_to)
- return TransportDecorator.rename(self, rel_from, rel_to)
+ return self._call('rename', rel_from, self._safe_relpath(rel_to))
def rmdir(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.rmdir(self, relpath)
+ return self._call('rmdir', relpath)
def stat(self, relpath):
- self._ensure_relpath_is_child(relpath)
- return TransportDecorator.stat(self, relpath)
-
-
-class ChrootServer(DecoratorServer):
- """Server for the ReadonlyTransportDecorator for testing with."""
-
- def get_decorator_class(self):
- return ChrootTransportDecorator
+ return self._call('stat', relpath)
+
+
+class TestingChrootServer(ChrootServer):
+
+ def __init__(self):
+ """TestingChrootServer is not usable until setUp is called."""
+
+ def setUp(self, backing_server=None):
+ """Setup the Chroot on backing_server."""
+ if backing_server is not None:
+ self.backing_transport = get_transport(backing_server.get_url())
+ else:
+ self.backing_transport = get_transport('.')
+ ChrootServer.setUp(self)
def get_test_permutations():
"""Return the permutations to be used in testing."""
- return [(ChrootTransportDecorator, ChrootServer),
+ return [(ChrootTransport, TestingChrootServer),
]
=== modified file 'bzrlib/transport/http/wsgi.py'
--- a/bzrlib/transport/http/wsgi.py 2006-12-15 23:59:15 +0000
+++ b/bzrlib/transport/http/wsgi.py 2007-03-28 05:19:01 +0000
@@ -94,8 +94,15 @@
# accidentally let people access locations they shouldn't.
# e.g. consider a smart server request for "get /etc/passwd" or
# something.
- self.backing_transport = chroot.ChrootTransportDecorator(
- 'chroot+' + backing_transport.base, _decorated=backing_transport)
+ self.chroot_server = chroot.ChrootServer(backing_transport)
+ self.chroot_server.setUp()
+ self.backing_transport = get_transport(self.chroot_server.get_url())
+ # While the chroot server can technically be torn down at this point,
+ # as all it does is remove the scheme registration from transport's
+ # protocol dictionary, we don't *just in case* there are parts of
+ # bzrlib that will invoke 'get_transport' on urls rather than cloning
+ # around the existing transport.
+ #self.chroot_server.tearDown()
def __call__(self, environ, start_response):
"""WSGI application callable."""
@@ -124,4 +131,4 @@
return [response_data]
def make_request(self, transport, write_func):
- return smart.SmartServerRequestProtocolOne(transport, write_func)
+ return protocol.SmartServerRequestProtocolOne(transport, write_func)
More information about the bazaar-commits
mailing list