providers/oauth2: add initial JWE support (#11344)

* providers/oauth2: add initial JWE support

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* re-migrate, only set id_token_encryption_* when encryption key is set

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add docs

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add jwks test with encryption

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L.
2024-10-17 14:04:19 +02:00
committed by GitHub
parent fc1f146049
commit 47206d3328
18 changed files with 329 additions and 35 deletions

View File

@ -6,6 +6,7 @@
"authn",
"entra",
"goauthentik",
"jwe",
"jwks",
"kubernetes",
"oidc",

View File

@ -39,6 +39,7 @@ class OAuth2ProviderSerializer(ProviderSerializer):
"refresh_token_validity",
"include_claims_in_id_token",
"signing_key",
"encryption_key",
"redirect_uris",
"sub_mode",
"property_mappings",

View File

@ -0,0 +1,42 @@
# Generated by Django 5.0.9 on 2024-10-16 14:53
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_crypto", "0004_alter_certificatekeypair_name"),
(
"authentik_providers_oauth2",
"0020_remove_accesstoken_authentik_p_token_4bc870_idx_and_more",
),
]
operations = [
migrations.AddField(
model_name="oauth2provider",
name="encryption_key",
field=models.ForeignKey(
help_text="Key used to encrypt the tokens. When set, tokens will be encrypted and returned as JWEs.",
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="oauth2provider_encryption_key_set",
to="authentik_crypto.certificatekeypair",
verbose_name="Encryption Key",
),
),
migrations.AlterField(
model_name="oauth2provider",
name="signing_key",
field=models.ForeignKey(
help_text="Key used to sign the tokens.",
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="oauth2provider_signing_key_set",
to="authentik_crypto.certificatekeypair",
verbose_name="Signing Key",
),
),
]

View File

@ -18,6 +18,9 @@ from django.http import HttpRequest
from django.templatetags.static import static
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from jwcrypto.common import json_encode
from jwcrypto.jwe import JWE
from jwcrypto.jwk import JWK
from jwt import encode
from rest_framework.serializers import Serializer
from structlog.stdlib import get_logger
@ -206,9 +209,19 @@ class OAuth2Provider(WebfingerProvider, Provider):
verbose_name=_("Signing Key"),
on_delete=models.SET_NULL,
null=True,
help_text=_("Key used to sign the tokens."),
related_name="oauth2provider_signing_key_set",
)
encryption_key = models.ForeignKey(
CertificateKeyPair,
verbose_name=_("Encryption Key"),
on_delete=models.SET_NULL,
null=True,
help_text=_(
"Key used to sign the tokens. Only required when JWT Algorithm is set to RS256."
"Key used to encrypt the tokens. When set, "
"tokens will be encrypted and returned as JWEs."
),
related_name="oauth2provider_encryption_key_set",
)
jwks_sources = models.ManyToManyField(
@ -287,7 +300,27 @@ class OAuth2Provider(WebfingerProvider, Provider):
if self.signing_key:
headers["kid"] = self.signing_key.kid
key, alg = self.jwt_key
return encode(payload, key, algorithm=alg, headers=headers)
encoded = encode(payload, key, algorithm=alg, headers=headers)
if self.encryption_key:
return self.encrypt(encoded)
return encoded
def encrypt(self, raw: str) -> str:
"""Encrypt JWT"""
key = JWK.from_pem(self.encryption_key.certificate_data.encode())
jwe = JWE(
raw,
json_encode(
{
"alg": "RSA-OAEP-256",
"enc": "A256CBC-HS512",
"typ": "JWE",
"kid": self.encryption_key.kid,
}
),
)
jwe.add_recipient(key)
return jwe.serialize(compact=True)
def webfinger(self, resource: str, request: HttpRequest):
return {

View File

@ -412,6 +412,73 @@ class TestAuthorize(OAuthTestCase):
delta=5,
)
@apply_blueprint("system/providers-oauth2.yaml")
def test_full_implicit_enc(self):
"""Test full authorization with encryption"""
flow = create_test_flow()
provider: OAuth2Provider = OAuth2Provider.objects.create(
name=generate_id(),
client_id="test",
authorization_flow=flow,
redirect_uris="http://localhost",
signing_key=self.keypair,
encryption_key=self.keypair,
)
provider.property_mappings.set(
ScopeMapping.objects.filter(
managed__in=[
"goauthentik.io/providers/oauth2/scope-openid",
"goauthentik.io/providers/oauth2/scope-email",
"goauthentik.io/providers/oauth2/scope-profile",
]
)
)
provider.property_mappings.add(
ScopeMapping.objects.create(
name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
)
)
Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
state = generate_id()
user = create_test_admin_user()
self.client.force_login(user)
with patch(
"authentik.providers.oauth2.id_token.get_login_event",
MagicMock(
return_value=Event(
action=EventAction.LOGIN,
context={PLAN_CONTEXT_METHOD: "password"},
created=now(),
)
),
):
# Step 1, initiate params and get redirect to flow
self.client.get(
reverse("authentik_providers_oauth2:authorize"),
data={
"response_type": "id_token",
"client_id": "test",
"state": state,
"scope": "openid test",
"redirect_uri": "http://localhost",
"nonce": generate_id(),
},
)
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
)
self.assertEqual(response.status_code, 200)
token: AccessToken = AccessToken.objects.filter(user=user).first()
expires = timedelta_from_string(provider.access_token_validity).total_seconds()
jwt = self.validate_jwe(token, provider)
self.assertEqual(jwt["amr"], ["pwd"])
self.assertEqual(jwt["sub"], "foo")
self.assertAlmostEqual(
jwt["exp"] - now().timestamp(),
expires,
delta=5,
)
def test_full_fragment_code(self):
"""Test full authorization"""
flow = create_test_flow()

