providers/oauth2: improve conformance with client_credentials standard (#8471)
* allow using username:password base64 encoded as client_secret Signed-off-by: Jens Langhammer <jens@goauthentik.io> * support standard method by generating a user Signed-off-by: Jens Langhammer <jens@goauthentik.io> * update docs Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix warning Signed-off-by: Jens Langhammer <jens@goauthentik.io> --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
@ -44,7 +44,7 @@ class CertificateBuilder:
|
||||
def generate_private_key(self) -> PrivateKeyTypes:
|
||||
"""Generate private key"""
|
||||
if self._use_ec_private_key:
|
||||
return ec.generate_private_key(curve=ec.SECP256R1)
|
||||
return ec.generate_private_key(curve=ec.SECP256R1())
|
||||
return rsa.generate_private_key(
|
||||
public_exponent=65537, key_size=4096, backend=default_backend()
|
||||
)
|
||||
|
||||
170
authentik/providers/oauth2/tests/test_token_cc_standard.py
Normal file
170
authentik/providers/oauth2/tests/test_token_cc_standard.py
Normal file
@ -0,0 +1,170 @@
|
||||
"""Test token view"""
|
||||
|
||||
from json import loads
|
||||
|
||||
from django.test import RequestFactory
|
||||
from django.urls import reverse
|
||||
from jwt import decode
|
||||
|
||||
from authentik.blueprints.tests import apply_blueprint
|
||||
from authentik.core.models import Application, Group, Token, TokenIntents, UserTypes
|
||||
from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow
|
||||
from authentik.policies.models import PolicyBinding
|
||||
from authentik.providers.oauth2.constants import (
|
||||
GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
GRANT_TYPE_PASSWORD,
|
||||
SCOPE_OPENID,
|
||||
SCOPE_OPENID_EMAIL,
|
||||
SCOPE_OPENID_PROFILE,
|
||||
TOKEN_TYPE,
|
||||
)
|
||||
from authentik.providers.oauth2.errors import TokenError
|
||||
from authentik.providers.oauth2.models import OAuth2Provider, ScopeMapping
|
||||
from authentik.providers.oauth2.tests.utils import OAuthTestCase
|
||||
|
||||
|
||||
class TestTokenClientCredentialsStandard(OAuthTestCase):
|
||||
"""Test token (client_credentials) view"""
|
||||
|
||||
@apply_blueprint("system/providers-oauth2.yaml")
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
self.factory = RequestFactory()
|
||||
self.provider = OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
authorization_flow=create_test_flow(),
|
||||
redirect_uris="http://testserver",
|
||||
signing_key=create_test_cert(),
|
||||
)
|
||||
self.provider.property_mappings.set(ScopeMapping.objects.all())
|
||||
self.app = Application.objects.create(name="test", slug="test", provider=self.provider)
|
||||
self.user = create_test_admin_user("sa")
|
||||
self.user.type = UserTypes.SERVICE_ACCOUNT
|
||||
self.user.save()
|
||||
self.token = Token.objects.create(
|
||||
identifier="sa-token",
|
||||
user=self.user,
|
||||
intent=TokenIntents.INTENT_APP_PASSWORD,
|
||||
expiring=False,
|
||||
)
|
||||
|
||||
def test_wrong_user(self):
|
||||
"""test invalid username"""
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
"scope": SCOPE_OPENID,
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": self.provider.client_secret + "foo",
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(
|
||||
response.content.decode(),
|
||||
{"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]},
|
||||
)
|
||||
|
||||
def test_no_provider(self):
|
||||
"""test no provider"""
|
||||
self.app.provider = None
|
||||
self.app.save()
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
"scope": SCOPE_OPENID,
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": self.provider.client_secret,
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(
|
||||
response.content.decode(),
|
||||
{"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]},
|
||||
)
|
||||
|
||||
def test_permission_denied(self):
|
||||
"""test permission denied"""
|
||||
group = Group.objects.create(name="foo")
|
||||
PolicyBinding.objects.create(
|
||||
group=group,
|
||||
target=self.app,
|
||||
order=0,
|
||||
)
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
"scope": SCOPE_OPENID,
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": self.provider.client_secret,
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(
|
||||
response.content.decode(),
|
||||
{"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]},
|
||||
)
|
||||
|
||||
def test_successful(self):
|
||||
"""test successful"""
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": self.provider.client_secret,
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
body = loads(response.content.decode())
|
||||
self.assertEqual(body["token_type"], TOKEN_TYPE)
|
||||
_, alg = self.provider.jwt_key
|
||||
jwt = decode(
|
||||
body["access_token"],
|
||||
key=self.provider.signing_key.public_key,
|
||||
algorithms=[alg],
|
||||
audience=self.provider.client_id,
|
||||
)
|
||||
self.assertEqual(
|
||||
jwt["given_name"], "Autogenerated user from application test (client credentials)"
|
||||
)
|
||||
self.assertEqual(jwt["preferred_username"], "ak-test-client_credentials")
|
||||
jwt = decode(
|
||||
body["id_token"],
|
||||
key=self.provider.signing_key.public_key,
|
||||
algorithms=[alg],
|
||||
audience=self.provider.client_id,
|
||||
)
|
||||
self.assertEqual(
|
||||
jwt["given_name"], "Autogenerated user from application test (client credentials)"
|
||||
)
|
||||
self.assertEqual(jwt["preferred_username"], "ak-test-client_credentials")
|
||||
|
||||
def test_successful_password(self):
|
||||
"""test successful (password grant)"""
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_PASSWORD,
|
||||
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": self.provider.client_secret,
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
body = loads(response.content.decode())
|
||||
self.assertEqual(body["token_type"], TOKEN_TYPE)
|
||||
_, alg = self.provider.jwt_key
|
||||
jwt = decode(
|
||||
body["access_token"],
|
||||
key=self.provider.signing_key.public_key,
|
||||
algorithms=[alg],
|
||||
audience=self.provider.client_id,
|
||||
)
|
||||
self.assertEqual(
|
||||
jwt["given_name"], "Autogenerated user from application test (client credentials)"
|
||||
)
|
||||
self.assertEqual(jwt["preferred_username"], "ak-test-client_credentials")
|
||||
@ -0,0 +1,182 @@
|
||||
"""Test token view"""
|
||||
|
||||
from base64 import b64encode
|
||||
from json import loads
|
||||
|
||||
from django.test import RequestFactory
|
||||
from django.urls import reverse
|
||||
from jwt import decode
|
||||
|
||||
from authentik.blueprints.tests import apply_blueprint
|
||||
from authentik.core.models import Application, Group, Token, TokenIntents, UserTypes
|
||||
from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow
|
||||
from authentik.policies.models import PolicyBinding
|
||||
from authentik.providers.oauth2.constants import (
|
||||
GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
GRANT_TYPE_PASSWORD,
|
||||
SCOPE_OPENID,
|
||||
SCOPE_OPENID_EMAIL,
|
||||
SCOPE_OPENID_PROFILE,
|
||||
TOKEN_TYPE,
|
||||
)
|
||||
from authentik.providers.oauth2.errors import TokenError
|
||||
from authentik.providers.oauth2.models import OAuth2Provider, ScopeMapping
|
||||
from authentik.providers.oauth2.tests.utils import OAuthTestCase
|
||||
|
||||
|
||||
class TestTokenClientCredentialsStandardCompat(OAuthTestCase):
|
||||
"""Test token (client_credentials) view"""
|
||||
|
||||
@apply_blueprint("system/providers-oauth2.yaml")
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
self.factory = RequestFactory()
|
||||
self.provider = OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
authorization_flow=create_test_flow(),
|
||||
redirect_uris="http://testserver",
|
||||
signing_key=create_test_cert(),
|
||||
)
|
||||
self.provider.property_mappings.set(ScopeMapping.objects.all())
|
||||
self.app = Application.objects.create(name="test", slug="test", provider=self.provider)
|
||||
self.user = create_test_admin_user("sa")
|
||||
self.user.type = UserTypes.SERVICE_ACCOUNT
|
||||
self.user.save()
|
||||
self.token = Token.objects.create(
|
||||
identifier="sa-token",
|
||||
user=self.user,
|
||||
intent=TokenIntents.INTENT_APP_PASSWORD,
|
||||
expiring=False,
|
||||
)
|
||||
|
||||
def test_wrong_user(self):
|
||||
"""test invalid username"""
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
"scope": SCOPE_OPENID,
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": b64encode(f"saa:{self.token.key}".encode()).decode(),
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(
|
||||
response.content.decode(),
|
||||
{"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]},
|
||||
)
|
||||
|
||||
def test_wrong_token(self):
|
||||
"""test invalid token"""
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
"scope": SCOPE_OPENID,
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": b64encode(f"sa:{self.token.key}foo".encode()).decode(),
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(
|
||||
response.content.decode(),
|
||||
{"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]},
|
||||
)
|
||||
|
||||
def test_no_provider(self):
|
||||
"""test no provider"""
|
||||
self.app.provider = None
|
||||
self.app.save()
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
"scope": SCOPE_OPENID,
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": b64encode(f"sa:{self.token.key}".encode()).decode(),
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(
|
||||
response.content.decode(),
|
||||
{"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]},
|
||||
)
|
||||
|
||||
def test_permission_denied(self):
|
||||
"""test permission denied"""
|
||||
group = Group.objects.create(name="foo")
|
||||
PolicyBinding.objects.create(
|
||||
group=group,
|
||||
target=self.app,
|
||||
order=0,
|
||||
)
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
"scope": SCOPE_OPENID,
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": b64encode(f"sa:{self.token.key}".encode()).decode(),
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(
|
||||
response.content.decode(),
|
||||
{"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]},
|
||||
)
|
||||
|
||||
def test_successful(self):
|
||||
"""test successful"""
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
|
||||
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": b64encode(f"sa:{self.token.key}".encode()).decode(),
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
body = loads(response.content.decode())
|
||||
self.assertEqual(body["token_type"], TOKEN_TYPE)
|
||||
_, alg = self.provider.jwt_key
|
||||
jwt = decode(
|
||||
body["access_token"],
|
||||
key=self.provider.signing_key.public_key,
|
||||
algorithms=[alg],
|
||||
audience=self.provider.client_id,
|
||||
)
|
||||
self.assertEqual(jwt["given_name"], self.user.name)
|
||||
self.assertEqual(jwt["preferred_username"], self.user.username)
|
||||
jwt = decode(
|
||||
body["id_token"],
|
||||
key=self.provider.signing_key.public_key,
|
||||
algorithms=[alg],
|
||||
audience=self.provider.client_id,
|
||||
)
|
||||
self.assertEqual(jwt["given_name"], self.user.name)
|
||||
self.assertEqual(jwt["preferred_username"], self.user.username)
|
||||
|
||||
def test_successful_password(self):
|
||||
"""test successful (password grant)"""
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
{
|
||||
"grant_type": GRANT_TYPE_PASSWORD,
|
||||
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
|
||||
"client_id": self.provider.client_id,
|
||||
"client_secret": b64encode(f"sa:{self.token.key}".encode()).decode(),
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
body = loads(response.content.decode())
|
||||
self.assertEqual(body["token_type"], TOKEN_TYPE)
|
||||
_, alg = self.provider.jwt_key
|
||||
jwt = decode(
|
||||
body["access_token"],
|
||||
key=self.provider.signing_key.public_key,
|
||||
algorithms=[alg],
|
||||
audience=self.provider.client_id,
|
||||
)
|
||||
self.assertEqual(jwt["given_name"], self.user.name)
|
||||
self.assertEqual(jwt["preferred_username"], self.user.username)
|
||||
@ -23,7 +23,7 @@ from authentik.providers.oauth2.models import OAuth2Provider, ScopeMapping
|
||||
from authentik.providers.oauth2.tests.utils import OAuthTestCase
|
||||
|
||||
|
||||
class TestTokenClientCredentials(OAuthTestCase):
|
||||
class TestTokenClientCredentialsUserNamePassword(OAuthTestCase):
|
||||
"""Test token (client_credentials) view"""
|
||||
|
||||
@apply_blueprint("system/providers-oauth2.yaml")
|
||||
@ -1,6 +1,7 @@
|
||||
"""authentik OAuth2 Token views"""
|
||||
|
||||
from base64 import urlsafe_b64encode
|
||||
from base64 import b64decode, urlsafe_b64encode
|
||||
from binascii import Error
|
||||
from dataclasses import InitVar, dataclass
|
||||
from datetime import datetime
|
||||
from hashlib import sha256
|
||||
@ -23,10 +24,12 @@ from authentik.core.middleware import CTX_AUTH_VIA
|
||||
from authentik.core.models import (
|
||||
USER_ATTRIBUTE_EXPIRES,
|
||||
USER_ATTRIBUTE_GENERATED,
|
||||
USER_PATH_SYSTEM_PREFIX,
|
||||
Application,
|
||||
Token,
|
||||
TokenIntents,
|
||||
User,
|
||||
UserTypes,
|
||||
)
|
||||
from authentik.events.models import Event, EventAction
|
||||
from authentik.events.signals import get_login_event
|
||||
@ -286,11 +289,29 @@ class TokenParams:
|
||||
raise TokenError("invalid_grant")
|
||||
|
||||
def __post_init_client_credentials(self, request: HttpRequest):
|
||||
# client_credentials flow with client assertion
|
||||
if request.POST.get(CLIENT_ASSERTION_TYPE, "") != "":
|
||||
return self.__post_init_client_credentials_jwt(request)
|
||||
# authentik-custom-ish client credentials flow
|
||||
if request.POST.get("username", "") != "":
|
||||
return self.__post_init_client_credentials_creds(
|
||||
request, request.POST.get("username"), request.POST.get("password")
|
||||
)
|
||||
# Standard method which creates an automatic user
|
||||
if self.client_secret == self.provider.client_secret:
|
||||
return self.__post_init_client_credentials_generated(request)
|
||||
# Standard workaround method which stores username:password
|
||||
# as client_secret
|
||||
try:
|
||||
user, _, password = b64decode(self.client_secret).decode("utf-8").partition(":")
|
||||
return self.__post_init_client_credentials_creds(request, user, password)
|
||||
except (ValueError, Error):
|
||||
raise TokenError("invalid_grant")
|
||||
|
||||
def __post_init_client_credentials_creds(
|
||||
self, request: HttpRequest, username: str, password: str
|
||||
):
|
||||
# Authenticate user based on credentials
|
||||
username = request.POST.get("username")
|
||||
password = request.POST.get("password")
|
||||
user = User.objects.filter(username=username).first()
|
||||
if not user:
|
||||
raise TokenError("invalid_grant")
|
||||
@ -316,7 +337,6 @@ class TokenParams:
|
||||
PLAN_CONTEXT_APPLICATION: app,
|
||||
},
|
||||
).from_http(request, user=user)
|
||||
return None
|
||||
|
||||
# pylint: disable=too-many-locals
|
||||
def __post_init_client_credentials_jwt(self, request: HttpRequest):
|
||||
@ -409,6 +429,35 @@ class TokenParams:
|
||||
},
|
||||
).from_http(request, user=self.user)
|
||||
|
||||
def __post_init_client_credentials_generated(self, request: HttpRequest):
|
||||
# Authorize user access
|
||||
app = Application.objects.filter(provider=self.provider).first()
|
||||
if not app or not app.provider:
|
||||
raise TokenError("invalid_grant")
|
||||
self.user, _ = User.objects.update_or_create(
|
||||
# trim username to ensure the entire username is max 150 chars
|
||||
# (22 chars being the length of the "template")
|
||||
username=f"ak-{self.provider.name[:150-22]}-client_credentials",
|
||||
defaults={
|
||||
"attributes": {
|
||||
USER_ATTRIBUTE_GENERATED: True,
|
||||
},
|
||||
"last_login": timezone.now(),
|
||||
"name": f"Autogenerated user from application {app.name} (client credentials)",
|
||||
"path": f"{USER_PATH_SYSTEM_PREFIX}/apps/{app.slug}",
|
||||
"type": UserTypes.SERVICE_ACCOUNT,
|
||||
},
|
||||
)
|
||||
self.__check_policy_access(app, request)
|
||||
|
||||
Event.new(
|
||||
action=EventAction.LOGIN,
|
||||
**{
|
||||
PLAN_CONTEXT_METHOD: "oauth_client_secret",
|
||||
PLAN_CONTEXT_APPLICATION: app,
|
||||
},
|
||||
).from_http(request, user=self.user)
|
||||
|
||||
def __post_init_device_code(self, request: HttpRequest):
|
||||
device_code = request.POST.get("device_code", "")
|
||||
code = DeviceToken.objects.filter(device_code=device_code, provider=self.provider).first()
|
||||
@ -418,7 +467,6 @@ class TokenParams:
|
||||
|
||||
def __create_user_from_jwt(self, token: dict[str, Any], app: Application, source: OAuthSource):
|
||||
"""Create user from JWT"""
|
||||
exp = token.get("exp")
|
||||
self.user, created = User.objects.update_or_create(
|
||||
username=f"{self.provider.name}-{token.get('sub')}",
|
||||
defaults={
|
||||
@ -428,8 +476,10 @@ class TokenParams:
|
||||
"last_login": timezone.now(),
|
||||
"name": f"Autogenerated user from application {app.name} (client credentials JWT)",
|
||||
"path": source.get_user_path(),
|
||||
"type": UserTypes.SERVICE_ACCOUNT,
|
||||
},
|
||||
)
|
||||
exp = token.get("exp")
|
||||
if created and exp:
|
||||
self.user.attributes[USER_ATTRIBUTE_EXPIRES] = exp
|
||||
self.user.save()
|
||||
|
||||
@ -23,6 +23,10 @@ password=my-token
|
||||
|
||||
This will return a JSON response with an `access_token`, which is a signed JWT token. This token can be sent along requests to other hosts, which can then validate the JWT based on the signing key configured in authentik.
|
||||
|
||||
Starting with authentik 2024.next, it is also possible to encode the username and token of the user to authenticate with, separated with a colon, into a base64 string and pass it as `client_secret` value.
|
||||
|
||||
In addition to that, with authentik 2024.next it is also possible to pass the configured `client_secret` value, which will automatically generate a service account user for which the JWT token will be issued.
|
||||
|
||||
### JWT-authentication
|
||||
|
||||
Starting with authentik 2022.4, you can authenticate and get a token using an existing JWT.
|
||||
|
||||
Reference in New Issue
Block a user