Rev 1279: "Add tests for auth code." in http://people.samba.org/bzr/jelmer/bzr-svn/0.4
Jelmer Vernooij
jelmer at samba.org
Sun Jun 22 21:17:01 BST 2008
At http://people.samba.org/bzr/jelmer/bzr-svn/0.4
------------------------------------------------------------
revno: 1279
revision-id: jelmer at samba.org-20080622201700-lbta0usx527q32w0
parent: jelmer at samba.org-20080622191537-202nhdkyj15i9mga
committer: Jelmer Vernooij <jelmer at samba.org>
branch nick: 0.4
timestamp: Sun 2008-06-22 22:17:00 +0200
message:
Add tests for auth code.
modified:
errors.py errors.py-20061226172623-w1sbj8ynpo0eojqp-1
ra.c ra.pyx-20080313140933-qybkqaxe3m4mcll7-1
tests/test_ra.py test_ra.py-20080313141743-uzsm7ejitrlqone5-1
=== modified file 'errors.py'
--- a/errors.py 2008-06-22 08:31:12 +0000
+++ b/errors.py 2008-06-22 20:17:00 +0000
@@ -55,6 +55,7 @@
ERR_CANCELLED = 200015
ERR_WC_UNSUPPORTED_FORMAT = 155021
ERR_UNKNOWN_CAPABILITY = 200026
+ERR_AUTHN_NO_PROVIDER = 215001
class NotSvnBranchPath(NotBranchError):
=== modified file 'ra.c'
--- a/ra.c 2008-06-22 16:17:37 +0000
+++ b/ra.c 2008-06-22 20:17:00 +0000
@@ -34,6 +34,7 @@
PyAPI_DATA(PyTypeObject) Reporter_Type;
PyAPI_DATA(PyTypeObject) RemoteAccess_Type;
PyAPI_DATA(PyTypeObject) AuthProvider_Type;
+PyAPI_DATA(PyTypeObject) CredentialsIter_Type;
PyAPI_DATA(PyTypeObject) TxDeltaWindowHandler_Type;
static svn_error_t *py_commit_callback(const svn_commit_info_t *commit_info, void *baton, apr_pool_t *pool)
@@ -1406,9 +1407,95 @@
return PyString_FromString(svn_auth_get_parameter(auth->auth_baton, name));
}
+typedef struct {
+ PyObject_HEAD
+ apr_pool_t *pool;
+ char *cred_kind;
+ svn_auth_iterstate_t *state;
+ void *credentials;
+} CredentialsIterObject;
+
+static PyObject *auth_first_credentials(PyObject *self, PyObject *args)
+{
+ char *cred_kind;
+ char *realmstring;
+ AuthObject *auth = (AuthObject *)self;
+ void *creds;
+ apr_pool_t *pool;
+ CredentialsIterObject *ret;
+ svn_auth_iterstate_t *state;
+
+ if (!PyArg_ParseTuple(args, "ss", &cred_kind, &realmstring))
+ return NULL;
+
+ pool = Pool(NULL);
+ if (pool == NULL)
+ return NULL;
+
+ RUN_SVN_WITH_POOL(pool,
+ svn_auth_first_credentials(&creds, &state, cred_kind, realmstring, auth->auth_baton, pool));
+
+ ret = PyObject_New(CredentialsIterObject, &CredentialsIter_Type);
+ if (ret == NULL)
+ return NULL;
+
+ ret->pool = pool;
+ ret->cred_kind = apr_pstrdup(pool, cred_kind);
+ ret->state = state;
+ ret->credentials = creds;
+
+ return (PyObject *)ret;
+}
+
+static void credentials_iter_dealloc(PyObject *self)
+{
+ CredentialsIterObject *credsiter = (CredentialsIterObject *)self;
+ apr_pool_destroy(credsiter->pool);
+ PyObject_Del(self);
+}
+
+static PyObject *credentials_iter_next(CredentialsIterObject *iterator)
+{
+ PyObject *ret;
+
+ if (iterator->credentials == NULL) {
+ PyErr_SetString(PyExc_StopIteration, "No more credentials available");
+ return NULL;
+ }
+
+ if (!strcmp(iterator->cred_kind, SVN_AUTH_CRED_SIMPLE)) {
+ svn_auth_cred_simple_t *simple = iterator->credentials;
+ ret = Py_BuildValue("(zzb)", simple->username, simple->password, simple->may_save);
+ } else if (!strcmp(iterator->cred_kind, SVN_AUTH_CRED_USERNAME)) {
+ svn_auth_cred_username_t *uname = iterator->credentials;
+ ret = Py_BuildValue("(zb)", uname->username, uname->may_save);
+ } else if (!strcmp(iterator->cred_kind, SVN_AUTH_CRED_SSL_CLIENT_CERT)) {
+ svn_auth_cred_ssl_client_cert_t *ccert = iterator->credentials;
+ ret = Py_BuildValue("(zb)", ccert->cert_file, ccert->may_save);
+ } else {
+ PyErr_Format(PyExc_RuntimeError, "Unknown cred kind %s", iterator->cred_kind);
+ return NULL;
+ }
+
+ RUN_SVN_WITH_POOL(iterator->pool,
+ svn_auth_next_credentials(&iterator->credentials, iterator->state, iterator->pool));
+
+ return ret;
+}
+
+PyTypeObject CredentialsIter_Type = {
+ PyObject_HEAD_INIT(&PyType_Type) 0,
+ .tp_basicsize = sizeof(CredentialsIterObject),
+ .tp_dealloc = (destructor)credentials_iter_dealloc,
+ .tp_name = "ra.CredentialsIter",
+ .tp_iter = PyObject_SelfIter,
+ .tp_iternext = (iternextfunc)credentials_iter_next,
+};
+
static PyMethodDef auth_methods[] = {
{ "set_parameter", auth_set_parameter, METH_VARARGS, NULL },
{ "get_parameter", auth_get_parameter, METH_VARARGS, NULL },
+ { "credentials", auth_first_credentials, METH_VARARGS, NULL },
{ NULL, }
};
@@ -1434,6 +1521,7 @@
ret = PyObject_CallFunction(fn, "sb", realm, may_save);
if (ret == NULL)
return py_svn_error();
+ *cred = apr_pcalloc(pool, sizeof(**cred));
(*cred)->username = apr_pstrdup(pool, PyString_AsString(PyTuple_GetItem(ret, 0)));
(*cred)->may_save = (PyTuple_GetItem(ret, 1) == Py_True);
return NULL;
@@ -1461,7 +1549,12 @@
ret = PyObject_CallFunction(fn, "ssb", realm, username, may_save);
if (ret == NULL)
return py_svn_error();
+ if (!PyTuple_Check(ret)) {
+ PyErr_SetString(PyExc_TypeError, "expected tuple with simple credentials");
+ return py_svn_error();
+ }
/* FIXME: Check type of ret */
+ *cred = apr_pcalloc(pool, sizeof(**cred));
(*cred)->username = apr_pstrdup(pool, PyString_AsString(PyTuple_GetItem(ret, 0)));
(*cred)->password = apr_pstrdup(pool, PyString_AsString(PyTuple_GetItem(ret, 1)));
(*cred)->may_save = (PyTuple_GetItem(ret, 2) == Py_True);
@@ -1500,7 +1593,8 @@
return py_svn_error();
/* FIXME: Check that ret is a tuple of size 2 */
-
+
+ *cred = apr_pcalloc(pool, sizeof(**cred));
(*cred)->may_save = (PyTuple_GetItem(ret, 0) == Py_True);
(*cred)->accepted_failures = PyLong_AsLong(PyTuple_GetItem(ret, 1));
@@ -1533,11 +1627,27 @@
if (ret == NULL)
return py_svn_error();
/* FIXME: Check ret is a tuple of size 2 */
+ *cred = apr_pcalloc(pool, sizeof(**cred));
(*cred)->password = apr_pstrdup(pool, PyString_AsString(PyTuple_GetItem(ret, 0)));
(*cred)->may_save = (PyTuple_GetItem(ret, 1) == Py_True);
return NULL;
}
+static svn_error_t *py_ssl_client_cert_prompt(svn_auth_cred_ssl_client_cert_t **cred, void *baton, const char *realm, svn_boolean_t may_save, apr_pool_t *pool)
+{
+ PyObject *fn = (PyObject *)baton, *ret;
+ ret = PyObject_CallFunction(fn, "sb", realm, may_save);
+ if (ret == NULL)
+ return py_svn_error();
+ /* FIXME: Check ret is a tuple of size 2 */
+ *cred = apr_pcalloc(pool, sizeof(**cred));
+ (*cred)->cert_file = apr_pstrdup(pool, PyString_AsString(PyTuple_GetItem(ret, 0)));
+ (*cred)->may_save = (PyTuple_GetItem(ret, 1) == Py_True);
+ return NULL;
+}
+
+
+
static PyObject *get_ssl_client_cert_pw_prompt_provider(PyObject *self, PyObject *args)
{
PyObject *prompt_func;
@@ -1558,6 +1668,26 @@
return (PyObject *)auth;
}
+static PyObject *get_ssl_client_cert_prompt_provider(PyObject *self, PyObject *args)
+{
+ PyObject *prompt_func;
+ int retry_limit;
+ AuthProviderObject *auth;
+
+ if (!PyArg_ParseTuple(args, "Oi", &prompt_func, &retry_limit))
+ return NULL;
+
+ auth = PyObject_New(AuthProviderObject, &AuthProvider_Type);
+ if (auth == NULL)
+ return NULL;
+ auth->pool = Pool(NULL);
+ if (auth->pool == NULL)
+ return NULL;
+ Py_INCREF(prompt_func);
+ svn_auth_get_ssl_client_cert_prompt_provider (&auth->provider, py_ssl_client_cert_prompt, (void *)prompt_func, retry_limit, auth->pool);
+ return (PyObject *)auth;
+}
+
static PyObject *get_username_provider(PyObject *self)
{
AuthProviderObject *auth;
@@ -1645,6 +1775,7 @@
{ "get_username_prompt_provider", (PyCFunction)get_username_prompt_provider, METH_VARARGS, NULL },
{ "get_simple_prompt_provider", (PyCFunction)get_simple_prompt_provider, METH_VARARGS, NULL },
{ "get_ssl_server_trust_prompt_provider", (PyCFunction)get_ssl_server_trust_prompt_provider, METH_VARARGS, NULL },
+ { "get_ssl_client_cert_prompt_provider", (PyCFunction)get_ssl_client_cert_prompt_provider, METH_VARARGS, NULL },
{ "get_ssl_client_cert_pw_prompt_provider", (PyCFunction)get_ssl_client_cert_pw_prompt_provider, METH_VARARGS, NULL },
{ "get_username_provider", (PyCFunction)get_username_provider, METH_NOARGS, NULL },
{ NULL, }
@@ -1676,6 +1807,9 @@
if (PyType_Ready(&Auth_Type) < 0)
return;
+ if (PyType_Ready(&CredentialsIter_Type) < 0)
+ return;
+
if (PyType_Ready(&AuthProvider_Type) < 0)
return;
=== modified file 'tests/test_ra.py'
--- a/tests/test_ra.py 2008-06-22 08:31:12 +0000
+++ b/tests/test_ra.py 2008-06-22 20:17:00 +0000
@@ -113,3 +113,38 @@
dir.close()
editor.close()
+
+class AuthTests(TestCase):
+ def test_not_registered(self):
+ auth = ra.Auth([])
+ self.assertRaises(core.SubversionException, auth.credentials, "svn.simple", "MyRealm")
+
+ def test_simple(self):
+ auth = ra.Auth([ra.get_simple_prompt_provider(lambda realm, uname, may_save: ('foo', "geheim", 0), 0)])
+ creds = auth.credentials("svn.simple", "MyRealm")
+ self.assertEquals(("foo", "geheim", 0), creds.next())
+ self.assertRaises(StopIteration, creds.next)
+
+ def test_username(self):
+ auth = ra.Auth([ra.get_username_prompt_provider(lambda realm, may_save: ("somebody", 0), 0)])
+ creds = auth.credentials("svn.username", "MyRealm")
+ self.assertEquals(("somebody", 0), creds.next())
+ self.assertRaises(StopIteration, creds.next)
+
+ def test_client_cert(self):
+ auth = ra.Auth([ra.get_ssl_client_cert_prompt_provider(lambda realm, may_save: ("filename", 0), 0)])
+ creds = auth.credentials("svn.ssl.client-cert", "MyRealm")
+ self.assertEquals(("filename", 0), creds.next())
+ self.assertRaises(StopIteration, creds.next)
+
+ def test_retry(self):
+ self.i = 0
+ def inc_foo(realm, may_save):
+ self.i += 1
+ return ("somebody%d" % self.i, 0)
+ auth = ra.Auth([ra.get_username_prompt_provider(inc_foo, 2)])
+ creds = auth.credentials("svn.username", "MyRealm")
+ self.assertEquals(("somebody1", 0), creds.next())
+ self.assertEquals(("somebody2", 0), creds.next())
+ self.assertEquals(("somebody3", 0), creds.next())
+ self.assertRaises(StopIteration, creds.next)
More information about the bazaar-commits mailing list