
* rename + add field Signed-off-by: Jens Langhammer <jens@goauthentik.io> * initial implementation Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix Signed-off-by: Jens Langhammer <jens@goauthentik.io> * refactor Signed-off-by: Jens Langhammer <jens@goauthentik.io> * rework source cc tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> * update docs Signed-off-by: Jens Langhammer <jens@goauthentik.io> * re-migrate Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix a Signed-off-by: Jens Langhammer <jens@goauthentik.io> * Apply suggestions from code review Co-authored-by: Tana M Berry <tanamarieberry@yahoo.com> Signed-off-by: Jens L. <jens@beryju.org> --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io> Signed-off-by: Jens L. <jens@beryju.org> Co-authored-by: Tana M Berry <tanamarieberry@yahoo.com>
236 lines
8.9 KiB
Python
236 lines
8.9 KiB
Python
"""Test token view"""
|
|
|
|
from datetime import datetime, timedelta
|
|
from json import loads
|
|
|
|
from django.test import RequestFactory
|
|
from django.urls import reverse
|
|
from jwt import decode
|
|
|
|
from authentik.blueprints.tests import apply_blueprint
|
|
from authentik.core.models import Application, Group
|
|
from authentik.core.tests.utils import create_test_cert, create_test_flow
|
|
from authentik.lib.generators import generate_id
|
|
from authentik.policies.models import PolicyBinding
|
|
from authentik.providers.oauth2.constants import (
|
|
GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
SCOPE_OPENID,
|
|
SCOPE_OPENID_EMAIL,
|
|
SCOPE_OPENID_PROFILE,
|
|
TOKEN_TYPE,
|
|
)
|
|
from authentik.providers.oauth2.models import (
|
|
OAuth2Provider,
|
|
RedirectURI,
|
|
RedirectURIMatchingMode,
|
|
ScopeMapping,
|
|
)
|
|
from authentik.providers.oauth2.tests.utils import OAuthTestCase
|
|
from authentik.providers.oauth2.views.jwks import JWKSView
|
|
from authentik.sources.oauth.models import OAuthSource
|
|
|
|
|
|
class TestTokenClientCredentialsJWTSource(OAuthTestCase):
|
|
"""Test token (client_credentials, with JWT) view"""
|
|
|
|
@apply_blueprint("system/providers-oauth2.yaml")
|
|
def setUp(self) -> None:
|
|
super().setUp()
|
|
self.factory = RequestFactory()
|
|
self.other_cert = create_test_cert()
|
|
# Provider used as a helper to sign JWTs with the same key as the OAuth source has
|
|
self.helper_provider = OAuth2Provider.objects.create(
|
|
name=generate_id(),
|
|
authorization_flow=create_test_flow(),
|
|
signing_key=self.other_cert,
|
|
)
|
|
self.cert = create_test_cert()
|
|
|
|
jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig")
|
|
self.source: OAuthSource = OAuthSource.objects.create(
|
|
name=generate_id(),
|
|
slug=generate_id(),
|
|
provider_type="openidconnect",
|
|
consumer_key=generate_id(),
|
|
consumer_secret=generate_id(),
|
|
authorization_url="http://foo",
|
|
access_token_url=f"http://{generate_id()}",
|
|
profile_url="http://foo",
|
|
oidc_well_known_url="",
|
|
oidc_jwks_url="",
|
|
oidc_jwks={
|
|
"keys": [jwk],
|
|
},
|
|
)
|
|
|
|
self.provider: OAuth2Provider = OAuth2Provider.objects.create(
|
|
name="test",
|
|
authorization_flow=create_test_flow(),
|
|
redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
|
|
signing_key=self.cert,
|
|
)
|
|
self.provider.jwt_federation_sources.add(self.source)
|
|
self.provider.property_mappings.set(ScopeMapping.objects.all())
|
|
self.app = Application.objects.create(name="test", slug="test", provider=self.provider)
|
|
|
|
def test_invalid_type(self):
|
|
"""test invalid type"""
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "foo",
|
|
"client_assertion": "foo.bar",
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_invalid_jwt(self):
|
|
"""test invalid JWT"""
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": "foo.bar",
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_invalid_signature(self):
|
|
"""test invalid JWT"""
|
|
token = self.helper_provider.encode(
|
|
{
|
|
"sub": "foo",
|
|
"exp": datetime.now() + timedelta(hours=2),
|
|
}
|
|
)
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": token + "foo",
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_invalid_expired(self):
|
|
"""test invalid JWT"""
|
|
token = self.helper_provider.encode(
|
|
{
|
|
"sub": "foo",
|
|
"exp": datetime.now() - timedelta(hours=2),
|
|
}
|
|
)
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": token,
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_invalid_no_app(self):
|
|
"""test invalid JWT"""
|
|
self.app.provider = None
|
|
self.app.save()
|
|
token = self.helper_provider.encode(
|
|
{
|
|
"sub": "foo",
|
|
"exp": datetime.now() + timedelta(hours=2),
|
|
}
|
|
)
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": token,
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_invalid_access_denied(self):
|
|
"""test invalid JWT"""
|
|
group = Group.objects.create(name="foo")
|
|
PolicyBinding.objects.create(
|
|
group=group,
|
|
target=self.app,
|
|
order=0,
|
|
)
|
|
token = self.helper_provider.encode(
|
|
{
|
|
"sub": "foo",
|
|
"exp": datetime.now() + timedelta(hours=2),
|
|
}
|
|
)
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": token,
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 400)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["error"], "invalid_grant")
|
|
|
|
def test_successful(self):
|
|
"""test successful"""
|
|
token = self.helper_provider.encode(
|
|
{
|
|
"sub": "foo",
|
|
"exp": datetime.now() + timedelta(hours=2),
|
|
}
|
|
)
|
|
response = self.client.post(
|
|
reverse("authentik_providers_oauth2:token"),
|
|
{
|
|
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
|
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
|
"client_id": self.provider.client_id,
|
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
|
"client_assertion": token,
|
|
},
|
|
)
|
|
self.assertEqual(response.status_code, 200)
|
|
body = loads(response.content.decode())
|
|
self.assertEqual(body["token_type"], TOKEN_TYPE)
|
|
_, alg = self.provider.jwt_key
|
|
jwt = decode(
|
|
body["access_token"],
|
|
key=self.provider.signing_key.public_key,
|
|
algorithms=[alg],
|
|
audience=self.provider.client_id,
|
|
)
|
|
self.assertEqual(
|
|
jwt["given_name"], "Autogenerated user from application test (client credentials JWT)"
|
|
)
|
|
self.assertEqual(jwt["preferred_username"], "test-foo")
|