Rev 2371: Implement a basic auth HTTP server, rewrite tests accordingly. in file:///v/home/vila/src/bugs/72792/
Vincent Ladeuil
v.ladeuil+lp at free.fr
Fri Apr 13 09:11:03 BST 2007
At file:///v/home/vila/src/bugs/72792/
------------------------------------------------------------
revno: 2371
revision-id: v.ladeuil+lp at free.fr-20070413081101-j0keov4vgf493m2d
parent: v.ladeuil+lp at free.fr-20070412160004-zrffqcemqyjb8gvq
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: 72792
timestamp: Fri 2007-04-13 10:11:01 +0200
message:
Implement a basic auth HTTP server, rewrite tests accordingly.
* bzrlib/transport/http/_urllib2_wrappers.py:
(PasswordManager.add_password, PasswordManager.find_user_password,
PasswordManager.reduce_uri): Copied from python-2.5 urllib2.py as
a stop gap. A python-2.4 compatible work must be found.
(Opener.preprocess_request): Deleted.
* bzrlib/tests/test_http.py:
(TestHTTPBasicAuth.create_transport_readonly_server): Use the
right auth HTTP server.
(TestHTTPBasicAuth.setUp): Plug the the server.
(TestHTTPBasicAuth.process_request): Deleted.
* bzrlib/tests/HTTPTestUtil.py:
(BasicAuthRequestHandler, AuthHTTPServer): New classes. Implement
basic authentication on HTTP server.
modified:
bzrlib/tests/HTTPTestUtil.py HTTPTestUtil.py-20050914180604-247d3aafb7a43343
bzrlib/tests/test_http.py testhttp.py-20051018020158-b2eef6e867c514d9
bzrlib/transport/http/_urllib2_wrappers.py _urllib2_wrappers.py-20060913231729-ha9ugi48ktx481ao-1
-------------- next part --------------
=== modified file 'bzrlib/tests/HTTPTestUtil.py'
--- a/bzrlib/tests/HTTPTestUtil.py 2007-04-09 04:49:55 +0000
+++ b/bzrlib/tests/HTTPTestUtil.py 2007-04-13 08:11:01 +0000
@@ -309,3 +309,38 @@
self.old_server = self.get_secondary_server()
+class BasicAuthRequestHandler(TestingHTTPRequestHandler):
+ """Requires a basic authentification to process requests."""
+
+ def do_GET(self):
+ tcs = self.server.test_case_server
+ if tcs.auth == 'basic':
+ auth_header = self.headers.get('Authorization')
+ authorized = False
+ if auth_header:
+ coded_auth = auth_header[len('Basic '):]
+ user, password = coded_auth.decode('base64').split(':')
+ authorized = tcs.authorized(user, password)
+ if not authorized:
+ self.send_response(401)
+ self.send_header('www-authenticate',
+ 'Basic realm="Thou should not pass"')
+ self.end_headers()
+ return
+
+ TestingHTTPRequestHandler.do_GET(self)
+
+class AuthHTTPServer(HttpServer):
+ """AuthHTTPServer extends HttpServer with a dictionary of passwords"""
+
+ def __init__(self, request_handler, auth):
+ HttpServer.__init__(self, request_handler)
+ # No authentification is done by default
+ self.auth = auth
+ self.password_of = {}
+
+ def add_user(self, user, password):
+ self.password_of[user] = password
+
+ def authorized(self, user, password):
+ return self.password_of[user] == password
=== modified file 'bzrlib/tests/test_http.py'
--- a/bzrlib/tests/test_http.py 2007-04-12 16:00:04 +0000
+++ b/bzrlib/tests/test_http.py 2007-04-13 08:11:01 +0000
@@ -41,8 +41,10 @@
HttpServer_urllib,
)
from bzrlib.tests.HTTPTestUtil import (
+ AuthHTTPServer,
BadProtocolRequestHandler,
BadStatusRequestHandler,
+ BasicAuthRequestHandler,
FakeProxyRequestHandler,
ForbiddenRequestHandler,
HTTPServerRedirecting,
@@ -1143,38 +1145,45 @@
class TestHTTPBasicAuth(TestCaseWithWebserver):
-
"""Test basic authentication scheme"""
_transport = HttpTransport_urllib
_auth_header = 'Authorization'
_auth_type = 'basic'
+ _request_handler_class = BasicAuthRequestHandler
def create_transport_readonly_server(self):
- return HttpServer()
+ return AuthHTTPServer(self._request_handler_class, self._auth_type)
def setUp(self):
super(TestHTTPBasicAuth, self).setUp()
- self.transport = self._transport('http://bar.com')
- self.opener = self.transport._opener
+ self.build_tree_contents([('a', 'contents of a\n'),
+ ('b', 'contents of b\n'),])
+ self.server = self.get_readonly_server()
- def process_request(self, request, user, password=None):
- request.auth = self._auth_type
- request.user = user
- request.password = password
- return self.opener.preprocess_request(request)
+ def get_user_url(self, user, password=None):
+ """Build an url embedding user and password"""
+ user_pass = None
+ if user is not None:
+ userpass = user
+ if password is not None:
+ userpass += ':' + password
+ url = '%s://' % self.server._url_protocol
+ if user is not None:
+ url += user
+ if password is not None:
+ url += ':' + password
+ url += '@'
+ url += '%s:%s/' % (self.server.host, self.server.port)
+ return url
def test_empty_pass(self):
- request = _urllib2_wrappers.Request('GET', 'http://bar.com')
- request.user = 'joe'
- request.password = ''
- request = self.process_request(request, 'joe', '')
- self.assertEqual('Basic ' + 'joe:'.encode('base64').strip(),
- request.headers[self._auth_header])
+ self.server.add_user('joe', '')
+ t = self._transport(self.get_user_url('joe', ''))
+ self.assertEqual(t.get('a').read(), 'contents of a\n')
def test_user_pass(self):
- request = _urllib2_wrappers.Request('GET', 'http://bar.com')
- request = self.process_request(request, 'joe', 'foo')
- self.assertEqual('Basic ' + 'joe:foo'.encode('base64').strip(),
- request.headers[self._auth_header])
+ self.server.add_user('joe', 'foo')
+ t = self._transport(self.get_user_url('joe', 'foo'))
+ self.assertEqual(t.get('a').read(), 'contents of a\n')
=== modified file 'bzrlib/transport/http/_urllib2_wrappers.py'
--- a/bzrlib/transport/http/_urllib2_wrappers.py 2007-04-12 16:00:04 +0000
+++ b/bzrlib/transport/http/_urllib2_wrappers.py 2007-04-13 08:11:01 +0000
@@ -197,6 +197,52 @@
def __init__(self):
urllib2.HTTPPasswordMgrWithDefaultRealm.__init__(self)
+ def add_password(self, realm, uri, user, passwd):
+ # uri could be a single URI or a sequence
+ if isinstance(uri, basestring):
+ uri = [uri]
+ if not realm in self.passwd:
+ self.passwd[realm] = {}
+ for default_port in True, False:
+ reduced_uri = tuple(
+ [self.reduce_uri(u, default_port) for u in uri])
+ self.passwd[realm][reduced_uri] = (user, passwd)
+
+ def find_user_password(self, realm, authuri):
+ domains = self.passwd.get(realm, {})
+ for default_port in True, False:
+ reduced_authuri = self.reduce_uri(authuri, default_port)
+ for uris, authinfo in domains.iteritems():
+ for uri in uris:
+ if self.is_suburi(uri, reduced_authuri):
+ return authinfo
+ if realm is not None:
+ return self.find_user_password(None, authuri)
+ return None, None
+
+ def reduce_uri(self, uri, default_port=True):
+ """Accept authority or URI and extract only the authority and path."""
+ # note HTTP URLs do not have a userinfo component
+ parts = urlparse.urlsplit(uri)
+ if parts[1]:
+ # URI
+ scheme = parts[0]
+ authority = parts[1]
+ path = parts[2] or '/'
+ else:
+ # host or host:port
+ scheme = None
+ authority = uri
+ path = '/'
+ host, port = urllib.splitport(authority)
+ if default_port and port is None and scheme is not None:
+ dport = {"http": 80,
+ "https": 443,
+ }.get(scheme)
+ if dport is not None:
+ authority = "%s:%d" % (host, dport)
+ return authority, path
+
class ConnectionHandler(urllib2.BaseHandler):
"""Provides connection-sharing by pre-processing requests.
@@ -818,14 +864,3 @@
# handler is used, when and for what.
import pprint
pprint.pprint(self._opener.__dict__)
-
- def preprocess_request(self, request):
- """Pre-process the request for test purposes"""
- protocol = request.get_type()
-
- # pre-process request
- meth_name = protocol+"_request"
- for processor in self._opener.process_request.get(protocol, []):
- meth = getattr(processor, meth_name)
- request = meth(request)
- return request
More information about the bazaar-commits
mailing list