diff --git a/authentik/crypto/builder.py b/authentik/crypto/builder.py index 29bd8caadf..2ad0563916 100644 --- a/authentik/crypto/builder.py +++ b/authentik/crypto/builder.py @@ -44,7 +44,7 @@ class CertificateBuilder: def generate_private_key(self) -> PrivateKeyTypes: """Generate private key""" if self._use_ec_private_key: - return ec.generate_private_key(curve=ec.SECP256R1) + return ec.generate_private_key(curve=ec.SECP256R1()) return rsa.generate_private_key( public_exponent=65537, key_size=4096, backend=default_backend() ) diff --git a/authentik/providers/oauth2/tests/test_token_cc_standard.py b/authentik/providers/oauth2/tests/test_token_cc_standard.py new file mode 100644 index 0000000000..7b233794cd --- /dev/null +++ b/authentik/providers/oauth2/tests/test_token_cc_standard.py @@ -0,0 +1,170 @@ +"""Test token view""" + +from json import loads + +from django.test import RequestFactory +from django.urls import reverse +from jwt import decode + +from authentik.blueprints.tests import apply_blueprint +from authentik.core.models import Application, Group, Token, TokenIntents, UserTypes +from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow +from authentik.policies.models import PolicyBinding +from authentik.providers.oauth2.constants import ( + GRANT_TYPE_CLIENT_CREDENTIALS, + GRANT_TYPE_PASSWORD, + SCOPE_OPENID, + SCOPE_OPENID_EMAIL, + SCOPE_OPENID_PROFILE, + TOKEN_TYPE, +) +from authentik.providers.oauth2.errors import TokenError +from authentik.providers.oauth2.models import OAuth2Provider, ScopeMapping +from authentik.providers.oauth2.tests.utils import OAuthTestCase + + +class TestTokenClientCredentialsStandard(OAuthTestCase): + """Test token (client_credentials) view""" + + @apply_blueprint("system/providers-oauth2.yaml") + def setUp(self) -> None: + super().setUp() + self.factory = RequestFactory() + self.provider = OAuth2Provider.objects.create( + name="test", + authorization_flow=create_test_flow(), + redirect_uris="http://testserver", + signing_key=create_test_cert(), + ) + self.provider.property_mappings.set(ScopeMapping.objects.all()) + self.app = Application.objects.create(name="test", slug="test", provider=self.provider) + self.user = create_test_admin_user("sa") + self.user.type = UserTypes.SERVICE_ACCOUNT + self.user.save() + self.token = Token.objects.create( + identifier="sa-token", + user=self.user, + intent=TokenIntents.INTENT_APP_PASSWORD, + expiring=False, + ) + + def test_wrong_user(self): + """test invalid username""" + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, + "scope": SCOPE_OPENID, + "client_id": self.provider.client_id, + "client_secret": self.provider.client_secret + "foo", + }, + ) + self.assertEqual(response.status_code, 400) + self.assertJSONEqual( + response.content.decode(), + {"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]}, + ) + + def test_no_provider(self): + """test no provider""" + self.app.provider = None + self.app.save() + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, + "scope": SCOPE_OPENID, + "client_id": self.provider.client_id, + "client_secret": self.provider.client_secret, + }, + ) + self.assertEqual(response.status_code, 400) + self.assertJSONEqual( + response.content.decode(), + {"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]}, + ) + + def test_permission_denied(self): + """test permission denied""" + group = Group.objects.create(name="foo") + PolicyBinding.objects.create( + group=group, + target=self.app, + order=0, + ) + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, + "scope": SCOPE_OPENID, + "client_id": self.provider.client_id, + "client_secret": self.provider.client_secret, + }, + ) + self.assertEqual(response.status_code, 400) + self.assertJSONEqual( + response.content.decode(), + {"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]}, + ) + + def test_successful(self): + """test successful""" + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, + "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", + "client_id": self.provider.client_id, + "client_secret": self.provider.client_secret, + }, + ) + self.assertEqual(response.status_code, 200) + body = loads(response.content.decode()) + self.assertEqual(body["token_type"], TOKEN_TYPE) + _, alg = self.provider.jwt_key + jwt = decode( + body["access_token"], + key=self.provider.signing_key.public_key, + algorithms=[alg], + audience=self.provider.client_id, + ) + self.assertEqual( + jwt["given_name"], "Autogenerated user from application test (client credentials)" + ) + self.assertEqual(jwt["preferred_username"], "ak-test-client_credentials") + jwt = decode( + body["id_token"], + key=self.provider.signing_key.public_key, + algorithms=[alg], + audience=self.provider.client_id, + ) + self.assertEqual( + jwt["given_name"], "Autogenerated user from application test (client credentials)" + ) + self.assertEqual(jwt["preferred_username"], "ak-test-client_credentials") + + def test_successful_password(self): + """test successful (password grant)""" + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_PASSWORD, + "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", + "client_id": self.provider.client_id, + "client_secret": self.provider.client_secret, + }, + ) + self.assertEqual(response.status_code, 200) + body = loads(response.content.decode()) + self.assertEqual(body["token_type"], TOKEN_TYPE) + _, alg = self.provider.jwt_key + jwt = decode( + body["access_token"], + key=self.provider.signing_key.public_key, + algorithms=[alg], + audience=self.provider.client_id, + ) + self.assertEqual( + jwt["given_name"], "Autogenerated user from application test (client credentials)" + ) + self.assertEqual(jwt["preferred_username"], "ak-test-client_credentials") diff --git a/authentik/providers/oauth2/tests/test_token_cc_standard_compat.py b/authentik/providers/oauth2/tests/test_token_cc_standard_compat.py new file mode 100644 index 0000000000..1c54ad38f1 --- /dev/null +++ b/authentik/providers/oauth2/tests/test_token_cc_standard_compat.py @@ -0,0 +1,182 @@ +"""Test token view""" + +from base64 import b64encode +from json import loads + +from django.test import RequestFactory +from django.urls import reverse +from jwt import decode + +from authentik.blueprints.tests import apply_blueprint +from authentik.core.models import Application, Group, Token, TokenIntents, UserTypes +from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow +from authentik.policies.models import PolicyBinding +from authentik.providers.oauth2.constants import ( + GRANT_TYPE_CLIENT_CREDENTIALS, + GRANT_TYPE_PASSWORD, + SCOPE_OPENID, + SCOPE_OPENID_EMAIL, + SCOPE_OPENID_PROFILE, + TOKEN_TYPE, +) +from authentik.providers.oauth2.errors import TokenError +from authentik.providers.oauth2.models import OAuth2Provider, ScopeMapping +from authentik.providers.oauth2.tests.utils import OAuthTestCase + + +class TestTokenClientCredentialsStandardCompat(OAuthTestCase): + """Test token (client_credentials) view""" + + @apply_blueprint("system/providers-oauth2.yaml") + def setUp(self) -> None: + super().setUp() + self.factory = RequestFactory() + self.provider = OAuth2Provider.objects.create( + name="test", + authorization_flow=create_test_flow(), + redirect_uris="http://testserver", + signing_key=create_test_cert(), + ) + self.provider.property_mappings.set(ScopeMapping.objects.all()) + self.app = Application.objects.create(name="test", slug="test", provider=self.provider) + self.user = create_test_admin_user("sa") + self.user.type = UserTypes.SERVICE_ACCOUNT + self.user.save() + self.token = Token.objects.create( + identifier="sa-token", + user=self.user, + intent=TokenIntents.INTENT_APP_PASSWORD, + expiring=False, + ) + + def test_wrong_user(self): + """test invalid username""" + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, + "scope": SCOPE_OPENID, + "client_id": self.provider.client_id, + "client_secret": b64encode(f"saa:{self.token.key}".encode()).decode(), + }, + ) + self.assertEqual(response.status_code, 400) + self.assertJSONEqual( + response.content.decode(), + {"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]}, + ) + + def test_wrong_token(self): + """test invalid token""" + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, + "scope": SCOPE_OPENID, + "client_id": self.provider.client_id, + "client_secret": b64encode(f"sa:{self.token.key}foo".encode()).decode(), + }, + ) + self.assertEqual(response.status_code, 400) + self.assertJSONEqual( + response.content.decode(), + {"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]}, + ) + + def test_no_provider(self): + """test no provider""" + self.app.provider = None + self.app.save() + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, + "scope": SCOPE_OPENID, + "client_id": self.provider.client_id, + "client_secret": b64encode(f"sa:{self.token.key}".encode()).decode(), + }, + ) + self.assertEqual(response.status_code, 400) + self.assertJSONEqual( + response.content.decode(), + {"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]}, + ) + + def test_permission_denied(self): + """test permission denied""" + group = Group.objects.create(name="foo") + PolicyBinding.objects.create( + group=group, + target=self.app, + order=0, + ) + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, + "scope": SCOPE_OPENID, + "client_id": self.provider.client_id, + "client_secret": b64encode(f"sa:{self.token.key}".encode()).decode(), + }, + ) + self.assertEqual(response.status_code, 400) + self.assertJSONEqual( + response.content.decode(), + {"error": "invalid_grant", "error_description": TokenError.errors["invalid_grant"]}, + ) + + def test_successful(self): + """test successful""" + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, + "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", + "client_id": self.provider.client_id, + "client_secret": b64encode(f"sa:{self.token.key}".encode()).decode(), + }, + ) + self.assertEqual(response.status_code, 200) + body = loads(response.content.decode()) + self.assertEqual(body["token_type"], TOKEN_TYPE) + _, alg = self.provider.jwt_key + jwt = decode( + body["access_token"], + key=self.provider.signing_key.public_key, + algorithms=[alg], + audience=self.provider.client_id, + ) + self.assertEqual(jwt["given_name"], self.user.name) + self.assertEqual(jwt["preferred_username"], self.user.username) + jwt = decode( + body["id_token"], + key=self.provider.signing_key.public_key, + algorithms=[alg], + audience=self.provider.client_id, + ) + self.assertEqual(jwt["given_name"], self.user.name) + self.assertEqual(jwt["preferred_username"], self.user.username) + + def test_successful_password(self): + """test successful (password grant)""" + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + { + "grant_type": GRANT_TYPE_PASSWORD, + "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", + "client_id": self.provider.client_id, + "client_secret": b64encode(f"sa:{self.token.key}".encode()).decode(), + }, + ) + self.assertEqual(response.status_code, 200) + body = loads(response.content.decode()) + self.assertEqual(body["token_type"], TOKEN_TYPE) + _, alg = self.provider.jwt_key + jwt = decode( + body["access_token"], + key=self.provider.signing_key.public_key, + algorithms=[alg], + audience=self.provider.client_id, + ) + self.assertEqual(jwt["given_name"], self.user.name) + self.assertEqual(jwt["preferred_username"], self.user.username) diff --git a/authentik/providers/oauth2/tests/test_token_cc.py b/authentik/providers/oauth2/tests/test_token_cc_user_pw.py similarity index 99% rename from authentik/providers/oauth2/tests/test_token_cc.py rename to authentik/providers/oauth2/tests/test_token_cc_user_pw.py index a766a687ab..0af554c2b2 100644 --- a/authentik/providers/oauth2/tests/test_token_cc.py +++ b/authentik/providers/oauth2/tests/test_token_cc_user_pw.py @@ -23,7 +23,7 @@ from authentik.providers.oauth2.models import OAuth2Provider, ScopeMapping from authentik.providers.oauth2.tests.utils import OAuthTestCase -class TestTokenClientCredentials(OAuthTestCase): +class TestTokenClientCredentialsUserNamePassword(OAuthTestCase): """Test token (client_credentials) view""" @apply_blueprint("system/providers-oauth2.yaml") diff --git a/authentik/providers/oauth2/views/token.py b/authentik/providers/oauth2/views/token.py index 58a85507aa..6856e4ab10 100644 --- a/authentik/providers/oauth2/views/token.py +++ b/authentik/providers/oauth2/views/token.py @@ -1,6 +1,7 @@ """authentik OAuth2 Token views""" -from base64 import urlsafe_b64encode +from base64 import b64decode, urlsafe_b64encode +from binascii import Error from dataclasses import InitVar, dataclass from datetime import datetime from hashlib import sha256 @@ -23,10 +24,12 @@ from authentik.core.middleware import CTX_AUTH_VIA from authentik.core.models import ( USER_ATTRIBUTE_EXPIRES, USER_ATTRIBUTE_GENERATED, + USER_PATH_SYSTEM_PREFIX, Application, Token, TokenIntents, User, + UserTypes, ) from authentik.events.models import Event, EventAction from authentik.events.signals import get_login_event @@ -286,11 +289,29 @@ class TokenParams: raise TokenError("invalid_grant") def __post_init_client_credentials(self, request: HttpRequest): + # client_credentials flow with client assertion if request.POST.get(CLIENT_ASSERTION_TYPE, "") != "": return self.__post_init_client_credentials_jwt(request) + # authentik-custom-ish client credentials flow + if request.POST.get("username", "") != "": + return self.__post_init_client_credentials_creds( + request, request.POST.get("username"), request.POST.get("password") + ) + # Standard method which creates an automatic user + if self.client_secret == self.provider.client_secret: + return self.__post_init_client_credentials_generated(request) + # Standard workaround method which stores username:password + # as client_secret + try: + user, _, password = b64decode(self.client_secret).decode("utf-8").partition(":") + return self.__post_init_client_credentials_creds(request, user, password) + except (ValueError, Error): + raise TokenError("invalid_grant") + + def __post_init_client_credentials_creds( + self, request: HttpRequest, username: str, password: str + ): # Authenticate user based on credentials - username = request.POST.get("username") - password = request.POST.get("password") user = User.objects.filter(username=username).first() if not user: raise TokenError("invalid_grant") @@ -316,7 +337,6 @@ class TokenParams: PLAN_CONTEXT_APPLICATION: app, }, ).from_http(request, user=user) - return None # pylint: disable=too-many-locals def __post_init_client_credentials_jwt(self, request: HttpRequest): @@ -409,6 +429,35 @@ class TokenParams: }, ).from_http(request, user=self.user) + def __post_init_client_credentials_generated(self, request: HttpRequest): + # Authorize user access + app = Application.objects.filter(provider=self.provider).first() + if not app or not app.provider: + raise TokenError("invalid_grant") + self.user, _ = User.objects.update_or_create( + # trim username to ensure the entire username is max 150 chars + # (22 chars being the length of the "template") + username=f"ak-{self.provider.name[:150-22]}-client_credentials", + defaults={ + "attributes": { + USER_ATTRIBUTE_GENERATED: True, + }, + "last_login": timezone.now(), + "name": f"Autogenerated user from application {app.name} (client credentials)", + "path": f"{USER_PATH_SYSTEM_PREFIX}/apps/{app.slug}", + "type": UserTypes.SERVICE_ACCOUNT, + }, + ) + self.__check_policy_access(app, request) + + Event.new( + action=EventAction.LOGIN, + **{ + PLAN_CONTEXT_METHOD: "oauth_client_secret", + PLAN_CONTEXT_APPLICATION: app, + }, + ).from_http(request, user=self.user) + def __post_init_device_code(self, request: HttpRequest): device_code = request.POST.get("device_code", "") code = DeviceToken.objects.filter(device_code=device_code, provider=self.provider).first() @@ -418,7 +467,6 @@ class TokenParams: def __create_user_from_jwt(self, token: dict[str, Any], app: Application, source: OAuthSource): """Create user from JWT""" - exp = token.get("exp") self.user, created = User.objects.update_or_create( username=f"{self.provider.name}-{token.get('sub')}", defaults={ @@ -428,8 +476,10 @@ class TokenParams: "last_login": timezone.now(), "name": f"Autogenerated user from application {app.name} (client credentials JWT)", "path": source.get_user_path(), + "type": UserTypes.SERVICE_ACCOUNT, }, ) + exp = token.get("exp") if created and exp: self.user.attributes[USER_ATTRIBUTE_EXPIRES] = exp self.user.save() diff --git a/website/docs/providers/oauth2/client_credentials.md b/website/docs/providers/oauth2/client_credentials.md index a0018795de..4a1c7b178a 100644 --- a/website/docs/providers/oauth2/client_credentials.md +++ b/website/docs/providers/oauth2/client_credentials.md @@ -23,6 +23,10 @@ password=my-token This will return a JSON response with an `access_token`, which is a signed JWT token. This token can be sent along requests to other hosts, which can then validate the JWT based on the signing key configured in authentik. +Starting with authentik 2024.next, it is also possible to encode the username and token of the user to authenticate with, separated with a colon, into a base64 string and pass it as `client_secret` value. + +In addition to that, with authentik 2024.next it is also possible to pass the configured `client_secret` value, which will automatically generate a service account user for which the JWT token will be issued. + ### JWT-authentication Starting with authentik 2022.4, you can authenticate and get a token using an existing JWT.