Rev 6460: (jelmer) Add utility code for opening branches with a specific policy for in file:///srv/pqm.bazaar-vcs.org/archives/thelove/bzr/2.5/

Patch Queue Manager pqm at pqm.ubuntu.com
Fri Jan 27 13:33:55 UTC 2012


At file:///srv/pqm.bazaar-vcs.org/archives/thelove/bzr/2.5/

------------------------------------------------------------
revno: 6460 [merge]
revision-id: pqm at pqm.ubuntu.com-20120127133353-roaqncvkncz1m2oi
parent: pqm at pqm.ubuntu.com-20120127130635-ouv0mnnesy5b2elj
parent: jelmer at samba.org-20120119005526-q2gst8d3v8qtqpot
committer: Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: 2.5
timestamp: Fri 2012-01-27 13:33:53 +0000
message:
  (jelmer) Add utility code for opening branches with a specific policy for
   URLs to access. (Jelmer Vernooij)
added:
  bzrlib/tests/test_url_policy_open.py test_safe_open.py-20111222011853-iu027ec0o6wj9mfr-2
  bzrlib/url_policy_open.py      safe_open.py-20111222011853-iu027ec0o6wj9mfr-1
modified:
  bzrlib/tests/__init__.py       selftest.py-20050531073622-8d0e3c8845c97a64
  doc/en/release-notes/bzr-2.5.txt bzr2.5.txt-20110708125756-587p0hpw7oke4h05-1
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2012-01-04 23:51:16 +0000
+++ b/bzrlib/tests/__init__.py	2012-01-27 13:33:53 +0000
@@ -4057,6 +4057,7 @@
         'bzrlib.tests.test_revisiontree',
         'bzrlib.tests.test_rio',
         'bzrlib.tests.test_rules',
+        'bzrlib.tests.test_url_policy_open',
         'bzrlib.tests.test_sampler',
         'bzrlib.tests.test_scenarios',
         'bzrlib.tests.test_script',

