stages/authenticator_validate: add ability to limit webauthn device types (#9180)

* stages/authenticator_validate: add ability to limit webauthn device types

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* reword

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* require enterprise attestation when a device restriction is configured as we need the aaguid

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* improve error message

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add more tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L
2024-04-11 13:10:05 +02:00
committed by GitHub
parent 35448f6017
commit fd44bc2bec
14 changed files with 398 additions and 83 deletions

View File

@ -7,11 +7,16 @@ from authentik.core.api.used_by import UsedByMixin
from authentik.flows.api.stages import StageSerializer
from authentik.flows.models import NotConfiguredAction
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage
from authentik.stages.authenticator_webauthn.api.device_types import WebAuthnDeviceTypeSerializer
class AuthenticatorValidateStageSerializer(StageSerializer):
"""AuthenticatorValidateStage Serializer"""
webauthn_allowed_device_types_obj = WebAuthnDeviceTypeSerializer(
source="webauthn_allowed_device_types", many=True, read_only=True
)
def validate_not_configured_action(self, value):
"""Ensure that a configuration stage is set when not_configured_action is configure"""
configuration_stages = self.initial_data.get("configuration_stages", None)
@ -31,6 +36,8 @@ class AuthenticatorValidateStageSerializer(StageSerializer):
"configuration_stages",
"last_auth_threshold",
"webauthn_user_verification",
"webauthn_allowed_device_types",
"webauthn_allowed_device_types_obj",
]

View File

@ -14,8 +14,9 @@ from structlog.stdlib import get_logger
from webauthn import options_to_json
from webauthn.authentication.generate_authentication_options import generate_authentication_options
from webauthn.authentication.verify_authentication_response import verify_authentication_response
from webauthn.helpers import parse_authentication_credential_json
from webauthn.helpers.base64url_to_bytes import base64url_to_bytes
from webauthn.helpers.exceptions import InvalidAuthenticationResponse
from webauthn.helpers.exceptions import InvalidAuthenticationResponse, InvalidJSONStructure
from webauthn.helpers.structs import UserVerificationRequirement
from authentik.core.api.utils import JSONDictField, PassiveSerializer
@ -131,23 +132,40 @@ def validate_challenge_webauthn(data: dict, stage_view: StageView, user: User) -
"""Validate WebAuthn Challenge"""
request = stage_view.request
challenge = request.session.get(SESSION_KEY_WEBAUTHN_CHALLENGE)
credential_id = data.get("id")
stage: AuthenticatorValidateStage = stage_view.executor.current_stage
try:
credential = parse_authentication_credential_json(data)
except InvalidJSONStructure as exc:
LOGGER.warning("Invalid WebAuthn challenge response", exc=exc)
raise ValidationError("Invalid device", "invalid") from None
device = WebAuthnDevice.objects.filter(credential_id=credential_id).first()
device = WebAuthnDevice.objects.filter(credential_id=credential.id).first()
if not device:
raise ValidationError("Invalid device")
raise ValidationError("Invalid device", "invalid")
# We can only check the device's user if the user we're given isn't anonymous
# as this validation is also used for password-less login where webauthn is the very first
# step done by a user. Only if this validation happens at a later stage we can check
# that the device belongs to the user
if not user.is_anonymous and device.user != user:
raise ValidationError("Invalid device")
stage: AuthenticatorValidateStage = stage_view.executor.current_stage
raise ValidationError("Invalid device", "invalid")
# When a device_type was set when creating the device (2024.4+), and we have a limitation,
# make sure the device type is allowed.
if (
device.device_type
and stage.webauthn_allowed_device_types.exists()
and not stage.webauthn_allowed_device_types.filter(pk=device.device_type.pk).exists()
):
raise ValidationError(
_(
"Invalid device type. Contact your {brand} administrator for help.".format(
brand=stage_view.request.brand.branding_title
)
),
"invalid",
)
try:
authentication_verification = verify_authentication_response(
credential=data,
credential=credential,
expected_challenge=challenge,
expected_rp_id=get_rp_id(request),
expected_origin=get_origin(request),

View File

@ -0,0 +1,27 @@
# Generated by Django 5.0.3 on 2024-04-08 18:33
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
(
"authentik_stages_authenticator_validate",
"0012_authenticatorvalidatestage_webauthn_user_verification",
),
(
"authentik_stages_authenticator_webauthn",
"0010_webauthndevicetype_authenticatorwebauthnstage_and_more",
),
]
operations = [
migrations.AddField(
model_name="authenticatorvalidatestage",
name="webauthn_allowed_device_types",
field=models.ManyToManyField(
blank=True, to="authentik_stages_authenticator_webauthn.webauthndevicetype"
),
),
]

