Rev 6460: (jelmer) Add utility code for opening branches with a specific policy for URLs to access, in file:///srv/pqm.bazaar-vcs.org/archives/thelove/bzr/2.5/
Patch Queue Manager
pqm at pqm.ubuntu.com
Fri Jan 27 13:33:55 UTC 2012
At file:///srv/pqm.bazaar-vcs.org/archives/thelove/bzr/2.5/
------------------------------------------------------------
revno: 6460 [merge]
revision-id: pqm at pqm.ubuntu.com-20120127133353-roaqncvkncz1m2oi
parent: pqm at pqm.ubuntu.com-20120127130635-ouv0mnnesy5b2elj
parent: jelmer at samba.org-20120119005526-q2gst8d3v8qtqpot
committer: Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: 2.5
timestamp: Fri 2012-01-27 13:33:53 +0000
message:
(jelmer) Add utility code for opening branches with a specific policy for
URLs to access. (Jelmer Vernooij)
added:
bzrlib/tests/test_url_policy_open.py test_safe_open.py-20111222011853-iu027ec0o6wj9mfr-2
bzrlib/url_policy_open.py safe_open.py-20111222011853-iu027ec0o6wj9mfr-1
modified:
bzrlib/tests/__init__.py selftest.py-20050531073622-8d0e3c8845c97a64
doc/en/release-notes/bzr-2.5.txt bzr2.5.txt-20110708125756-587p0hpw7oke4h05-1
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py 2012-01-04 23:51:16 +0000
+++ b/bzrlib/tests/__init__.py 2012-01-27 13:33:53 +0000
@@ -4057,6 +4057,7 @@
'bzrlib.tests.test_revisiontree',
'bzrlib.tests.test_rio',
'bzrlib.tests.test_rules',
+ 'bzrlib.tests.test_url_policy_open',
'bzrlib.tests.test_sampler',
'bzrlib.tests.test_scenarios',
'bzrlib.tests.test_script',
=== added file 'bzrlib/tests/test_url_policy_open.py'
--- a/bzrlib/tests/test_url_policy_open.py 1970-01-01 00:00:00 +0000
+++ b/bzrlib/tests/test_url_policy_open.py 2012-01-19 00:51:24 +0000
@@ -0,0 +1,357 @@
+# Copyright (C) 2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Tests for the branch open with specific URL policy code."""
+
+from bzrlib import urlutils
+from bzrlib.branch import (
+ Branch,
+ BranchReferenceFormat,
+ )
+from bzrlib.bzrdir import (
+ BzrDir,
+ BzrProber,
+ )
+from bzrlib.controldir import ControlDirFormat
+from bzrlib.errors import NotBranchError
+from bzrlib.url_policy_open import (
+ BadUrl,
+ _BlacklistPolicy,
+ BranchLoopError,
+ BranchReferenceForbidden,
+ open_only_scheme,
+ BranchOpener,
+ WhitelistPolicy,
+ )
+from bzrlib.tests import (
+ TestCase,
+ TestCaseWithTransport,
+ )
+from bzrlib.transport import chroot
+
+
+class TestBranchOpenerCheckAndFollowBranchReference(TestCase):
+    """Unit tests for `BranchOpener.check_and_follow_branch_reference`."""
+
+    def setUp(self):
+        super(TestBranchOpenerCheckAndFollowBranchReference, self).setUp()
+        BranchOpener.install_hook()
+
+    class StubbedBranchOpener(BranchOpener):
+        """BranchOpener that provides canned answers.
+
+        `follow_reference` is replaced by a lookup table, so the tests
+        control every value that `check_and_follow_branch_reference` gets
+        back from it and can inspect which URLs it asked about.
+        """
+
+        def __init__(self, references, policy):
+            """Create the stub.
+
+            :param references: List of URLs in which each entry is a
+                reference to the entry that follows it; None marks a real
+                branch (no further reference).
+            :param policy: The policy to hand to BranchOpener.
+            """
+            parent_cls = TestBranchOpenerCheckAndFollowBranchReference
+            super(parent_cls.StubbedBranchOpener, self).__init__(policy)
+            # Pair every URL with its successor in the list.
+            self._reference_values = dict(
+                zip(references[:-1], references[1:]))
+            self.follow_reference_calls = []
+
+        def follow_reference(self, url):
+            """Record the call and return the canned reference for `url`."""
+            self.follow_reference_calls.append(url)
+            return self._reference_values[url]
+
+    def make_branch_opener(self, should_follow_references, references,
+                           unsafe_urls=None):
+        """Build a StubbedBranchOpener backed by a _BlacklistPolicy."""
+        policy = _BlacklistPolicy(should_follow_references, unsafe_urls)
+        opener = self.StubbedBranchOpener(references, policy)
+        return opener
+
+    def test_check_initial_url(self):
+        # check_and_follow_branch_reference rejects all URLs that are not
+        # allowed.
+        opener = self.make_branch_opener(None, [], set(['a']))
+        self.assertRaises(
+            BadUrl, opener.check_and_follow_branch_reference, 'a')
+
+    def test_not_reference(self):
+        # When branch references are forbidden,
+        # check_and_follow_branch_reference does not raise on
+        # non-references.
+        opener = self.make_branch_opener(False, ['a', None])
+        self.assertEquals(
+            'a', opener.check_and_follow_branch_reference('a'))
+        self.assertEquals(['a'], opener.follow_reference_calls)
+
+    def test_branch_reference_forbidden(self):
+        # check_and_follow_branch_reference raises BranchReferenceForbidden
+        # if branch references are forbidden and the source URL points to a
+        # branch reference.
+        opener = self.make_branch_opener(False, ['a', 'b'])
+        self.assertRaises(
+            BranchReferenceForbidden,
+            opener.check_and_follow_branch_reference, 'a')
+        self.assertEquals(['a'], opener.follow_reference_calls)
+
+    def test_allowed_reference(self):
+        # check_and_follow_branch_reference does not raise if following
+        # references is allowed and the source URL points to a branch
+        # reference to a permitted location.
+        opener = self.make_branch_opener(True, ['a', 'b', None])
+        self.assertEquals(
+            'b', opener.check_and_follow_branch_reference('a'))
+        self.assertEquals(['a', 'b'], opener.follow_reference_calls)
+
+    def test_check_referenced_urls(self):
+        # check_and_follow_branch_reference checks if the URL a reference
+        # points to is safe.
+        opener = self.make_branch_opener(
+            True, ['a', 'b', None], unsafe_urls=set(['b']))
+        self.assertRaises(
+            BadUrl, opener.check_and_follow_branch_reference, 'a')
+        self.assertEquals(['a'], opener.follow_reference_calls)
+
+    def test_self_referencing_branch(self):
+        # check_and_follow_branch_reference raises BranchLoopError if
+        # following references is allowed and the source url points to a
+        # self-referencing branch reference.
+        opener = self.make_branch_opener(True, ['a', 'a'])
+        self.assertRaises(
+            BranchLoopError, opener.check_and_follow_branch_reference, 'a')
+        self.assertEquals(['a'], opener.follow_reference_calls)
+
+    def test_branch_reference_loop(self):
+        # check_and_follow_branch_reference raises BranchLoopError if
+        # following references is allowed and the source url points to a
+        # loop of branch references.
+        references = ['a', 'b', 'a']
+        opener = self.make_branch_opener(True, references)
+        self.assertRaises(
+            BranchLoopError, opener.check_and_follow_branch_reference, 'a')
+        self.assertEquals(['a', 'b'], opener.follow_reference_calls)
+
+
+class TrackingProber(BzrProber):
+    """BzrProber variant that remembers every transport base it probes."""
+
+    # Shared, class-level log of probed URLs; tests reset it by rebinding.
+    seen_urls = []
+
+    @classmethod
+    def probe_transport(cls, transport):
+        """Log `transport.base`, then defer to the stock BzrProber."""
+        probed_url = transport.base
+        cls.seen_urls.append(probed_url)
+        return BzrProber.probe_transport(transport)
+
+
+class TestBranchOpenerStacking(TestCaseWithTransport):
+    """Tests for how BranchOpener treats stacked-on locations and probers."""
+
+    def setUp(self):
+        super(TestBranchOpenerStacking, self).setUp()
+        BranchOpener.install_hook()
+
+    def make_branch_opener(self, allowed_urls, probers=None):
+        """Build a BranchOpener whose whitelist also checks fallbacks."""
+        policy = WhitelistPolicy(True, allowed_urls, True)
+        return BranchOpener(policy, probers)
+
+    def test_probers(self):
+        # Only the specified probers should be used
+        b = self.make_branch('branch')
+        opener = self.make_branch_opener([b.base], probers=[])
+        self.assertRaises(NotBranchError, opener.open, b.base)
+        opener = self.make_branch_opener([b.base], probers=[BzrProber])
+        self.assertEquals(b.base, opener.open(b.base).base)
+
+    def test_default_probers(self):
+        # Intended to show that, if no probers are specified to the
+        # constructor of BranchOpener, a safe set is used rather than all
+        # probers registered in bzr.
+        # NOTE(review): no opener is created here without explicit probers,
+        # so the default path itself is not exercised -- confirm and extend.
+        self.addCleanup(ControlDirFormat.unregister_prober, TrackingProber)
+        ControlDirFormat.register_prober(TrackingProber)
+        # Open a location without any branches, so that all probers are
+        # tried.
+        # First, check that the TrackingProber tracks correctly.
+        TrackingProber.seen_urls = []
+        opener = self.make_branch_opener(["."], probers=[TrackingProber])
+        self.assertRaises(NotBranchError, opener.open, ".")
+        self.assertEquals(1, len(TrackingProber.seen_urls))
+        TrackingProber.seen_urls = []
+        # And make sure it's registered in such a way that BzrDir.open would
+        # use it.
+        self.assertRaises(NotBranchError, BzrDir.open, ".")
+        self.assertEquals(1, len(TrackingProber.seen_urls))
+
+    def test_allowed_url(self):
+        # the opener does not raise an exception for branches stacked on
+        # branches with allowed URLs.
+        stacked_on_branch = self.make_branch('base-branch', format='1.6')
+        stacked_branch = self.make_branch('stacked-branch', format='1.6')
+        stacked_branch.set_stacked_on_url(stacked_on_branch.base)
+        opener = self.make_branch_opener(
+            [stacked_branch.base, stacked_on_branch.base])
+        # This doesn't raise an exception.
+        opener.open(stacked_branch.base)
+
+    def test_unstackable_repository(self):
+        # treats branches with UnstackableRepositoryFormats as
+        # being not stacked.
+        branch = self.make_branch('unstacked', format='knit')
+        opener = self.make_branch_opener([branch.base])
+        # This doesn't raise an exception.
+        opener.open(branch.base)
+
+    def test_allowed_relative_url(self):
+        # passes on absolute urls to check_one_url, even if the
+        # value of stacked_on_location in the config is set to a relative
+        # URL.
+        stacked_on_branch = self.make_branch('base-branch', format='1.6')
+        stacked_branch = self.make_branch('stacked-branch', format='1.6')
+        stacked_branch.set_stacked_on_url('../base-branch')
+        opener = self.make_branch_opener(
+            [stacked_branch.base, stacked_on_branch.base])
+        # Note that stacked_on_branch.base is not '../base-branch', it's an
+        # absolute URL.
+        self.assertNotEqual('../base-branch', stacked_on_branch.base)
+        # This doesn't raise an exception.
+        opener.open(stacked_branch.base)
+
+    def test_allowed_relative_nested(self):
+        # Relative URLs are resolved relative to the stacked branch.
+        self.get_transport().mkdir('subdir')
+        a = self.make_branch('subdir/a', format='1.6')
+        b = self.make_branch('b', format='1.6')
+        b.set_stacked_on_url('../subdir/a')
+        c = self.make_branch('subdir/c', format='1.6')
+        c.set_stacked_on_url('../../b')
+        opener = self.make_branch_opener([c.base, b.base, a.base])
+        # This doesn't raise an exception.
+        opener.open(c.base)
+
+    def test_forbidden_url(self):
+        # raises a BadUrl exception if a branch is stacked on a
+        # branch with a forbidden URL.
+        stacked_on_branch = self.make_branch('base-branch', format='1.6')
+        stacked_branch = self.make_branch('stacked-branch', format='1.6')
+        stacked_branch.set_stacked_on_url(stacked_on_branch.base)
+        opener = self.make_branch_opener([stacked_branch.base])
+        self.assertRaises(BadUrl, opener.open, stacked_branch.base)
+
+    def test_forbidden_url_nested(self):
+        # raises a BadUrl exception if a branch is stacked on a
+        # branch that is in turn stacked on a branch with a forbidden URL.
+        a = self.make_branch('a', format='1.6')
+        b = self.make_branch('b', format='1.6')
+        b.set_stacked_on_url(a.base)
+        c = self.make_branch('c', format='1.6')
+        c.set_stacked_on_url(b.base)
+        opener = self.make_branch_opener([c.base, b.base])
+        self.assertRaises(BadUrl, opener.open, c.base)
+
+    def test_self_stacked_branch(self):
+        # raises BranchLoopError if a branch is stacked on
+        # itself. This avoids infinite recursion errors.
+        a = self.make_branch('a', format='1.6')
+        # Bazaar 1.17 and up make it harder to create branches like this.
+        # It's still worth testing that we don't blow up in the face of
+        # them, so we grovel around a bit to create one anyway.
+        a.get_config().set_user_option('stacked_on_location', a.base)
+        opener = self.make_branch_opener([a.base])
+        self.assertRaises(BranchLoopError, opener.open, a.base)
+
+    def test_loop_stacked_branch(self):
+        # raises BranchLoopError if a branch is stacked in such
+        # a way so that it is ultimately stacked on itself. e.g. a stacked
+        # on b stacked on a.
+        a = self.make_branch('a', format='1.6')
+        b = self.make_branch('b', format='1.6')
+        a.set_stacked_on_url(b.base)
+        b.set_stacked_on_url(a.base)
+        opener = self.make_branch_opener([a.base, b.base])
+        self.assertRaises(BranchLoopError, opener.open, a.base)
+        self.assertRaises(BranchLoopError, opener.open, b.base)
+
+    def test_custom_opener(self):
+        # Custom probers are used for the branch and its stacked-on branch.
+        a = self.make_branch('a', format='2a')
+        b = self.make_branch('b', format='2a')
+        b.set_stacked_on_url(a.base)
+
+        TrackingProber.seen_urls = []
+        opener = self.make_branch_opener(
+            [a.base, b.base], probers=[TrackingProber])
+        opener.open(b.base)
+        self.assertEquals(
+            set(TrackingProber.seen_urls), set([b.base, a.base]))
+
+    def test_custom_opener_with_branch_reference(self):
+        # Custom probers are used when a branch reference is involved.
+        # NOTE(review): `b` is whatever BranchReferenceFormat.initialize
+        # returns; if that is the referenced branch then b.base == a.base
+        # and the reference at 'b' is never opened -- confirm.
+        a = self.make_branch('a', format='2a')
+        b_dir = self.make_bzrdir('b')
+        b = BranchReferenceFormat().initialize(b_dir, target_branch=a)
+        TrackingProber.seen_urls = []
+        opener = self.make_branch_opener(
+            [a.base, b.base], probers=[TrackingProber])
+        opener.open(b.base)
+        self.assertEquals(
+            set(TrackingProber.seen_urls), set([b.base, a.base]))
+
+
+class TestOpenOnlyScheme(TestCaseWithTransport):
+ """Tests for `open_only_scheme`."""
+
+ def setUp(self):
+ super(TestOpenOnlyScheme, self).setUp()
+ # Install the hook explicitly so these tests do not rely on the
+ # import-time installation.
+ BranchOpener.install_hook()
+
+ def test_hook_does_not_interfere(self):
+ # The transform_fallback_location hook does not interfere with regular
+ # stacked branch access outside of open_only_scheme.
+ self.make_branch('stacked')
+ self.make_branch('stacked-on')
+ Branch.open('stacked').set_stacked_on_url('../stacked-on')
+ Branch.open('stacked')
+
+ def get_chrooted_scheme(self, relpath):
+ """Create a server that is chrooted to `relpath`.
+
+ :return: ``(scheme, get_url)`` where ``scheme`` is the scheme of the
+ chroot server and ``get_url`` returns URLs on said server.
+ """
+ transport = self.get_transport(relpath)
+ chroot_server = chroot.ChrootServer(transport)
+ chroot_server.start_server()
+ # Stop the server again when the test finishes.
+ self.addCleanup(chroot_server.stop_server)
+
+ def get_url(relpath):
+ # Plain concatenation: `relpath` is appended to the server's URL.
+ return chroot_server.get_url() + relpath
+
+ return urlutils.URL.from_string(chroot_server.get_url()).scheme, get_url
+
+ def test_stacked_within_scheme(self):
+ # A branch that is stacked on a URL of the same scheme is safe to
+ # open.
+ self.get_transport().mkdir('inside')
+ self.make_branch('inside/stacked')
+ self.make_branch('inside/stacked-on')
+ scheme, get_chrooted_url = self.get_chrooted_scheme('inside')
+ Branch.open(get_chrooted_url('stacked')).set_stacked_on_url(
+ get_chrooted_url('stacked-on'))
+ open_only_scheme(scheme, get_chrooted_url('stacked'))
+
+ def test_stacked_outside_scheme(self):
+ # A branch that is stacked on a URL that is not of the same scheme is
+ # not safe to open.
+ self.get_transport().mkdir('inside')
+ self.get_transport().mkdir('outside')
+ self.make_branch('inside/stacked')
+ self.make_branch('outside/stacked-on')
+ scheme, get_chrooted_url = self.get_chrooted_scheme('inside')
+ # The stacked-on URL comes from self.get_url, not from the chroot
+ # server.
+ Branch.open(get_chrooted_url('stacked')).set_stacked_on_url(
+ self.get_url('outside/stacked-on'))
+ self.assertRaises(
+ BadUrl, open_only_scheme, scheme, get_chrooted_url('stacked'))
=== added file 'bzrlib/url_policy_open.py'
--- a/bzrlib/url_policy_open.py 1970-01-01 00:00:00 +0000
+++ b/bzrlib/url_policy_open.py 2012-01-19 00:55:26 +0000
@@ -0,0 +1,314 @@
+# Copyright (C) 2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Branch opening with URL-based restrictions."""
+
+from __future__ import absolute_import
+
+import threading
+
+from bzrlib import (
+ errors,
+ urlutils,
+ )
+from bzrlib.branch import Branch
+from bzrlib.controldir import (
+ ControlDir,
+ )
+
+
+class BadUrl(errors.BzrError):
+    """Raised when a policy refuses to open a URL.
+
+    :ivar url: The URL that was rejected.
+    """
+
+    _fmt = "Tried to access a branch from bad URL %(url)s."
+
+    def __init__(self, url):
+        """Create the error.
+
+        :param url: The URL that was rejected.
+        """
+        # This module raises BadUrl(url) positionally.  BzrError takes a
+        # positional argument as a ready-made message, which bypasses _fmt;
+        # forwarding the URL by keyword lets %(url)s be interpolated.
+        errors.BzrError.__init__(self, url=url)
+
+
+class BranchReferenceForbidden(errors.BzrError):
+    """Raised when a branch reference is met but the policy forbids them.
+
+    :ivar url: The URL supplied by the raiser, or None.
+    """
+
+    _fmt = ("Trying to mirror a branch reference and the branch type "
+            "does not allow references.")
+
+    def __init__(self, url=None):
+        """Create the error.
+
+        :param url: Optional URL associated with the forbidden reference.
+        """
+        # A positional argument to BzrError replaces the formatted message,
+        # so the URL is forwarded by keyword to keep _fmt as the message.
+        errors.BzrError.__init__(self, url=url)
+
+
+class BranchLoopError(errors.BzrError):
+    """Encountered a branch cycle.
+
+    A URL may point to a branch reference or it may point to a stacked
+    branch.  In either case, it's possible for there to be a cycle in these
+    references, and this exception is raised when we detect such a cycle.
+    """
+
+    # The original literal was followed by a stray empty string ("").
+    _fmt = "Encountered a branch cycle"
+
+
+class BranchOpenPolicy(object):
+    """Abstract description of which branches may be opened.
+
+    A policy decides whether a given URL is acceptable, whether branch
+    references may be followed, and how a stacked-on location is resolved.
+    Every method here raises NotImplementedError.
+    """
+
+    def should_follow_references(self):
+        """Say whether branch references may be traversed.
+
+        Subclasses must override this method.
+
+        If a branch reference is encountered and this returns false, an
+        error is raised.
+
+        :returns: A boolean to indicate whether to follow a branch
+            reference.
+        """
+        raise NotImplementedError(self.should_follow_references)
+
+    def transform_fallback_location(self, branch, url):
+        """Validate, maybe modify, 'url' to be used as a stacked-on location.
+
+        Subclasses must override this method.
+
+        :param branch: The branch that is being opened.
+        :param url: The URL that the branch provides for its stacked-on
+            location.
+        :return: (new_url, check) where 'new_url' is the URL of the branch
+            to actually open and 'check' is true if 'new_url' needs to be
+            validated by check_and_follow_branch_reference.
+        """
+        raise NotImplementedError(self.transform_fallback_location)
+
+    def check_one_url(self, url):
+        """Decide whether a single URL may be opened.
+
+        Subclasses must override this method.
+
+        :param url: The source URL to check.
+        :raise BadUrl: subclasses are expected to raise this, or a subclass
+            of it, for a URL they deem unacceptable.
+        """
+        raise NotImplementedError(self.check_one_url)
+
+
+class _BlacklistPolicy(BranchOpenPolicy):
+    """Policy that rejects URLs found in an explicit collection.
+
+    URLs are compared verbatim, so alternative spellings of the same
+    location (e.g. url encoding) are not caught.  It's mostly useful for
+    tests.
+    """
+
+    def __init__(self, should_follow_references, bad_urls=None):
+        """Create the policy.
+
+        :param should_follow_references: Value returned by
+            should_follow_references().
+        :param bad_urls: Collection of forbidden URLs; None means none.
+        """
+        self._should_follow_references = should_follow_references
+        self._bad_urls = set() if bad_urls is None else bad_urls
+
+    def should_follow_references(self):
+        """Return the flag given to the constructor."""
+        return self._should_follow_references
+
+    def check_one_url(self, url):
+        """Raise BadUrl if `url` is one of the forbidden URLs."""
+        if url not in self._bad_urls:
+            return
+        raise BadUrl(url)
+
+    def transform_fallback_location(self, branch, url):
+        """See `BranchOpenPolicy.transform_fallback_location`.
+
+        The smarter stacking features are not tested through this class, so
+        it simply resolves `url` against the branch and asks for no check.
+        """
+        resolved_url = urlutils.join(branch.base, url)
+        return resolved_url, False
+
+
+class AcceptAnythingPolicy(_BlacklistPolicy):
+    """Blacklist policy with an empty blacklist that follows references.
+
+    Accepts anything, to make testing easier.
+    """
+
+    def __init__(self):
+        no_bad_urls = set()
+        super(AcceptAnythingPolicy, self).__init__(True, no_bad_urls)
+
+
+class WhitelistPolicy(BranchOpenPolicy):
+    """Branch policy that only allows certain URLs."""
+
+    def __init__(self, should_follow_references, allowed_urls=None,
+                 check=False):
+        """Create a new WhitelistPolicy.
+
+        :param should_follow_references: Whether branch references may be
+            followed; returned by should_follow_references().
+        :param allowed_urls: Iterable of URLs that may be opened.  Trailing
+            slashes are ignored.  None means that no URL is allowed.
+        :param check: The 'check' flag returned by
+            transform_fallback_location, i.e. whether stacked-on locations
+            are to be validated too.
+        """
+        if allowed_urls is None:
+            allowed_urls = []
+        # This assignment was missing, which made should_follow_references()
+        # fail with AttributeError.
+        self._should_follow_references = should_follow_references
+        self.allowed_urls = set(url.rstrip('/') for url in allowed_urls)
+        self.check = check
+
+    def should_follow_references(self):
+        """Return the flag given to the constructor."""
+        return self._should_follow_references
+
+    def check_one_url(self, url):
+        """Raise BadUrl unless `url` (minus trailing slashes) is allowed."""
+        if url.rstrip('/') not in self.allowed_urls:
+            raise BadUrl(url)
+
+    def transform_fallback_location(self, branch, url):
+        """See `BranchOpenPolicy.transform_fallback_location`.
+
+        Here we return the URL that would be used anyway and optionally
+        check it.
+        """
+        return urlutils.join(branch.base, url), self.check
+
+
+class SingleSchemePolicy(BranchOpenPolicy):
+    """Branch open policy that rejects URLs not on the given scheme."""
+
+    def __init__(self, allowed_scheme):
+        """Remember the only URL scheme that may be accessed."""
+        self.allowed_scheme = allowed_scheme
+
+    def should_follow_references(self):
+        """Branch references are always followed."""
+        return True
+
+    def transform_fallback_location(self, branch, url):
+        """Resolve `url` against the branch and always request a check."""
+        resolved_url = urlutils.join(branch.base, url)
+        return resolved_url, True
+
+    def check_one_url(self, url):
+        """Check that `url` is okay to open.
+
+        :raise BadUrl: If the scheme of `url` is not the allowed scheme.
+        """
+        parsed = urlutils.URL.from_string(str(url))
+        if parsed.scheme == self.allowed_scheme:
+            return
+        raise BadUrl(url)
+
+
+class BranchOpener(object):
+    """Branch opener which uses a URL policy.
+
+    All locations that are opened (stacked-on branches, references) are
+    checked against a policy object.
+
+    The policy object is expected to have the following methods:
+    * check_one_url
+    * should_follow_references
+    * transform_fallback_location
+    """
+
+    # Holds, per thread, the opener that is currently opening a branch (as
+    # the 'opener' attribute); read by transform_fallback_locationHook.
+    _threading_data = threading.local()
+
+    def __init__(self, policy, probers=None):
+        """Create a new BranchOpener.
+
+        :param policy: The opener policy to use.
+        :param probers: Optional list of probers to allow.  The value is
+            stored and passed unchanged to ControlDir.open.
+            NOTE(review): None is forwarded as-is, so the effective default
+            is whatever ControlDir.open does for probers=None -- confirm
+            that this is the intended (restricted) set.
+        """
+        self.policy = policy
+        self._seen_urls = set()
+        self.probers = probers
+
+    @classmethod
+    def install_hook(cls):
+        """Install the ``transform_fallback_location`` hook.
+
+        This is done at module import time, but
+        transform_fallback_locationHook doesn't do anything unless the
+        `_threading_data` thread-local object has an 'opener' attribute in
+        this thread.
+
+        This is a class method rather than code performed at module level
+        so that it can be called in setUp for testing `BranchOpener`, as
+        bzrlib.tests.TestCase.setUp clears hooks.
+        """
+        Branch.hooks.install_named_hook(
+            'transform_fallback_location',
+            cls.transform_fallback_locationHook,
+            'BranchOpener.transform_fallback_locationHook')
+
+    def check_and_follow_branch_reference(self, url):
+        """Check URL (and possibly the referenced URL).
+
+        This method checks that `url` passes the policy's `check_one_url`
+        method, and if `url` refers to a branch reference, it checks whether
+        references are allowed and whether the reference's URL passes muster
+        also -- recursively, until a real branch is found.
+
+        Every URL visited is remembered in `_seen_urls` so that cycles can
+        be detected.
+
+        :param url: URL to check
+        :return: The URL of the real branch that was eventually reached.
+        :raise BranchLoopError: If the branch references form a loop.
+        :raise BranchReferenceForbidden: If this opener forbids branch
+            references.
+        """
+        while True:
+            if url in self._seen_urls:
+                raise BranchLoopError()
+            self._seen_urls.add(url)
+            # The policy raises (e.g. BadUrl) if the URL is unacceptable.
+            self.policy.check_one_url(url)
+            next_url = self.follow_reference(url)
+            if next_url is None:
+                return url
+            if not self.policy.should_follow_references():
+                # Passed by keyword: a positional argument would replace
+                # the error's formatted message with the bare URL.
+                raise BranchReferenceForbidden(url=url)
+            url = next_url
+
+    @classmethod
+    def transform_fallback_locationHook(cls, branch, url):
+        """Installed as the 'transform_fallback_location' Branch hook.
+
+        This method calls `transform_fallback_location` on the policy object
+        and either returns the url it provides or passes it back to
+        check_and_follow_branch_reference.
+
+        If no opener is active in this thread, `url` is returned unchanged.
+        """
+        try:
+            opener = cls._threading_data.opener
+        except AttributeError:
+            return url
+        new_url, check = opener.policy.transform_fallback_location(
+            branch, url)
+        if check:
+            return opener.check_and_follow_branch_reference(new_url)
+        return new_url
+
+    def run_with_transform_fallback_location_hook_installed(
+            self, callable, *args, **kw):
+        """Call `callable` with this opener active for the hook.
+
+        :param callable: The function to call.
+        :param args: Positional arguments for `callable`.
+        :param kw: Keyword arguments for `callable`.
+        :return: Whatever `callable` returns.
+        :raise AssertionError: If the hook is not installed.
+        """
+        if (self.transform_fallback_locationHook not in
+                Branch.hooks['transform_fallback_location']):
+            raise AssertionError("hook not installed")
+        self._threading_data.opener = self
+        try:
+            return callable(*args, **kw)
+        finally:
+            del self._threading_data.opener
+            # We reset _seen_urls here to avoid multiple calls to open
+            # giving spurious loop exceptions.
+            self._seen_urls = set()
+
+    def follow_reference(self, url):
+        """Get the branch-reference value at the specified url.
+
+        This exists as a separate method only to be overridden in unit
+        tests.
+        """
+        bzrdir = ControlDir.open(url, probers=self.probers)
+        return bzrdir.get_branch_reference()
+
+    def open(self, url):
+        """Open the Bazaar branch at url, first checking it.
+
+        What is acceptable is defined by the policy's
+        `should_follow_references` and `check_one_url` methods.
+
+        :param url: The URL to open; must be a str.
+        :return: The opened branch.
+        :raise TypeError: If `url` is not a str.
+        """
+        if not isinstance(url, str):
+            raise TypeError(
+                "url must be a str, not %s" % (type(url).__name__,))
+
+        def open_branch(url):
+            control_dir = ControlDir.open(url, probers=self.probers)
+            return control_dir.open_branch()
+
+        try:
+            url = self.check_and_follow_branch_reference(url)
+            return self.run_with_transform_fallback_location_hook_installed(
+                open_branch, url)
+        finally:
+            # Also reset when the initial check fails; otherwise the next
+            # open() on this opener would see the URL as already visited
+            # and raise a spurious BranchLoopError.
+            self._seen_urls = set()
+
+
+def open_only_scheme(allowed_scheme, url):
+    """Open the branch at `url`, only accessing URLs on `allowed_scheme`.
+
+    :param allowed_scheme: The only URL scheme that may be accessed.
+    :param url: The URL of the branch to open.
+    :raises BadUrl: An attempt was made to open a URL that was not on
+        `allowed_scheme`.
+    """
+    policy = SingleSchemePolicy(allowed_scheme)
+    opener = BranchOpener(policy)
+    return opener.open(url)
+
+
+# Install the hook at import time; it returns URLs unchanged unless an
+# opener is active in the calling thread.
+BranchOpener.install_hook()
=== modified file 'doc/en/release-notes/bzr-2.5.txt'
--- a/doc/en/release-notes/bzr-2.5.txt 2012-01-25 17:41:05 +0000
+++ b/doc/en/release-notes/bzr-2.5.txt 2012-01-27 13:33:53 +0000
@@ -82,10 +82,13 @@
* A new matcher ``RevisionHistoryMatches`` has been added. (Jelmer Vernooij)
+* Add new module ``bzrlib.url_policy_open``. (Jelmer Vernooij, #850843)
+
* ``MutableTree`` has two new hooks ``pre_transform`` and
``post_transform`` that are called for tree transform operations.
(Jelmer Vernooij, #912084)
+
Testing
*******
More information about the bazaar-commits
mailing list