=== added file 'bzrlib/tests/test_url_policy_open.py'
--- a/bzrlib/tests/test_url_policy_open.py	1970-01-01 00:00:00 +0000
+++ b/bzrlib/tests/test_url_policy_open.py	2012-01-19 00:51:24 +0000
@@ -0,0 +1,357 @@
+# Copyright (C) 2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Tests for the branch open with specific URL policy code."""
+
+from bzrlib import urlutils
+from bzrlib.branch import (
+    Branch,
+    BranchReferenceFormat,
+    )
+from bzrlib.bzrdir import (
+    BzrDir,
+    BzrProber,
+    )
+from bzrlib.controldir import ControlDirFormat
+from bzrlib.errors import NotBranchError
+from bzrlib.url_policy_open import (
+    BadUrl,
+    _BlacklistPolicy,
+    BranchLoopError,
+    BranchReferenceForbidden,
+    open_only_scheme,
+    BranchOpener,
+    WhitelistPolicy,
+    )
+from bzrlib.tests import (
+    TestCase,
+    TestCaseWithTransport,
+    )
+from bzrlib.transport import chroot
+
+
+class TestBranchOpenerCheckAndFollowBranchReference(TestCase):
+    """Unit tests for `BranchOpener.check_and_follow_branch_reference`."""
+
+    def setUp(self):
+        super(TestBranchOpenerCheckAndFollowBranchReference, self).setUp()
+        BranchOpener.install_hook()
+
+    class StubbedBranchOpener(BranchOpener):
+        """BranchOpener that provides canned answers.
+
+        We implement the methods we need to to be able to control all the
+        inputs to the `follow_reference` method, which is what is
+        being tested in this class.
+        """
+
+        def __init__(self, references, policy):
+            parent_cls = TestBranchOpenerCheckAndFollowBranchReference
+            super(parent_cls.StubbedBranchOpener, self).__init__(policy)
+            self._reference_values = {}
+            for i in range(len(references) - 1):
+                self._reference_values[references[i]] = references[i + 1]
+            self.follow_reference_calls = []
+
+        def follow_reference(self, url):
+            self.follow_reference_calls.append(url)
+            return self._reference_values[url]
+
+    def make_branch_opener(self, should_follow_references, references,
+                           unsafe_urls=None):
+        policy = _BlacklistPolicy(should_follow_references, unsafe_urls)
+        opener = self.StubbedBranchOpener(references, policy)
+        return opener
+
+    def test_check_initial_url(self):
+        # check_and_follow_branch_reference rejects all URLs that are not
+        # allowed.
+        opener = self.make_branch_opener(None, [], set(['a']))
+        self.assertRaises(
+            BadUrl, opener.check_and_follow_branch_reference, 'a')
+
+    def test_not_reference(self):
+        # When branch references are forbidden, check_and_follow_branch_reference
+        # does not raise on non-references.
+        opener = self.make_branch_opener(False, ['a', None])
+        self.assertEquals(
+            'a', opener.check_and_follow_branch_reference('a'))
+        self.assertEquals(['a'], opener.follow_reference_calls)
+
+    def test_branch_reference_forbidden(self):
+        # check_and_follow_branch_reference raises BranchReferenceForbidden if
+        # branch references are forbidden and the source URL points to a
+        # branch reference.
+        opener = self.make_branch_opener(False, ['a', 'b'])
+        self.assertRaises(
+            BranchReferenceForbidden,
+            opener.check_and_follow_branch_reference, 'a')
+        self.assertEquals(['a'], opener.follow_reference_calls)
+
+    def test_allowed_reference(self):
+        # check_and_follow_branch_reference does not raise if following references
+        # is allowed and the source URL points to a branch reference to a
+        # permitted location.
+        opener = self.make_branch_opener(True, ['a', 'b', None])
+        self.assertEquals(
+            'b', opener.check_and_follow_branch_reference('a'))
+        self.assertEquals(['a', 'b'], opener.follow_reference_calls)
+
+    def test_check_referenced_urls(self):
+        # check_and_follow_branch_reference checks if the URL a reference points
+        # to is safe.
+        opener = self.make_branch_opener(
+            True, ['a', 'b', None], unsafe_urls=set('b'))
+        self.assertRaises(
+            BadUrl, opener.check_and_follow_branch_reference, 'a')
+        self.assertEquals(['a'], opener.follow_reference_calls)
+
+    def test_self_referencing_branch(self):
+        # check_and_follow_branch_reference raises BranchReferenceLoopError if
+        # following references is allowed and the source url points to a
+        # self-referencing branch reference.
+        opener = self.make_branch_opener(True, ['a', 'a'])
+        self.assertRaises(
+            BranchLoopError, opener.check_and_follow_branch_reference, 'a')
+        self.assertEquals(['a'], opener.follow_reference_calls)
+
+    def test_branch_reference_loop(self):
+        # check_and_follow_branch_reference raises BranchReferenceLoopError if
+        # following references is allowed and the source url points to a loop
+        # of branch references.
+        references = ['a', 'b', 'a']
+        opener = self.make_branch_opener(True, references)
+        self.assertRaises(
+            BranchLoopError, opener.check_and_follow_branch_reference, 'a')
+        self.assertEquals(['a', 'b'], opener.follow_reference_calls)
+
+
+class TrackingProber(BzrProber):
+    """Subclass of BzrProber which tracks URLs it has been asked to open."""
+
+    seen_urls = []
+
+    @classmethod
+    def probe_transport(klass, transport):
+        klass.seen_urls.append(transport.base)
+        return BzrProber.probe_transport(transport)
+
+
+class TestBranchOpenerStacking(TestCaseWithTransport):
+
+    def setUp(self):
+        super(TestBranchOpenerStacking, self).setUp()
+        BranchOpener.install_hook()
+
+    def make_branch_opener(self, allowed_urls, probers=None):
+        policy = WhitelistPolicy(True, allowed_urls, True)
+        return BranchOpener(policy, probers)
+
+    def test_probers(self):
+        # Only the specified probers should be used
+        b = self.make_branch('branch')
+        opener = self.make_branch_opener([b.base], probers=[])
+        self.assertRaises(NotBranchError, opener.open, b.base)
+        opener = self.make_branch_opener([b.base], probers=[BzrProber])
+        self.assertEquals(b.base, opener.open(b.base).base)
+
+    def test_default_probers(self):
+        # If no probers are specified to the constructor
+        # of BranchOpener, then a safe set will be used,
+        # rather than all probers registered in bzr.
+        self.addCleanup(ControlDirFormat.unregister_prober, TrackingProber)
+        ControlDirFormat.register_prober(TrackingProber)
+        # Open a location without any branches, so that all probers are
+        # tried.
+        # First, check that the TrackingProber tracks correctly.
+        TrackingProber.seen_urls = []
+        opener = self.make_branch_opener(["."], probers=[TrackingProber])
+        self.assertRaises(NotBranchError, opener.open, ".")
+        self.assertEquals(1, len(TrackingProber.seen_urls))
+        TrackingProber.seen_urls = []
+        # And make sure it's registered in such a way that BzrDir.open would
+        # use it.
+        self.assertRaises(NotBranchError, BzrDir.open, ".")
+        self.assertEquals(1, len(TrackingProber.seen_urls))
+
+    def test_allowed_url(self):
+        # the opener does not raise an exception for branches stacked on
+        # branches with allowed URLs.
+        stacked_on_branch = self.make_branch('base-branch', format='1.6')
+        stacked_branch = self.make_branch('stacked-branch', format='1.6')
+        stacked_branch.set_stacked_on_url(stacked_on_branch.base)
+        opener = self.make_branch_opener(
+            [stacked_branch.base, stacked_on_branch.base])
+        # This doesn't raise an exception.
+        opener.open(stacked_branch.base)
+
+    def test_nstackable_repository(self):
+        # treats branches with UnstackableRepositoryFormats as
+        # being not stacked.
+        branch = self.make_branch('unstacked', format='knit')
+        opener = self.make_branch_opener([branch.base])
+        # This doesn't raise an exception.
+        opener.open(branch.base)
+
+    def test_allowed_relative_url(self):
+        # passes on absolute urls to check_one_url, even if the
+        # value of stacked_on_location in the config is set to a relative URL.
+        stacked_on_branch = self.make_branch('base-branch', format='1.6')
+        stacked_branch = self.make_branch('stacked-branch', format='1.6')
+        stacked_branch.set_stacked_on_url('../base-branch')
+        opener = self.make_branch_opener(
+            [stacked_branch.base, stacked_on_branch.base])
+        # Note that stacked_on_branch.base is not '../base-branch', it's an
+        # absolute URL.
+        self.assertNotEqual('../base-branch', stacked_on_branch.base)
+        # This doesn't raise an exception.
+        opener.open(stacked_branch.base)
+
+    def test_allowed_relative_nested(self):
+        # Relative URLs are resolved relative to the stacked branch.
+        self.get_transport().mkdir('subdir')
+        a = self.make_branch('subdir/a', format='1.6')
+        b = self.make_branch('b', format='1.6')
+        b.set_stacked_on_url('../subdir/a')
+        c = self.make_branch('subdir/c', format='1.6')
+        c.set_stacked_on_url('../../b')
+        opener = self.make_branch_opener([c.base, b.base, a.base])
+        # This doesn't raise an exception.
+        opener.open(c.base)
+
+    def test_forbidden_url(self):
+        # raises a BadUrl exception if a branch is stacked on a
+        # branch with a forbidden URL.
+        stacked_on_branch = self.make_branch('base-branch', format='1.6')
+        stacked_branch = self.make_branch('stacked-branch', format='1.6')
+        stacked_branch.set_stacked_on_url(stacked_on_branch.base)
+        opener = self.make_branch_opener([stacked_branch.base])
+        self.assertRaises(BadUrl, opener.open, stacked_branch.base)
+
+    def test_forbidden_url_nested(self):
+        # raises a BadUrl exception if a branch is stacked on a
+        # branch that is in turn stacked on a branch with a forbidden URL.
+        a = self.make_branch('a', format='1.6')
+        b = self.make_branch('b', format='1.6')
+        b.set_stacked_on_url(a.base)
+        c = self.make_branch('c', format='1.6')
+        c.set_stacked_on_url(b.base)
+        opener = self.make_branch_opener([c.base, b.base])
+        self.assertRaises(BadUrl, opener.open, c.base)
+
+    def test_self_stacked_branch(self):
+        # raises StackingLoopError if a branch is stacked on
+        # itself. This avoids infinite recursion errors.
+        a = self.make_branch('a', format='1.6')
+        # Bazaar 1.17 and up make it harder to create branches like this.
+        # It's still worth testing that we don't blow up in the face of them,
+        # so we grovel around a bit to create one anyway.
+        a.get_config().set_user_option('stacked_on_location', a.base)
+        opener = self.make_branch_opener([a.base])
+        self.assertRaises(BranchLoopError, opener.open, a.base)
+
+    def test_loop_stacked_branch(self):
+        # raises StackingLoopError if a branch is stacked in such
+        # a way so that it is ultimately stacked on itself. e.g. a stacked on
+        # b stacked on a.
+        a = self.make_branch('a', format='1.6')
+        b = self.make_branch('b', format='1.6')
+        a.set_stacked_on_url(b.base)
+        b.set_stacked_on_url(a.base)
+        opener = self.make_branch_opener([a.base, b.base])
+        self.assertRaises(BranchLoopError, opener.open, a.base)
+        self.assertRaises(BranchLoopError, opener.open, b.base)
+
+    def test_custom_opener(self):
+        # A custom function for opening a control dir can be specified.
+        a = self.make_branch('a', format='2a')
+        b = self.make_branch('b', format='2a')
+        b.set_stacked_on_url(a.base)
+
+        TrackingProber.seen_urls = []
+        opener = self.make_branch_opener(
+            [a.base, b.base], probers=[TrackingProber])
+        opener.open(b.base)
+        self.assertEquals(
+            set(TrackingProber.seen_urls), set([b.base, a.base]))
+
+    def test_custom_opener_with_branch_reference(self):
+        # A custom function for opening a control dir can be specified.
+        a = self.make_branch('a', format='2a')
+        b_dir = self.make_bzrdir('b')
+        b = BranchReferenceFormat().initialize(b_dir, target_branch=a)
+        TrackingProber.seen_urls = []
+        opener = self.make_branch_opener(
+            [a.base, b.base], probers=[TrackingProber])
+        opener.open(b.base)
+        self.assertEquals(
+            set(TrackingProber.seen_urls), set([b.base, a.base]))
+
+
+class TestOpenOnlyScheme(TestCaseWithTransport):
+    """Tests for `open_only_scheme`."""
+
+    def setUp(self):
+        super(TestOpenOnlyScheme, self).setUp()
+        BranchOpener.install_hook()
+
+    def test_hook_does_not_interfere(self):
+        # The transform_fallback_location hook does not interfere with regular
+        # stacked branch access outside of open_only_scheme.
+        self.make_branch('stacked')
+        self.make_branch('stacked-on')
+        Branch.open('stacked').set_stacked_on_url('../stacked-on')
+        Branch.open('stacked')
+
+    def get_chrooted_scheme(self, relpath):
+        """Create a server that is chrooted to `relpath`.
+
+        :return: ``(scheme, get_url)`` where ``scheme`` is the scheme of the
+            chroot server and ``get_url`` returns URLs on said server.
+        """
+        transport = self.get_transport(relpath)
+        chroot_server = chroot.ChrootServer(transport)
+        chroot_server.start_server()
+        self.addCleanup(chroot_server.stop_server)
+
+        def get_url(relpath):
+            return chroot_server.get_url() + relpath
+
+        return urlutils.URL.from_string(chroot_server.get_url()).scheme, get_url
+
+    def test_stacked_within_scheme(self):
+        # A branch that is stacked on a URL of the same scheme is safe to
+        # open.
+        self.get_transport().mkdir('inside')
+        self.make_branch('inside/stacked')
+        self.make_branch('inside/stacked-on')
+        scheme, get_chrooted_url = self.get_chrooted_scheme('inside')
+        Branch.open(get_chrooted_url('stacked')).set_stacked_on_url(
+            get_chrooted_url('stacked-on'))
+        open_only_scheme(scheme, get_chrooted_url('stacked'))
+
+    def test_stacked_outside_scheme(self):
+        # A branch that is stacked on a URL that is not of the same scheme is
+        # not safe to open.
+        self.get_transport().mkdir('inside')
+        self.get_transport().mkdir('outside')
+        self.make_branch('inside/stacked')
+        self.make_branch('outside/stacked-on')
+        scheme, get_chrooted_url = self.get_chrooted_scheme('inside')
+        Branch.open(get_chrooted_url('stacked')).set_stacked_on_url(
+            self.get_url('outside/stacked-on'))
+        self.assertRaises(
+            BadUrl, open_only_scheme, scheme, get_chrooted_url('stacked'))

