Rev 2371: Implement a basic auth HTTP server, rewrite tests accordingly. in file:///v/home/vila/src/bugs/72792/

Vincent Ladeuil v.ladeuil+lp at
Fri Apr 13 09:11:03 BST 2007

At file:///v/home/vila/src/bugs/72792/

revno: 2371
revision-id: v.ladeuil+lp at
parent: v.ladeuil+lp at
committer: Vincent Ladeuil <v.ladeuil+lp at>
branch nick: 72792
timestamp: Fri 2007-04-13 10:11:01 +0200
  Implement a basic auth HTTP server, rewrite tests accordingly.
  * bzrlib/transport/http/
  (PasswordManager.add_password, PasswordManager.find_user_password,
  PasswordManager.reduce_uri): Copied from python-2.5 as
  a stop gap. A python-2.4 compatible work must be found.
  (Opener.preprocess_request): Deleted.
  * bzrlib/tests/
  (TestHTTPBasicAuth.create_transport_readonly_server): Use the
  right auth HTTP server.
  (TestHTTPBasicAuth.setUp): Plug the the server.
  (TestHTTPBasicAuth.process_request): Deleted.
  * bzrlib/tests/
  (BasicAuthRequestHandler, AuthHTTPServer): New classes. Implement
  basic authentication on HTTP server.
-------------- next part --------------
=== modified file 'bzrlib/tests/'
--- a/bzrlib/tests/	2007-04-09 04:49:55 +0000
+++ b/bzrlib/tests/	2007-04-13 08:11:01 +0000
@@ -309,3 +309,38 @@
        self.old_server = self.get_secondary_server()
+class BasicAuthRequestHandler(TestingHTTPRequestHandler):
+    """Requires a basic authentification to process requests."""
+    def do_GET(self):
+        tcs = self.server.test_case_server
+        if tcs.auth == 'basic':
+            auth_header = self.headers.get('Authorization')
+            authorized = False
+            if auth_header:
+                coded_auth = auth_header[len('Basic '):]
+                user, password = coded_auth.decode('base64').split(':')
+                authorized = tcs.authorized(user, password)
+            if not authorized:
+                self.send_response(401)
+                self.send_header('www-authenticate',
+                                 'Basic realm="Thou should not pass"')
+                self.end_headers()
+                return
+        TestingHTTPRequestHandler.do_GET(self)
+class AuthHTTPServer(HttpServer):
+    """AuthHTTPServer extends HttpServer with a dictionary of passwords"""
+    def __init__(self, request_handler, auth):
+        HttpServer.__init__(self, request_handler)
+        # No authentification is done by default
+        self.auth = auth
+        self.password_of = {}
+    def add_user(self, user, password):
+        self.password_of[user] = password
+    def authorized(self, user, password):
+        return self.password_of[user] == password