View File

@ -93,6 +93,24 @@ class TestJWKS(OAuthTestCase):
self.assertEqual(len(body["keys"]), 1)
PyJWKSet.from_dict(body)
def test_enc(self):
"""Test with JWE"""
provider = OAuth2Provider.objects.create(
name="test",
client_id="test",
authorization_flow=create_test_flow(),
redirect_uris="http://local.invalid",
signing_key=create_test_cert(PrivateKeyAlg.ECDSA),
encryption_key=create_test_cert(PrivateKeyAlg.ECDSA),
)
app = Application.objects.create(name="test", slug="test", provider=provider)
response = self.client.get(
reverse("authentik_providers_oauth2:jwks", kwargs={"application_slug": app.slug})
)
body = json.loads(response.content.decode())
self.assertEqual(len(body["keys"]), 2)
PyJWKSet.from_dict(body)
def test_ecdsa_coords_mismatched(self):
"""Test JWKS request with ES256"""
cert = CertificateKeyPair.objects.create(

View File

@ -152,6 +152,36 @@ class TestToken(OAuthTestCase):
)
self.validate_jwt(access, provider)
def test_auth_code_enc(self):
"""test request param"""
provider = OAuth2Provider.objects.create(
name=generate_id(),
authorization_flow=create_test_flow(),
redirect_uris="http://local.invalid",
signing_key=self.keypair,
encryption_key=self.keypair,
)
# Needs to be assigned to an application for iss to be set
self.app.provider = provider
self.app.save()
header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
user = create_test_admin_user()
code = AuthorizationCode.objects.create(
code="foobar", provider=provider, user=user, auth_time=timezone.now()
)
response = self.client.post(
reverse("authentik_providers_oauth2:token"),
data={
"grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
"code": code.code,
"redirect_uri": "http://local.invalid",
},
HTTP_AUTHORIZATION=f"Basic {header}",
)
self.assertEqual(response.status_code, 200)
access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
self.validate_jwe(access, provider)
@apply_blueprint("system/providers-oauth2.yaml")
def test_refresh_token_view(self):
"""test request param"""

View File