=== added file 'bzrlib/url_policy_open.py'
--- a/bzrlib/url_policy_open.py	1970-01-01 00:00:00 +0000
+++ b/bzrlib/url_policy_open.py	2012-01-19 00:55:26 +0000
@@ -0,0 +1,314 @@
+# Copyright (C) 2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Branch opening with URL-based restrictions."""
+
+from __future__ import absolute_import
+
+import threading
+
+from bzrlib import (
+    errors,
+    urlutils,
+    )
+from bzrlib.branch import Branch
+from bzrlib.controldir import (
+    ControlDir,
+    )
+
+
+class BadUrl(errors.BzrError):
+
+    _fmt = "Tried to access a branch from bad URL %(url)s."
+
+
+class BranchReferenceForbidden(errors.BzrError):
+
+    _fmt = ("Trying to mirror a branch reference and the branch type "
+            "does not allow references.")
+
+
+class BranchLoopError(errors.BzrError):
+    """Encountered a branch cycle.
+
+    A URL may point to a branch reference or it may point to a stacked branch.
+    In either case, it's possible for there to be a cycle in these references,
+    and this exception is raised when we detect such a cycle.
+    """
+
+    _fmt = "Encountered a branch cycle"""
+
+
+class BranchOpenPolicy(object):
+    """Policy on how to open branches.
+
+    In particular, a policy determines which branches are okay to open by
+    checking their URLs and deciding whether or not to follow branch
+    references.
+    """
+
+    def should_follow_references(self):
+        """Whether we traverse references when mirroring.
+
+        Subclasses must override this method.
+
+        If we encounter a branch reference and this returns false, an error is
+        raised.
+
+        :returns: A boolean to indicate whether to follow a branch reference.
+        """
+        raise NotImplementedError(self.should_follow_references)
+
+    def transform_fallback_location(self, branch, url):
+        """Validate, maybe modify, 'url' to be used as a stacked-on location.
+
+        :param branch:  The branch that is being opened.
+        :param url: The URL that the branch provides for its stacked-on
+            location.
+        :return: (new_url, check) where 'new_url' is the URL of the branch to
+            actually open and 'check' is true if 'new_url' needs to be
+            validated by check_and_follow_branch_reference.
+        """
+        raise NotImplementedError(self.transform_fallback_location)
+
+    def check_one_url(self, url):
+        """Check a URL.
+
+        Subclasses must override this method.
+
+        :param url: The source URL to check.
+        :raise BadUrl: subclasses are expected to raise this or a subclass
+            when it finds a URL it deems to be unacceptable.
+        """
+        raise NotImplementedError(self.check_one_url)
+
+
+class _BlacklistPolicy(BranchOpenPolicy):
+    """Branch policy that forbids certain URLs.
+
+    This doesn't cope with various alternative spellings of URLs,
+    with e.g. url encoding. It's mostly useful for tests.
+    """
+
+    def __init__(self, should_follow_references, bad_urls=None):
+        if bad_urls is None:
+            bad_urls = set()
+        self._bad_urls = bad_urls
+        self._should_follow_references = should_follow_references
+
+    def should_follow_references(self):
+        return self._should_follow_references
+
+    def check_one_url(self, url):
+        if url in self._bad_urls:
+            raise BadUrl(url)
+
+    def transform_fallback_location(self, branch, url):
+        """See `BranchOpenPolicy.transform_fallback_location`.
+
+        This class is not used for testing our smarter stacking features so we
+        just do the simplest thing: return the URL that would be used anyway
+        and don't check it.
+        """
+        return urlutils.join(branch.base, url), False
+
+
+class AcceptAnythingPolicy(_BlacklistPolicy):
+    """Accept anything, to make testing easier."""
+
+    def __init__(self):
+        super(AcceptAnythingPolicy, self).__init__(True, set())
+
+
+class WhitelistPolicy(BranchOpenPolicy):
+    """Branch policy that only allows certain URLs."""
+
+    def __init__(self, should_follow_references, allowed_urls=None,
+                 check=False):
+        if allowed_urls is None:
+            allowed_urls = []
+        self.allowed_urls = set(url.rstrip('/') for url in allowed_urls)
+        self.check = check
+
+    def should_follow_references(self):
+        return self._should_follow_references
+
+    def check_one_url(self, url):
+        if url.rstrip('/') not in self.allowed_urls:
+            raise BadUrl(url)
+
+    def transform_fallback_location(self, branch, url):
+        """See `BranchOpenPolicy.transform_fallback_location`.
+
+        Here we return the URL that would be used anyway and optionally check
+        it.
+        """
+        return urlutils.join(branch.base, url), self.check
+
+
+class SingleSchemePolicy(BranchOpenPolicy):
+    """Branch open policy that rejects URLs not on the given scheme."""
+
+    def __init__(self, allowed_scheme):
+        self.allowed_scheme = allowed_scheme
+
+    def should_follow_references(self):
+        return True
+
+    def transform_fallback_location(self, branch, url):
+        return urlutils.join(branch.base, url), True
+
+    def check_one_url(self, url):
+        """Check that `url` is okay to open."""
+        if urlutils.URL.from_string(str(url)).scheme != self.allowed_scheme:
+            raise BadUrl(url)
+
+
+class BranchOpener(object):
+    """Branch opener which uses a URL policy.
+
+    All locations that are opened (stacked-on branches, references) are
+    checked against a policy object.
+
+    The policy object is expected to have the following methods:
+    * check_one_url 
+    * should_follow_references
+    * transform_fallback_location
+    """
+
+    _threading_data = threading.local()
+
+    def __init__(self, policy, probers=None):
+        """Create a new BranchOpener.
+
+        :param policy: The opener policy to use.
+        :param probers: Optional list of probers to allow.
+            Defaults to local and remote bzr probers.
+        """
+        self.policy = policy
+        self._seen_urls = set()
+        self.probers = probers
+
+    @classmethod
+    def install_hook(cls):
+        """Install the ``transform_fallback_location`` hook.
+
+        This is done at module import time, but transform_fallback_locationHook
+        doesn't do anything unless the `_active_openers` threading.Local
+        object has a 'opener' attribute in this thread.
+
+        This is in a module-level function rather than performed at module
+        level so that it can be called in setUp for testing `BranchOpener`
+        as bzrlib.tests.TestCase.setUp clears hooks.
+        """
+        Branch.hooks.install_named_hook(
+            'transform_fallback_location',
+            cls.transform_fallback_locationHook,
+            'BranchOpener.transform_fallback_locationHook')
+
+    def check_and_follow_branch_reference(self, url):
+        """Check URL (and possibly the referenced URL).
+
+        This method checks that `url` passes the policy's `check_one_url`
+        method, and if `url` refers to a branch reference, it checks whether
+        references are allowed and whether the reference's URL passes muster
+        also -- recursively, until a real branch is found.
+
+        :param url: URL to check
+        :raise BranchLoopError: If the branch references form a loop.
+        :raise BranchReferenceForbidden: If this opener forbids branch
+            references.
+        """
+        while True:
+            if url in self._seen_urls:
+                raise BranchLoopError()
+            self._seen_urls.add(url)
+            self.policy.check_one_url(url)
+            next_url = self.follow_reference(url)
+            if next_url is None:
+                return url
+            url = next_url
+            if not self.policy.should_follow_references():
+                raise BranchReferenceForbidden(url)
+
+    @classmethod
+    def transform_fallback_locationHook(cls, branch, url):
+        """Installed as the 'transform_fallback_location' Branch hook.
+
+        This method calls `transform_fallback_location` on the policy object and
+        either returns the url it provides or passes it back to
+        check_and_follow_branch_reference.
+        """
+        try:
+            opener = getattr(cls._threading_data, "opener")
+        except AttributeError:
+            return url
+        new_url, check = opener.policy.transform_fallback_location(branch, url)
+        if check:
+            return opener.check_and_follow_branch_reference(new_url)
+        else:
+            return new_url
+
+    def run_with_transform_fallback_location_hook_installed(
+            self, callable, *args, **kw):
+        if (self.transform_fallback_locationHook not in
+                Branch.hooks['transform_fallback_location']):
+            raise AssertionError("hook not installed")
+        self._threading_data.opener = self
+        try:
+            return callable(*args, **kw)
+        finally:
+            del self._threading_data.opener
+            # We reset _seen_urls here to avoid multiple calls to open giving
+            # spurious loop exceptions.
+            self._seen_urls = set()
+
+    def follow_reference(self, url):
+        """Get the branch-reference value at the specified url.
+
+        This exists as a separate method only to be overriden in unit tests.
+        """
+        bzrdir = ControlDir.open(url, probers=self.probers)
+        return bzrdir.get_branch_reference()
+
+    def open(self, url):
+        """Open the Bazaar branch at url, first checking it.
+
+        What is acceptable means is defined by the policy's `follow_reference` and
+        `check_one_url` methods.
+        """
+        if type(url) != str:
+            raise TypeError
+
+        url = self.check_and_follow_branch_reference(url)
+
+        def open_branch(url):
+            dir = ControlDir.open(url, probers=self.probers)
+            return dir.open_branch()
+        return self.run_with_transform_fallback_location_hook_installed(
+            open_branch, url)
+
+
+def open_only_scheme(allowed_scheme, url):
+    """Open the branch at `url`, only accessing URLs on `allowed_scheme`.
+
+    :raises BadUrl: An attempt was made to open a URL that was not on
+        `allowed_scheme`.
+    """
+    return BranchOpener(SingleSchemePolicy(allowed_scheme)).open(url)
+
+
+BranchOpener.install_hook()

=== modified file 'doc/en/release-notes/bzr-2.5.txt'
--- a/doc/en/release-notes/bzr-2.5.txt	2012-01-25 17:41:05 +0000
+++ b/doc/en/release-notes/bzr-2.5.txt	2012-01-27 13:33:53 +0000
@@ -82,10 +82,13 @@
 
 * A new matcher ``RevisionHistoryMatches`` has been added. (Jelmer Vernooij)
 
+* Add new module ``bzrlib.url_policy_open``. (Jelmer Vernooij, #850843)
+
 * ``MutableTree`` has two new hooks ``pre_transform`` and
   ``post_transform`` that are called for tree transform operations.
   (Jelmer Vernooij, #912084)
 
+
 Testing
 *******
 




More information about the bazaar-commits mailing list