=== modified file 'bzrlib/tests/'
--- a/bzrlib/tests/	2007-04-12 16:00:04 +0000
+++ b/bzrlib/tests/	2007-04-13 08:11:01 +0000
@@ -41,8 +41,10 @@
 from bzrlib.tests.HTTPTestUtil import (
+    AuthHTTPServer,
+    BasicAuthRequestHandler,
@@ -1143,38 +1145,45 @@
 class TestHTTPBasicAuth(TestCaseWithWebserver):
     """Test basic authentication scheme"""
     _transport = HttpTransport_urllib
     _auth_header = 'Authorization'
     _auth_type = 'basic'
+    _request_handler_class = BasicAuthRequestHandler
     def create_transport_readonly_server(self):
-        return HttpServer()
+        return AuthHTTPServer(self._request_handler_class, self._auth_type)
     def setUp(self):
         super(TestHTTPBasicAuth, self).setUp()
-        self.transport = self._transport('')
-        self.opener = self.transport._opener
+        self.build_tree_contents([('a', 'contents of a\n'),
+                                  ('b', 'contents of b\n'),])
+        self.server = self.get_readonly_server()
-    def process_request(self, request, user, password=None):
-        request.auth = self._auth_type
-        request.user = user
-        request.password = password
-        return self.opener.preprocess_request(request)
+    def get_user_url(self, user, password=None):
+        """Build an url embedding user and password"""
+        user_pass = None
+        if user is not None:
+            userpass = user
+            if password is not None:
+                userpass += ':' + password
+        url = '%s://' % self.server._url_protocol
+        if user is not None:
+            url += user
+            if password is not None:
+                url += ':' + password
+            url += '@'
+        url += '%s:%s/' % (, self.server.port)
+        return url
     def test_empty_pass(self):
-        request = _urllib2_wrappers.Request('GET', '')
-        request.user = 'joe'
-        request.password = ''
-        request = self.process_request(request, 'joe', '')
-        self.assertEqual('Basic ' + 'joe:'.encode('base64').strip(),
-                         request.headers[self._auth_header])
+        self.server.add_user('joe', '')
+        t = self._transport(self.get_user_url('joe', ''))
+        self.assertEqual(t.get('a').read(), 'contents of a\n')
     def test_user_pass(self):
-        request = _urllib2_wrappers.Request('GET', '')
-        request = self.process_request(request, 'joe', 'foo')
-        self.assertEqual('Basic ' + 'joe:foo'.encode('base64').strip(),
-                         request.headers[self._auth_header])
+        self.server.add_user('joe', 'foo')
+        t = self._transport(self.get_user_url('joe', 'foo'))
+        self.assertEqual(t.get('a').read(), 'contents of a\n')

=== modified file 'bzrlib/transport/http/'
--- a/bzrlib/transport/http/	2007-04-12 16:00:04 +0000
+++ b/bzrlib/transport/http/	2007-04-13 08:11:01 +0000
@@ -197,6 +197,52 @@
     def __init__(self):
+    def add_password(self, realm, uri, user, passwd):
+        # uri could be a single URI or a sequence
+        if isinstance(uri, basestring):
+            uri = [uri]
+        if not realm in self.passwd:
+            self.passwd[realm] = {}
+        for default_port in True, False:
+            reduced_uri = tuple(
+                [self.reduce_uri(u, default_port) for u in uri])
+            self.passwd[realm][reduced_uri] = (user, passwd)
+    def find_user_password(self, realm, authuri):
+        domains = self.passwd.get(realm, {})
+        for default_port in True, False:
+            reduced_authuri = self.reduce_uri(authuri, default_port)
+            for uris, authinfo in domains.iteritems():
+                for uri in uris:
+                    if self.is_suburi(uri, reduced_authuri):
+                        return authinfo
+        if realm is not None:
+            return self.find_user_password(None, authuri)
+        return None, None
+    def reduce_uri(self, uri, default_port=True):
+        """Accept authority or URI and extract only the authority and path."""
+        # note HTTP URLs do not have a userinfo component
+        parts = urlparse.urlsplit(uri)
+        if parts[1]:
+            # URI
+            scheme = parts[0]
+            authority = parts[1]
+            path = parts[2] or '/'
+        else:
+            # host or host:port
+            scheme = None
+            authority = uri
+            path = '/'
+        host, port = urllib.splitport(authority)
+        if default_port and port is None and scheme is not None:
+            dport = {"http": 80,
+                     "https": 443,
+                     }.get(scheme)
+            if dport is not None:
+                authority = "%s:%d" % (host, dport)
+        return authority, path
 class ConnectionHandler(urllib2.BaseHandler):
     """Provides connection-sharing by pre-processing requests.
@@ -818,14 +864,3 @@
             # handler is used, when and for what.
             import pprint
-    def preprocess_request(self, request):
-        """Pre-process the request for test purposes"""
-        protocol = request.get_type()
-        # pre-process request
-        meth_name = protocol+"_request"
-        for processor in self._opener.process_request.get(protocol, []):
-            meth = getattr(processor, meth_name)
-            request = meth(request)
-        return request

More information about the bazaar-commits mailing list