
* always treat flow as openid flow Signed-off-by: Jens Langhammer <jens@goauthentik.io> * improve issuer URL generation Signed-off-by: Jens Langhammer <jens@goauthentik.io> * more refactoring Signed-off-by: Jens Langhammer <jens@goauthentik.io> * update introspection Signed-off-by: Jens Langhammer <jens@goauthentik.io> * more refinement Signed-off-by: Jens Langhammer <jens@goauthentik.io> * migrate more Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix more things, update api Signed-off-by: Jens Langhammer <jens@goauthentik.io> * regen migrations Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix a bunch of things Signed-off-by: Jens Langhammer <jens@goauthentik.io> * start updating tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix implicit flow, auto set exp Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix timeozone not used correctly Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix revoke Signed-off-by: Jens Langhammer <jens@goauthentik.io> * more timezone shenanigans Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix userinfo tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> * update web Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix proxy outpost Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix api tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix missing at_hash for implicit flows Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> * re-include at_hash in implicit auth flow Signed-off-by: Jens Langhammer <jens@goauthentik.io> * use folder context for outpost build Signed-off-by: Jens Langhammer <jens@goauthentik.io> --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io>
225 lines
8.5 KiB
Python
225 lines
8.5 KiB
Python
"""Test token view"""
|
|
from datetime import datetime, timedelta
|
|
from json import loads
|
|
|
|
from django.test import RequestFactory
|
|
from django.urls import reverse
|
|
from jwt import decode
|
|
|
|
from authentik.blueprints.tests import apply_blueprint
|
|
from authentik.core.models import Application, Group
|
|
from authentik.core.tests.utils import create_test_cert, create_test_flow
|
|
from authentik.lib.generators import generate_id, generate_key
|
|
from authentik.policies.models import PolicyBinding
|
|
from authentik.providers.oauth2.constants import (
|
|
GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
SCOPE_OPENID,
|
|
SCOPE_OPENID_EMAIL,
|
|
SCOPE_OPENID_PROFILE,
|
|
TOKEN_TYPE,
|
|
)
|
|
from authentik.providers.oauth2.models import OAuth2Provider, ScopeMapping
|
|
from authentik.providers.oauth2.tests.utils import OAuthTestCase
|
|
from authentik.providers.oauth2.views.jwks import JWKSView
|
|
from authentik.sources.oauth.models import OAuthSource
|
|
|
|
|
|
class TestTokenClientCredentialsJWTSource(OAuthTestCase):
|
|
"""Test token (client_credentials, with JWT) view"""
|
|
|
|
@apply_blueprint("system/providers-oauth2.yaml")
|
|
def setUp(self) -> None:
|
|
super().setUp()
|
|
self.factory = RequestFactory()
|
|
self.cert = create_test_cert()
|
|
|
|
jwk = JWKSView().get_jwk_for_key(self.cert)
|
|
self.source: OAuthSource = OAuthSource.objects.create(
|
|
name=generate_id(),
|
|
slug=generate_id(),
|
|
provider_type="openidconnect",
|
|
consumer_key=generate_id(),
|
|
consumer_secret=generate_key(),
|
|
authorization_url="http://foo",
|
|
access_token_url=f"http://{generate_id()}",
|
|
profile_url="http://foo",
|
|
oidc_well_known_url="",
|
|
oidc_jwks_url="",
|
|
oidc_jwks={
|
|
"keys": [jwk],
|
|
},
|
|
)
|
|
|
|
self.provider: OAuth2Provider = OAuth2Provider.objects.create(
|
|
name="test",
|
|
client_id=generate_id(),
|
|
client_secret=generate_key(),
|
|
authorization_flow=create_test_flow(),
|
|
redirect_uris="http://testserver",
|
|
signing_key=self.cert,
|
|
)
|
|
self.provider.jwks_sources.add(self.source)
|
|
self.provider.property_mappings.set(ScopeMapping.objects.all())
|
|
self.app = Application.objects.create(name="test", slug="test", provider=self.provider)
|
|
|
|
def test_invalid_type(self):
|
|
"""test invalid type"""
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "foo",
|
|
"client_assertion": "foo.bar",
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_invalid_jwt(self):
|
|
"""test invalid JWT"""
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": "foo.bar",
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_invalid_signature(self):
|
|
"""test invalid JWT"""
|
|
token = self.provider.encode(
|
|
{
|
|
"sub": "foo",
|
|
"exp": datetime.now() + timedelta(hours=2),
|
|
}
|
|
)
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": token + "foo",
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_invalid_expired(self):
|
|
"""test invalid JWT"""
|
|
token = self.provider.encode(
|
|
{
|
|
"sub": "foo",
|
|
"exp": datetime.now() - timedelta(hours=2),
|
|
}
|
|
)
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": token,
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_invalid_no_app(self):
|
|
"""test invalid JWT"""
|
|
self.app.provider = None
|
|
self.app.save()
|
|
token = self.provider.encode(
|
|
{
|
|
"sub": "foo",
|
|
"exp": datetime.now() + timedelta(hours=2),
|
|
}
|
|
)
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": token,
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_invalid_access_denied(self):
|
|
"""test invalid JWT"""
|
|
group = Group.objects.create(name="foo")
|
|
PolicyBinding.objects.create(
|
|
group=group,
|
|
target=self.app,
|
|
order=0,
|
|
)
|
|
token = self.provider.encode(
|
|
{
|
|
"sub": "foo",
|
|
"exp": datetime.now() + timedelta(hours=2),
|
|
}
|
|
)
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": token,
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_successful(self):
|
|
"""test successful"""
|
|
token = self.provider.encode(
|
|
{
|
|
"sub": "foo",
|
|
"exp": datetime.now() + timedelta(hours=2),
|
|
}
|
|
)
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": token,
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 200)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["token_type"], TOKEN_TYPE)
|
|
_, alg = self.provider.jwt_key
|
|
jwt = decode(
|
|
body["access_token"],
|
|
key=self.provider.signing_key.public_key,
|
|
algorithms=[alg],
|
|
audience=self.provider.client_id,
|
|
)
|
|
self.assertEqual(
|
|
jwt["given_name"], "Autogenerated user from application test (client credentials JWT)"
|
|
)
|
|
self.assertEqual(jwt["preferred_username"], "test-foo")
|