@ -34,7 +34,7 @@ class TestTokenClientCredentialsJWTSource(OAuthTestCase):
self.factory = RequestFactory()
self.cert = create_test_cert()
jwk = JWKSView().get_jwk_for_key(self.cert)
jwk = JWKSView().get_jwk_for_key(self.cert, "sig")
self.source: OAuthSource = OAuthSource.objects.create(
name=generate_id(),
slug=generate_id(),

View File

@ -3,6 +3,8 @@
from typing import Any
from django.test import TestCase
from jwcrypto.jwe import JWE
from jwcrypto.jwk import JWK
from jwt import decode
from authentik.core.tests.utils import create_test_cert
@ -32,6 +34,15 @@ class OAuthTestCase(TestCase):
if key in container:
self.assertIsNotNone(container[key])
def validate_jwe(self, token: AccessToken, provider: OAuth2Provider) -> dict[str, Any]:
"""Validate JWEs"""
private_key = JWK.from_pem(provider.encryption_key.key_data.encode())
jwetoken = JWE()
jwetoken.deserialize(token.token, key=private_key)
token.token = jwetoken.payload.decode()
return self.validate_jwt(token, provider)
def validate_jwt(self, token: AccessToken, provider: OAuth2Provider) -> dict[str, Any]:
"""Validate that all required fields are set"""
key, alg = provider.jwt_key

View File

@ -64,36 +64,42 @@ def to_base64url_uint(val: int, min_length: int = 0) -> bytes:
class JWKSView(View):
"""Show RSA Key data for Provider"""
def get_jwk_for_key(self, key: CertificateKeyPair) -> dict | None:
def get_jwk_for_key(self, key: CertificateKeyPair, use: str) -> dict | None:
"""Convert a certificate-key pair into JWK"""
private_key = key.private_key
key_data = None
if not private_key:
return key_data
key_data = {}
if use == "sig":
if isinstance(private_key, RSAPrivateKey):
key_data["alg"] = JWTAlgorithms.RS256
elif isinstance(private_key, EllipticCurvePrivateKey):
key_data["alg"] = JWTAlgorithms.ES256
elif use == "enc":
key_data["alg"] = "RSA-OAEP-256"
key_data["enc"] = "A256CBC-HS512"
if isinstance(private_key, RSAPrivateKey):
public_key: RSAPublicKey = private_key.public_key()
public_numbers = public_key.public_numbers()
key_data = {
"kid": key.kid,
"kty": "RSA",
"alg": JWTAlgorithms.RS256,
"use": "sig",
"n": to_base64url_uint(public_numbers.n).decode(),
"e": to_base64url_uint(public_numbers.e).decode(),
}
key_data["kid"] = key.kid
key_data["kty"] = "RSA"
key_data["use"] = use
key_data["n"] = to_base64url_uint(public_numbers.n).decode()
key_data["e"] = to_base64url_uint(public_numbers.e).decode()
elif isinstance(private_key, EllipticCurvePrivateKey):
public_key: EllipticCurvePublicKey = private_key.public_key()
public_numbers = public_key.public_numbers()
curve_type = type(public_key.curve)
key_data = {
"kid": key.kid,
"kty": "EC",
"alg": JWTAlgorithms.ES256,
"use": "sig",
"x": to_base64url_uint(public_numbers.x, min_length_map[curve_type]).decode(),
"y": to_base64url_uint(public_numbers.y, min_length_map[curve_type]).decode(),
"crv": ec_crv_map.get(curve_type, public_key.curve.name),
}
key_data["kid"] = key.kid
key_data["kty"] = "EC"
key_data["use"] = use
key_data["x"] = to_base64url_uint(public_numbers.x, min_length_map[curve_type]).decode()
key_data["y"] = to_base64url_uint(public_numbers.y, min_length_map[curve_type]).decode()
key_data["crv"] = ec_crv_map.get(curve_type, public_key.curve.name)
else:
return key_data
key_data["x5c"] = [b64encode(key.certificate.public_bytes(Encoding.DER)).decode("utf-8")]
@ -113,14 +119,19 @@ class JWKSView(View):
"""Show JWK Key data for Provider"""
application = get_object_or_404(Application, slug=application_slug)
provider: OAuth2Provider = get_object_or_404(OAuth2Provider, pk=application.provider_id)
signing_key: CertificateKeyPair = provider.signing_key
response_data = {}
if signing_key:
jwk = self.get_jwk_for_key(signing_key)
if signing_key := provider.signing_key:
jwk = self.get_jwk_for_key(signing_key, "sig")
if jwk:
response_data["keys"] = [jwk]
response_data.setdefault("keys", [])
response_data["keys"].append(jwk)
if encryption_key := provider.encryption_key:
jwk = self.get_jwk_for_key(encryption_key, "enc")
if jwk:
response_data.setdefault("keys", [])
response_data["keys"].append(jwk)
response = JsonResponse(response_data)
response["Access-Control-Allow-Origin"] = "*"

View File

@ -46,7 +46,7 @@ class ProviderInfoView(View):
if SCOPE_OPENID not in scopes:
scopes.append(SCOPE_OPENID)
_, supported_alg = provider.jwt_key
return {
config = {
"issuer": provider.get_issuer(self.request),
"authorization_endpoint": self.request.build_absolute_uri(
reverse("authentik_providers_oauth2:authorize")
@ -114,6 +114,10 @@ class ProviderInfoView(View):
"claims_parameter_supported": False,
"code_challenge_methods_supported": [PKCE_METHOD_PLAIN, PKCE_METHOD_S256],
}
if provider.encryption_key:
config["id_token_encryption_alg_values_supported"] = ["RSA-OAEP-256"]
config["id_token_encryption_enc_values_supported"] = ["A256CBC-HS512"]
return config
def get_claims(self, provider: OAuth2Provider) -> list[str]:
"""Get a list of supported claims based on configured scope mappings"""

View File

@ -1,6 +1,6 @@
# Generated by Django 5.0.9 on 2024-10-10 15:45
from django.db import migrations
from django.db import migrations, models
from django.apps.registry import Apps
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
@ -23,4 +23,22 @@ class Migration(migrations.Migration):
operations = [
migrations.RunPython(fix_X509SubjectName),
migrations.AlterField(
model_name="samlsource",
name="name_id_policy",
field=models.TextField(
choices=[
("urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress", "Email"),
("urn:oasis:names:tc:SAML:2.0:nameid-format:persistent", "Persistent"),
("urn:oasis:names:tc:SAML:1.1:nameid-format:X509SubjectName", "X509"),
(
"urn:oasis:names:tc:SAML:2.0:nameid-format:WindowsDomainQualifiedName",
"Windows",
),
("urn:oasis:names:tc:SAML:2.0:nameid-format:transient", "Transient"),
],
default="urn:oasis:names:tc:SAML:2.0:nameid-format:persistent",
help_text="NameID Policy sent to the IdP. Can be unset, in which case no Policy is sent.",
),
),
]

View File

@ -5354,7 +5354,13 @@
"type": "string",
"format": "uuid",
"title": "Signing Key",
"description": "Key used to sign the tokens. Only required when JWT Algorithm is set to RS256."
"description": "Key used to sign the tokens."
},
"encryption_key": {
"type": "string",
"format": "uuid",
"title": "Encryption Key",
"description": "Key used to encrypt the tokens. When set, tokens will be encrypted and returned as JWEs."
},
"redirect_uris": {
"type": "string",

17
poetry.lock generated
View File

@ -2213,6 +2213,21 @@ files = [
[package.dependencies]
referencing = ">=0.31.0"
[[package]]
name = "jwcrypto"
version = "1.5.6"
description = "Implementation of JOSE Web standards"
optional = false
python-versions = ">= 3.8"
files = [
{file = "jwcrypto-1.5.6-py3-none-any.whl", hash = "sha256:150d2b0ebbdb8f40b77f543fb44ffd2baeff48788be71f67f03566692fd55789"},
{file = "jwcrypto-1.5.6.tar.gz", hash = "sha256:771a87762a0c081ae6166958a954f80848820b2ab066937dc8b8379d65b1b039"},
]
[package.dependencies]
cryptography = ">=3.4"
typing-extensions = ">=4.5.0"
[[package]]
name = "kombu"
version = "5.3.7"
@ -5473,4 +5488,4 @@ files = [
[metadata]
lock-version = "2.0"
python-versions = "~3.12"
content-hash = "ef49ce543812a47597b9108ca277cd4b6563fe00d0739e763b6e1e1151c95eba"
content-hash = "f3bd82b8ae975dbb660a97fe248f118f780e43687d082d49f37a2d53b450adda"

View File

@ -116,6 +116,7 @@ geoip2 = "*"
google-api-python-client = "*"
gunicorn = "*"
jsonpatch = "*"
jwcrypto = "*"
kubernetes = "*"
ldap3 = "*"
lxml = "*"

View File

@ -42382,8 +42382,13 @@ components:
type: string
format: uuid
nullable: true
description: Key used to sign the tokens. Only required when JWT Algorithm
is set to RS256.
description: Key used to sign the tokens.
encryption_key:
type: string
format: uuid
nullable: true
description: Key used to encrypt the tokens. When set, tokens will be encrypted
and returned as JWEs.
redirect_uris:
type: string
description: Enter each URI on a new line.
@ -42478,8 +42483,13 @@ components:
type: string
format: uuid
nullable: true
description: Key used to sign the tokens. Only required when JWT Algorithm
is set to RS256.
description: Key used to sign the tokens.
encryption_key:
type: string
format: uuid
nullable: true
description: Key used to encrypt the tokens. When set, tokens will be encrypted
and returned as JWEs.
redirect_uris:
type: string
description: Enter each URI on a new line.
@ -46263,8 +46273,13 @@ components:
type: string
format: uuid
nullable: true
description: Key used to sign the tokens. Only required when JWT Algorithm
is set to RS256.
description: Key used to sign the tokens.
encryption_key:
type: string
format: uuid
nullable: true
description: Key used to encrypt the tokens. When set, tokens will be encrypted
and returned as JWEs.
redirect_uris:
type: string
description: Enter each URI on a new line.

View File

@ -205,6 +205,15 @@ export class OAuth2ProviderFormPage extends BaseProviderForm<OAuth2Provider> {
></ak-crypto-certificate-search>
<p class="pf-c-form__helper-text">${msg("Key used to sign the tokens.")}</p>
</ak-form-element-horizontal>
<ak-form-element-horizontal label=${msg("Encryption Key")} name="encryptionKey">
<!-- NOTE: 'null' cast to 'undefined' on encryptionKey to satisfy Lit requirements -->
<ak-crypto-certificate-search
certificate=${ifDefined(this.instance?.encryptionKey ?? undefined)}
></ak-crypto-certificate-search>
<p class="pf-c-form__helper-text">
${msg("Key used to encrypt the tokens.")}
</p>
</ak-form-element-horizontal>
</div>
</ak-form-group>

View File

@ -82,3 +82,15 @@ return True
When a client does not request any scopes, authentik will treat the request as if all configured scopes were requested. Depending on the configured authorization flow, consent still needs to be given, and all scopes are listed there.
This does _not_ apply to special scopes, as those are not configurable in the provider.
## Signing & Encryption
[JWT](https://jwt.io/introduction)s created by authentik will always be signed.
When a _Signing Key_ is selected in the provider, the JWT will be signed asymmetrically with the private key of the selected certificate, and can be verified using the public key of the certificate. The public key data of the signing key can be retrieved via the JWKS endpoint listed on the provider page.
When no _Signing Key_ is selected, the JWT will be signed symmetrically with the _Client secret_ of the provider, which can be seen in the provider settings.
### Encryption <span class="badge badge--version">authentik 2024.10+</span>
authentik can also encrypt JWTs (turning them into JWEs) it issues by selecting an _Encryption Key_ in the provider. When selected, all JWTs will be encrypted symmetrically using the selected certificate. authentik uses the `RSA-OAEP-256` algorithm with the `A256CBC-HS512` encryption method.