View File

@ -71,6 +71,9 @@ class AuthenticatorValidateStage(Stage):
choices=UserVerification.choices,
default=UserVerification.PREFERRED,
)
webauthn_allowed_device_types = models.ManyToManyField(
"authentik_stages_authenticator_webauthn.WebAuthnDeviceType", blank=True
)
@property
def serializer(self) -> type[BaseSerializer]:

View File

@ -5,6 +5,7 @@ from hashlib import sha256
from django.conf import settings
from django.http import HttpRequest, HttpResponse
from django.utils.translation import gettext_lazy as _
from jwt import PyJWTError, decode, encode
from rest_framework.fields import CharField, IntegerField, ListField, UUIDField
from rest_framework.serializers import ValidationError
@ -176,15 +177,30 @@ class AuthenticatorValidateStageView(ChallengeStageView):
threshold = timedelta_from_string(stage.last_auth_threshold)
allowed_devices = []
has_webauthn_filters_set = stage.webauthn_allowed_device_types.exists()
for device in user_devices:
device_class = device.__class__.__name__.lower().replace("device", "")
if device_class not in stage.device_classes:
self.logger.debug("device class not allowed", device_class=device_class)
continue
if isinstance(device, SMSDevice) and device.is_hashed:
self.logger.debug("Hashed SMS device, skipping")
self.logger.debug("Hashed SMS device, skipping", device=device)
continue
allowed_devices.append(device)
# Ignore WebAuthn devices which are not in the allowed types
if (
isinstance(device, WebAuthnDevice)
and device.device_type
and has_webauthn_filters_set
):
if not stage.webauthn_allowed_device_types.filter(
pk=device.device_type.pk
).exists():
self.logger.debug(
"WebAuthn device type not allowed", device=device, type=device.device_type
)
continue
# Ensure only one challenge per device class
# WebAuthn does another device loop to find all WebAuthn devices
if device_class in seen_classes:
@ -251,7 +267,7 @@ class AuthenticatorValidateStageView(ChallengeStageView):
return self.executor.stage_ok()
if stage.not_configured_action == NotConfiguredAction.DENY:
self.logger.debug("Authenticator not configured, denying")
return self.executor.stage_invalid()
return self.executor.stage_invalid(_("No (allowed) MFA authenticator configured."))
if stage.not_configured_action == NotConfiguredAction.CONFIGURE:
self.logger.debug("Authenticator not configured, forcing configure")
return self.prepare_stages(user)

View File

@ -26,8 +26,16 @@ from authentik.stages.authenticator_validate.stage import (
PLAN_CONTEXT_DEVICE_CHALLENGES,
AuthenticatorValidateStageView,
)
from authentik.stages.authenticator_webauthn.models import UserVerification, WebAuthnDevice
from authentik.stages.authenticator_webauthn.models import (
UserVerification,
WebAuthnDevice,
WebAuthnDeviceType,
)
from authentik.stages.authenticator_webauthn.stage import SESSION_KEY_WEBAUTHN_CHALLENGE
from authentik.stages.authenticator_webauthn.tasks import (
webauthn_aaguid_import,
webauthn_mds_import,
)
from authentik.stages.identification.models import IdentificationStage, UserFields
from authentik.stages.user_login.models import UserLoginStage
@ -120,7 +128,56 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
{}, StageView(FlowExecutorView(current_stage=stage), request=request), self.user
)
def test_get_challenge(self):
def test_device_challenge_webauthn_restricted(self):
"""Test webauthn (getting device challenges with a webauthn
device that is not allowed due to aaguid restrictions)"""
webauthn_mds_import(force=True)
webauthn_aaguid_import()
request = get_request("/")
request.user = self.user
WebAuthnDevice.objects.create(
user=self.user,
public_key=bytes_to_base64url(b"qwerqwerqre"),
credential_id=bytes_to_base64url(b"foobarbaz"),
sign_count=0,
rp_id=generate_id(),
device_type=WebAuthnDeviceType.objects.get(
aaguid="2fc0579f-8113-47ea-b116-bb5a8db9202a"
),
)
flow = create_test_flow()
stage = AuthenticatorValidateStage.objects.create(
name=generate_id(),
last_auth_threshold="milliseconds=0",
not_configured_action=NotConfiguredAction.DENY,
device_classes=[DeviceClasses.WEBAUTHN],
webauthn_user_verification=UserVerification.PREFERRED,
)
stage.webauthn_allowed_device_types.set(
WebAuthnDeviceType.objects.filter(
description="Android Authenticator with SafetyNet Attestation"
)
)
session = self.client.session
plan = FlowPlan(flow_pk=flow.pk.hex)
plan.append_stage(stage)
plan.append_stage(UserLoginStage(name=generate_id()))
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session[SESSION_KEY_PLAN] = plan
session.save()
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
)
self.assertStageResponse(
response,
flow,
component="ak-stage-access-denied",
error_message="No (allowed) MFA authenticator configured.",
)
def test_raw_get_challenge(self):
"""Test webauthn"""
request = get_request("/")
request.user = self.user
@ -190,17 +247,21 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
},
)
def test_validate_challenge(self):
"""Test webauthn"""
def test_validate_challenge_unrestricted(self):
"""Test webauthn authentication (unrestricted webauthn device)"""
webauthn_mds_import(force=True)
webauthn_aaguid_import()
device = WebAuthnDevice.objects.create(
user=self.user,
public_key=(
"pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc43i0J"
"H6IlggLTXytuhzFVYYAK4PQNj8_coGrbbzSfUxdiPAcZTQCyU"
"pQECAyYgASFYIF-N4GvQJdTJMAmTOxFX9_boL00zBiSrP0DY9xvJl_FFIlggnyZloVSVofdJNTLMeMdjQHgW2Rzmd5_Xt5AWtNztcdo"
),
credential_id="QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
sign_count=4,
credential_id="X43ga9Al1MkwCZM7EXD1r8Sxj7aXnNsuR013XM7he4kZ-GS9TaA-u3i36wsswjPm",
sign_count=2,
rp_id=generate_id(),
device_type=WebAuthnDeviceType.objects.get(
aaguid="2fc0579f-8113-47ea-b116-bb5a8db9202a"
),
)
flow = create_test_flow()
stage = AuthenticatorValidateStage.objects.create(
@ -222,7 +283,7 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
]
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
"g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1jwSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA"
"aCC6ak_DP45xMH1qyxzUM5iC2xc4QthQb09v7m4qDBmY8FvWvhxFzSuFlDYQmclrh5fWS5q0TPxgJGF4vimcFQ"
)
session.save()
@ -230,24 +291,23 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
data={
"webauthn": {
"id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"rawId": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"id": "X43ga9Al1MkwCZM7EXD1r8Sxj7aXnNsuR013XM7he4kZ-GS9TaA-u3i36wsswjPm",
"rawId": "X43ga9Al1MkwCZM7EXD1r8Sxj7aXnNsuR013XM7he4kZ-GS9TaA-u3i36wsswjPm",
"type": "public-key",
"assertionClientExtensions": "{}",
"response": {
"clientDataJSON": (
"eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiZzk4STUxbVF2WlhvN"
"Wx4TGZockQyemZvbGhaYkxSeUNncWtrWWFwMWp3U2FKMTNCZ3VvSldDRjlfTGczQW"
"dPNFdoLUJxYTU1NkpFMjBvS3NZYmw2UkEiLCJvcmlnaW4iOiJodHRwOi8vbG9jYWx"
"ob3N0OjkwMDAiLCJjcm9zc09yaWdpbiI6ZmFsc2UsIm90aGVyX2tleXNfY2FuX2Jl"
"X2FkZGVkX2hlcmUiOiJkbyBub3QgY29tcGFyZSBjbGllbnREYXRhSlNPTiBhZ2Fpb"
"nN0IGEgdGVtcGxhdGUuIFNlZSBodHRwczovL2dvby5nbC95YWJQZXgifQ=="
"eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiYUNDN"
"mFrX0RQNDV4TUgxcXl4elVNNWlDMnhjNFF0aFFiMDl2N200cURCbV"
"k4RnZXdmh4RnpTdUZsRFlRbWNscmg1ZldTNXEwVFB4Z0pHRjR2aW1"
"jRlEiLCJvcmlnaW4iOiJodHRwOi8vbG9jYWxob3N0OjkwMDAiLCJj"
"cm9zc09yaWdpbiI6ZmFsc2V9"
),
"signature": (
"MEQCIFNlrHf9ablJAalXLWkrqvHB8oIu8kwvRpH3X3rbJVpI"
"AiAqtOK6mIZPk62kZN0OzFsHfuvu_RlOl7zlqSNzDdz_Ag=="
"MEQCIAHQCGfE_PX1z6mBDaXUNqK_NrllhXylNOmETUD3Khv9AiBTl"
"rX3GDRj5OaOfTToOwUwAhtd74tu0T6DZAVHPb_hlQ=="
),
"authenticatorData": "SZYN5YgOjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2MFAAAABQ==",
"authenticatorData": "SZYN5YgOjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2MFAAAABg==",
"userHandle": None,
},
},
@ -261,6 +321,96 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
)
self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
def test_validate_challenge_restricted(self):
"""Test webauthn authentication (restricted device type, failure)"""
webauthn_mds_import(force=True)
webauthn_aaguid_import()
device = WebAuthnDevice.objects.create(
user=self.user,
public_key=(
"pQECAyYgASFYIF-N4GvQJdTJMAmTOxFX9_boL00zBiSrP0DY9xvJl_FFIlggnyZloVSVofdJNTLMeMdjQHgW2Rzmd5_Xt5AWtNztcdo"
),
credential_id="X43ga9Al1MkwCZM7EXD1r8Sxj7aXnNsuR013XM7he4kZ-GS9TaA-u3i36wsswjPm",
sign_count=2,
rp_id=generate_id(),
device_type=WebAuthnDeviceType.objects.get(
aaguid="2fc0579f-8113-47ea-b116-bb5a8db9202a"
),
)
flow = create_test_flow()
stage = AuthenticatorValidateStage.objects.create(
name=generate_id(),
not_configured_action=NotConfiguredAction.CONFIGURE,
device_classes=[DeviceClasses.WEBAUTHN],
)
stage.webauthn_allowed_device_types.set(
WebAuthnDeviceType.objects.filter(
description="Android Authenticator with SafetyNet Attestation"
)
)
session = self.client.session
plan = FlowPlan(flow_pk=flow.pk.hex)
plan.append_stage(stage)
plan.append_stage(UserLoginStage(name=generate_id()))
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
plan.context[PLAN_CONTEXT_DEVICE_CHALLENGES] = [
{
"device_class": device.__class__.__name__.lower().replace("device", ""),
"device_uid": device.pk,
"challenge": {},
}
]
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
"aCC6ak_DP45xMH1qyxzUM5iC2xc4QthQb09v7m4qDBmY8FvWvhxFzSuFlDYQmclrh5fWS5q0TPxgJGF4vimcFQ"
)
session.save()
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
data={
"webauthn": {
"id": "X43ga9Al1MkwCZM7EXD1r8Sxj7aXnNsuR013XM7he4kZ-GS9TaA-u3i36wsswjPm",
"rawId": "X43ga9Al1MkwCZM7EXD1r8Sxj7aXnNsuR013XM7he4kZ-GS9TaA-u3i36wsswjPm",
"type": "public-key",
"assertionClientExtensions": "{}",
"response": {
"clientDataJSON": (
"eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiYUNDN"
"mFrX0RQNDV4TUgxcXl4elVNNWlDMnhjNFF0aFFiMDl2N200cURCbV"
"k4RnZXdmh4RnpTdUZsRFlRbWNscmg1ZldTNXEwVFB4Z0pHRjR2aW1"
"jRlEiLCJvcmlnaW4iOiJodHRwOi8vbG9jYWxob3N0OjkwMDAiLCJj"
"cm9zc09yaWdpbiI6ZmFsc2V9"
),
"signature": (
"MEQCIAHQCGfE_PX1z6mBDaXUNqK_NrllhXylNOmETUD3Khv9AiBTl"
"rX3GDRj5OaOfTToOwUwAhtd74tu0T6DZAVHPb_hlQ=="
),
"authenticatorData": "SZYN5YgOjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2MFAAAABg==",
"userHandle": None,
},
}
},
SERVER_NAME="localhost",
SERVER_PORT="9000",
)
self.assertEqual(response.status_code, 200)
self.assertStageResponse(
response,
flow,
component="ak-stage-authenticator-validate",
response_errors={
"webauthn": [
{
"string": (
"Invalid device type. Contact your authentik administrator for help."
),
"code": "invalid",
}
]
},
)
def test_validate_challenge_userless(self):
"""Test webauthn"""
device = WebAuthnDevice.objects.create(

View File

@ -126,6 +126,10 @@ class AuthenticatorWebAuthnStageView(ChallengeStageView):
if authenticator_attachment:
authenticator_attachment = AuthenticatorAttachment(str(authenticator_attachment))
attestation = AttestationConveyancePreference.DIRECT
if stage.device_type_restrictions.exists():
attestation = AttestationConveyancePreference.ENTERPRISE
registration_options: PublicKeyCredentialCreationOptions = generate_registration_options(
rp_id=get_rp_id(self.request),
rp_name=self.request.brand.branding_title,
@ -137,7 +141,7 @@ class AuthenticatorWebAuthnStageView(ChallengeStageView):
user_verification=UserVerificationRequirement(str(stage.user_verification)),
authenticator_attachment=authenticator_attachment,
),
attestation=AttestationConveyancePreference.DIRECT,
attestation=attestation,
)
self.request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] = registration_options.challenge