stages/authenticator_webauthn: add option to configure max attempts (#15041)

* house keeping - migrate to session part 1

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* cleanup v2

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add max_attempts

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* teeny tiny cleanup

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add ui

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L.
2025-06-23 01:49:07 +02:00
committed by GitHub
parent f76becfd86
commit 53d8f9bd8c
12 changed files with 286 additions and 82 deletions

View File

@ -13,7 +13,6 @@ class Command(TenantCommand):
parser.add_argument("usernames", nargs="*", type=str)
def handle_per_tenant(self, **options):
print(options)
new_type = UserTypes(options["type"])
qs = (
User.objects.exclude_anonymous()

View File

@ -1,6 +1,7 @@
"""Validation stage challenge checking"""
from json import loads
from typing import TYPE_CHECKING
from urllib.parse import urlencode
from django.http import HttpRequest
@ -36,10 +37,12 @@ from authentik.stages.authenticator_email.models import EmailDevice
from authentik.stages.authenticator_sms.models import SMSDevice
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
from authentik.stages.authenticator_webauthn.models import UserVerification, WebAuthnDevice
from authentik.stages.authenticator_webauthn.stage import SESSION_KEY_WEBAUTHN_CHALLENGE
from authentik.stages.authenticator_webauthn.stage import PLAN_CONTEXT_WEBAUTHN_CHALLENGE
from authentik.stages.authenticator_webauthn.utils import get_origin, get_rp_id
LOGGER = get_logger()
if TYPE_CHECKING:
from authentik.stages.authenticator_validate.stage import AuthenticatorValidateStageView
class DeviceChallenge(PassiveSerializer):
@ -52,11 +55,11 @@ class DeviceChallenge(PassiveSerializer):
def get_challenge_for_device(
request: HttpRequest, stage: AuthenticatorValidateStage, device: Device
stage_view: "AuthenticatorValidateStageView", stage: AuthenticatorValidateStage, device: Device
) -> dict:
"""Generate challenge for a single device"""
if isinstance(device, WebAuthnDevice):
return get_webauthn_challenge(request, stage, device)
return get_webauthn_challenge(stage_view, stage, device)
if isinstance(device, EmailDevice):
return {"email": mask_email(device.email)}
# Code-based challenges have no hints
@ -64,26 +67,30 @@ def get_challenge_for_device(
def get_webauthn_challenge_without_user(
request: HttpRequest, stage: AuthenticatorValidateStage
stage_view: "AuthenticatorValidateStageView", stage: AuthenticatorValidateStage
) -> dict:
"""Same as `get_webauthn_challenge`, but allows any client device. We can then later check
who the device belongs to."""
request.session.pop(SESSION_KEY_WEBAUTHN_CHALLENGE, None)
stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
authentication_options = generate_authentication_options(
rp_id=get_rp_id(request),
rp_id=get_rp_id(stage_view.request),
allow_credentials=[],
user_verification=UserVerificationRequirement(stage.webauthn_user_verification),
)
request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] = authentication_options.challenge
stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = (
authentication_options.challenge
)
return loads(options_to_json(authentication_options))
def get_webauthn_challenge(
request: HttpRequest, stage: AuthenticatorValidateStage, device: WebAuthnDevice | None = None
stage_view: "AuthenticatorValidateStageView",
stage: AuthenticatorValidateStage,
device: WebAuthnDevice | None = None,
) -> dict:
"""Send the client a challenge that we'll check later"""
request.session.pop(SESSION_KEY_WEBAUTHN_CHALLENGE, None)
stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
allowed_credentials = []
@ -94,12 +101,14 @@ def get_webauthn_challenge(
allowed_credentials.append(user_device.descriptor)
authentication_options = generate_authentication_options(
rp_id=get_rp_id(request),
rp_id=get_rp_id(stage_view.request),
allow_credentials=allowed_credentials,
user_verification=UserVerificationRequirement(stage.webauthn_user_verification),
)
request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] = authentication_options.challenge
stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = (
authentication_options.challenge
)
return loads(options_to_json(authentication_options))
@ -146,7 +155,7 @@ def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Dev
def validate_challenge_webauthn(data: dict, stage_view: StageView, user: User) -> Device:
"""Validate WebAuthn Challenge"""
request = stage_view.request
challenge = request.session.get(SESSION_KEY_WEBAUTHN_CHALLENGE)
challenge = stage_view.executor.plan.context.get(PLAN_CONTEXT_WEBAUTHN_CHALLENGE)
stage: AuthenticatorValidateStage = stage_view.executor.current_stage
try:
credential = parse_authentication_credential_json(data)

View File

@ -224,7 +224,7 @@ class AuthenticatorValidateStageView(ChallengeStageView):
data={
"device_class": device_class,
"device_uid": device.pk,
"challenge": get_challenge_for_device(self.request, stage, device),
"challenge": get_challenge_for_device(self, stage, device),
"last_used": device.last_used,
}
)
@ -243,7 +243,7 @@ class AuthenticatorValidateStageView(ChallengeStageView):
"device_class": DeviceClasses.WEBAUTHN,
"device_uid": -1,
"challenge": get_webauthn_challenge_without_user(
self.request,
self,
self.executor.current_stage,
),
"last_used": None,

View File

@ -31,7 +31,7 @@ from authentik.stages.authenticator_webauthn.models import (
WebAuthnDevice,
WebAuthnDeviceType,
)
from authentik.stages.authenticator_webauthn.stage import SESSION_KEY_WEBAUTHN_CHALLENGE
from authentik.stages.authenticator_webauthn.stage import PLAN_CONTEXT_WEBAUTHN_CHALLENGE
from authentik.stages.authenticator_webauthn.tasks import webauthn_mds_import
from authentik.stages.identification.models import IdentificationStage, UserFields
from authentik.stages.user_login.models import UserLoginStage
@ -103,7 +103,11 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
device_classes=[DeviceClasses.WEBAUTHN],
webauthn_user_verification=UserVerification.PREFERRED,
)
challenge = get_challenge_for_device(request, stage, webauthn_device)
plan = FlowPlan("")
stage_view = AuthenticatorValidateStageView(
FlowExecutorView(flow=None, current_stage=stage, plan=plan), request=request
)
challenge = get_challenge_for_device(stage_view, stage, webauthn_device)
del challenge["challenge"]
self.assertEqual(
challenge,
@ -122,7 +126,9 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
with self.assertRaises(ValidationError):
validate_challenge_webauthn(
{}, StageView(FlowExecutorView(current_stage=stage), request=request), self.user
{},
StageView(FlowExecutorView(current_stage=stage, plan=plan), request=request),
self.user,
)
def test_device_challenge_webauthn_restricted(self):
@ -193,22 +199,35 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
sign_count=0,
rp_id=generate_id(),
)
challenge = get_challenge_for_device(request, stage, webauthn_device)
webauthn_challenge = request.session[SESSION_KEY_WEBAUTHN_CHALLENGE]
plan = FlowPlan("")
plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
"g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1jwSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA"
)
stage_view = AuthenticatorValidateStageView(
FlowExecutorView(flow=None, current_stage=stage, plan=plan), request=request
)
challenge = get_challenge_for_device(stage_view, stage, webauthn_device)
self.assertEqual(
challenge,
{
"allowCredentials": [
challenge["allowCredentials"],
[
{
"id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"type": "public-key",
}
],
"challenge": bytes_to_base64url(webauthn_challenge),
"rpId": "testserver",
"timeout": 60000,
"userVerification": "preferred",
},
)
self.assertIsNotNone(challenge["challenge"])
self.assertEqual(
challenge["rpId"],
"testserver",
)
self.assertEqual(
challenge["timeout"],
60000,
)
self.assertEqual(
challenge["userVerification"],
"preferred",
)
def test_get_challenge_userless(self):
@ -228,18 +247,16 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
sign_count=0,
rp_id=generate_id(),
)
challenge = get_webauthn_challenge_without_user(request, stage)
webauthn_challenge = request.session[SESSION_KEY_WEBAUTHN_CHALLENGE]
self.assertEqual(
challenge,
{
"allowCredentials": [],
"challenge": bytes_to_base64url(webauthn_challenge),
"rpId": "testserver",
"timeout": 60000,
"userVerification": "preferred",
},
plan = FlowPlan("")
stage_view = AuthenticatorValidateStageView(
FlowExecutorView(flow=None, current_stage=stage, plan=plan), request=request
)
challenge = get_webauthn_challenge_without_user(stage_view, stage)
self.assertEqual(challenge["allowCredentials"], [])
self.assertIsNotNone(challenge["challenge"])
self.assertEqual(challenge["rpId"], "testserver")
self.assertEqual(challenge["timeout"], 60000)
self.assertEqual(challenge["userVerification"], "preferred")
def test_validate_challenge_unrestricted(self):
"""Test webauthn authentication (unrestricted webauthn device)"""
@ -275,10 +292,10 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
"last_used": None,
}
]
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
"aCC6ak_DP45xMH1qyxzUM5iC2xc4QthQb09v7m4qDBmY8FvWvhxFzSuFlDYQmclrh5fWS5q0TPxgJGF4vimcFQ"
)
session[SESSION_KEY_PLAN] = plan
session.save()
response = self.client.post(
@ -352,10 +369,10 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
"last_used": None,
}
]
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
"aCC6ak_DP45xMH1qyxzUM5iC2xc4QthQb09v7m4qDBmY8FvWvhxFzSuFlDYQmclrh5fWS5q0TPxgJGF4vimcFQ"
)
session[SESSION_KEY_PLAN] = plan
session.save()
response = self.client.post(
@ -433,10 +450,10 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
"last_used": None,
}
]
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
"g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1jwSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA"
)
session[SESSION_KEY_PLAN] = plan
session.save()
response = self.client.post(
@ -496,17 +513,14 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
not_configured_action=NotConfiguredAction.CONFIGURE,
device_classes=[DeviceClasses.WEBAUTHN],
)
stage_view = AuthenticatorValidateStageView(
FlowExecutorView(flow=flow, current_stage=stage), request=request
)
request = get_request("/")
request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
plan = FlowPlan(flow.pk.hex)
plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
"g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1jwSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA"
)
request.session.save()
request = get_request("/")
stage_view = AuthenticatorValidateStageView(
FlowExecutorView(flow=flow, current_stage=stage), request=request
FlowExecutorView(flow=flow, current_stage=stage, plan=plan), request=request
)
request.META["SERVER_NAME"] = "localhost"
request.META["SERVER_PORT"] = "9000"

View File

@ -25,6 +25,7 @@ class AuthenticatorWebAuthnStageSerializer(StageSerializer):
"resident_key_requirement",
"device_type_restrictions",
"device_type_restrictions_obj",
"max_attempts",
]

View File

@ -0,0 +1,21 @@
# Generated by Django 5.1.11 on 2025-06-13 22:41
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
(
"authentik_stages_authenticator_webauthn",
"0012_webauthndevice_created_webauthndevice_last_updated_and_more",
),
]
operations = [
migrations.AddField(
model_name="authenticatorwebauthnstage",
name="max_attempts",
field=models.PositiveIntegerField(default=0),
),
]

View File

@ -84,6 +84,8 @@ class AuthenticatorWebAuthnStage(ConfigurableStage, FriendlyNamedStage, Stage):
device_type_restrictions = models.ManyToManyField("WebAuthnDeviceType", blank=True)
max_attempts = models.PositiveIntegerField(default=0)
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.stages.authenticator_webauthn.api.stages import (

View File

@ -5,12 +5,13 @@ from uuid import UUID
from django.http import HttpRequest, HttpResponse
from django.http.request import QueryDict
from django.utils.translation import gettext as __
from django.utils.translation import gettext_lazy as _
from rest_framework.fields import CharField
from rest_framework.serializers import ValidationError
from webauthn import options_to_json
from webauthn.helpers.bytes_to_base64url import bytes_to_base64url
from webauthn.helpers.exceptions import InvalidRegistrationResponse
from webauthn.helpers.exceptions import WebAuthnException
from webauthn.helpers.structs import (
AttestationConveyancePreference,
AuthenticatorAttachment,
@ -41,7 +42,8 @@ from authentik.stages.authenticator_webauthn.models import (
)
from authentik.stages.authenticator_webauthn.utils import get_origin, get_rp_id
SESSION_KEY_WEBAUTHN_CHALLENGE = "authentik/stages/authenticator_webauthn/challenge"
PLAN_CONTEXT_WEBAUTHN_CHALLENGE = "goauthentik.io/stages/authenticator_webauthn/challenge"
PLAN_CONTEXT_WEBAUTHN_ATTEMPT = "goauthentik.io/stages/authenticator_webauthn/attempt"
class AuthenticatorWebAuthnChallenge(WithUserInfoChallenge):
@ -62,7 +64,7 @@ class AuthenticatorWebAuthnChallengeResponse(ChallengeResponse):
def validate_response(self, response: dict) -> dict:
"""Validate webauthn challenge response"""
challenge = self.request.session[SESSION_KEY_WEBAUTHN_CHALLENGE]
challenge = self.stage.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE]
try:
registration: VerifiedRegistration = verify_registration_response(
@ -71,7 +73,7 @@ class AuthenticatorWebAuthnChallengeResponse(ChallengeResponse):
expected_rp_id=get_rp_id(self.request),
expected_origin=get_origin(self.request),
)
except InvalidRegistrationResponse as exc:
except WebAuthnException as exc:
self.stage.logger.warning("registration failed", exc=exc)
raise ValidationError(f"Registration failed. Error: {exc}") from None
@ -114,9 +116,10 @@ class AuthenticatorWebAuthnStageView(ChallengeStageView):
response_class = AuthenticatorWebAuthnChallengeResponse
def get_challenge(self, *args, **kwargs) -> Challenge:
# clear session variables prior to starting a new registration
self.request.session.pop(SESSION_KEY_WEBAUTHN_CHALLENGE, None)
stage: AuthenticatorWebAuthnStage = self.executor.current_stage
self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0)
# clear flow variables prior to starting a new registration
self.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
user = self.get_pending_user()
# library accepts none so we store null in the database, but if there is a value
@ -139,8 +142,7 @@ class AuthenticatorWebAuthnStageView(ChallengeStageView):
attestation=AttestationConveyancePreference.DIRECT,
)
self.request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] = registration_options.challenge
self.request.session.save()
self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = registration_options.challenge
return AuthenticatorWebAuthnChallenge(
data={
"registration": loads(options_to_json(registration_options)),
@ -153,6 +155,24 @@ class AuthenticatorWebAuthnStageView(ChallengeStageView):
response.user = self.get_pending_user()
return response
def challenge_invalid(self, response):
stage: AuthenticatorWebAuthnStage = self.executor.current_stage
self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0)
self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] += 1
if (
stage.max_attempts > 0
and self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] >= stage.max_attempts
):
return self.executor.stage_invalid(
__(
"Exceeded maximum attempts. "
"Contact your {brand} administrator for help.".format(
brand=self.request.brand.branding_title
)
)
)
return super().challenge_invalid(response)
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
# Webauthn Challenge has already been validated
webauthn_credential: VerifiedRegistration = response.validated_data["response"]
@ -179,6 +199,3 @@ class AuthenticatorWebAuthnStageView(ChallengeStageView):
else:
return self.executor.stage_invalid("Device with Credential ID already exists.")
return self.executor.stage_ok()
def cleanup(self):
self.request.session.pop(SESSION_KEY_WEBAUTHN_CHALLENGE, None)

View File

@ -18,7 +18,7 @@ from authentik.stages.authenticator_webauthn.models import (
WebAuthnDevice,
WebAuthnDeviceType,
)
from authentik.stages.authenticator_webauthn.stage import SESSION_KEY_WEBAUTHN_CHALLENGE
from authentik.stages.authenticator_webauthn.stage import PLAN_CONTEXT_WEBAUTHN_CHALLENGE
from authentik.stages.authenticator_webauthn.tasks import webauthn_mds_import
@ -57,6 +57,9 @@ class TestAuthenticatorWebAuthnStage(FlowTestCase):
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
)
plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
self.assertEqual(response.status_code, 200)
session = self.client.session
self.assertStageResponse(
@ -70,7 +73,7 @@ class TestAuthenticatorWebAuthnStage(FlowTestCase):
"name": self.user.username,
"displayName": self.user.name,
},
"challenge": bytes_to_base64url(session[SESSION_KEY_WEBAUTHN_CHALLENGE]),
"challenge": bytes_to_base64url(plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE]),
"pubKeyCredParams": [
{"type": "public-key", "alg": -7},
{"type": "public-key", "alg": -8},
@ -97,11 +100,11 @@ class TestAuthenticatorWebAuthnStage(FlowTestCase):
"""Test registration"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = b64decode(
plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = b64decode(
b"03Xodi54gKsfnP5I9VFfhaGXVVE2NUyZpBBXns/JI+x6V9RY2Tw2QmxRJkhh7174EkRazUntIwjMVY9bFG60Lw=="
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
@ -146,11 +149,11 @@ class TestAuthenticatorWebAuthnStage(FlowTestCase):
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = b64decode(
plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = b64decode(
b"03Xodi54gKsfnP5I9VFfhaGXVVE2NUyZpBBXns/JI+x6V9RY2Tw2QmxRJkhh7174EkRazUntIwjMVY9bFG60Lw=="
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
@ -209,11 +212,11 @@ class TestAuthenticatorWebAuthnStage(FlowTestCase):
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = b64decode(
plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = b64decode(
b"03Xodi54gKsfnP5I9VFfhaGXVVE2NUyZpBBXns/JI+x6V9RY2Tw2QmxRJkhh7174EkRazUntIwjMVY9bFG60Lw=="
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
@ -259,11 +262,11 @@ class TestAuthenticatorWebAuthnStage(FlowTestCase):
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = b64decode(
plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = b64decode(
b"03Xodi54gKsfnP5I9VFfhaGXVVE2NUyZpBBXns/JI+x6V9RY2Tw2QmxRJkhh7174EkRazUntIwjMVY9bFG60Lw=="
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
@ -298,3 +301,109 @@ class TestAuthenticatorWebAuthnStage(FlowTestCase):
self.assertEqual(response.status_code, 200)
self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
self.assertTrue(WebAuthnDevice.objects.filter(user=self.user).exists())
def test_register_max_retries(self):
"""Test registration (exceeding max retries)"""
self.stage.max_attempts = 2
self.stage.save()
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = b64decode(
b"03Xodi54gKsfnP5I9VFfhaGXVVE2NUyZpBBXns/JI+x6V9RY2Tw2QmxRJkhh7174EkRazUntIwjMVY9bFG60Lw=="
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
# first failed request
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
data={
"component": "ak-stage-authenticator-webauthn",
"response": {
"id": "kqnmrVLnDG-OwsSNHkihYZaNz5s",
"rawId": "kqnmrVLnDG-OwsSNHkihYZaNz5s",
"type": "public-key",
"registrationClientExtensions": "{}",
"response": {
"clientDataJSON": (
"eyJ0eXBlIjoid2ViYXV0aG4uY3JlYXRlIiwiY2hhbGxlbmd"
"lIjoiMDNYb2RpNTRnS3NmblA1STlWRmZoYUdYVlZFMk5VeV"
"pwQkJYbnNfSkkteDZWOVJZMlR3MlFteFJKa2hoNzE3NEVrU"
"mF6VW50SXdqTVZZOWJGRzYwTHciLCJvcmlnaW4iOiJodHRw"
"Oi8vbG9jYWxob3N0OjkwMDAiLCJjcm9zc09yaWdpbiI6ZmF"
),
"attestationObject": (
"o2NmbXRkbm9uZWdhdHRTdG10oGhhdXRoRGF0YViYSZYN5Yg"
"OjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2NdAAAAAPv8MA"
"cVTk7MjAtuAgVX170AFJKp5q1S5wxvjsLEjR5IoWGWjc-bp"
"QECAyYgASFYIKtcZHPumH37XHs0IM1v3pUBRIqHVV_SE-Lq"
"2zpJAOVXIlgg74Fg_WdB0kuLYqCKbxogkEPaVtR_iR3IyQFIJAXBzds"
),
},
},
},
SERVER_NAME="localhost",
SERVER_PORT="9000",
)
self.assertEqual(response.status_code, 200)
self.assertStageResponse(
response,
flow=self.flow,
component="ak-stage-authenticator-webauthn",
response_errors={
"response": [
{
"string": (
"Registration failed. Error: Unable to decode "
"client_data_json bytes as JSON"
),
"code": "invalid",
}
]
},
)
self.assertFalse(WebAuthnDevice.objects.filter(user=self.user).exists())
# Second failed request
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
data={
"component": "ak-stage-authenticator-webauthn",
"response": {
"id": "kqnmrVLnDG-OwsSNHkihYZaNz5s",
"rawId": "kqnmrVLnDG-OwsSNHkihYZaNz5s",
"type": "public-key",
"registrationClientExtensions": "{}",
"response": {
"clientDataJSON": (
"eyJ0eXBlIjoid2ViYXV0aG4uY3JlYXRlIiwiY2hhbGxlbmd"
"lIjoiMDNYb2RpNTRnS3NmblA1STlWRmZoYUdYVlZFMk5VeV"
"pwQkJYbnNfSkkteDZWOVJZMlR3MlFteFJKa2hoNzE3NEVrU"
"mF6VW50SXdqTVZZOWJGRzYwTHciLCJvcmlnaW4iOiJodHRw"
"Oi8vbG9jYWxob3N0OjkwMDAiLCJjcm9zc09yaWdpbiI6ZmF"
),
"attestationObject": (
"o2NmbXRkbm9uZWdhdHRTdG10oGhhdXRoRGF0YViYSZYN5Yg"
"OjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2NdAAAAAPv8MA"
"cVTk7MjAtuAgVX170AFJKp5q1S5wxvjsLEjR5IoWGWjc-bp"
"QECAyYgASFYIKtcZHPumH37XHs0IM1v3pUBRIqHVV_SE-Lq"
"2zpJAOVXIlgg74Fg_WdB0kuLYqCKbxogkEPaVtR_iR3IyQFIJAXBzds"
),
},
},
},
SERVER_NAME="localhost",
SERVER_PORT="9000",
)
self.assertEqual(response.status_code, 200)
self.assertStageResponse(
response,
flow=self.flow,
component="ak-stage-access-denied",
error_message=(
"Exceeded maximum attempts. Contact your authentik administrator for help."
),
)
self.assertFalse(WebAuthnDevice.objects.filter(user=self.user).exists())

View File

@ -13310,6 +13310,12 @@
"format": "uuid"
},
"title": "Device type restrictions"
},
"max_attempts": {
"type": "integer",
"minimum": 0,
"maximum": 2147483647,
"title": "Max attempts"
}
},
"required": []

View File

@ -34963,6 +34963,10 @@ paths:
name: friendly_name
schema:
type: string
- in: query
name: max_attempts
schema:
type: integer
- in: query
name: name
schema:
@ -42633,6 +42637,10 @@ components:
items:
$ref: '#/components/schemas/WebAuthnDeviceType'
readOnly: true
max_attempts:
type: integer
maximum: 2147483647
minimum: 0
required:
- component
- device_type_restrictions_obj
@ -42675,6 +42683,10 @@ components:
items:
type: string
format: uuid
max_attempts:
type: integer
maximum: 2147483647
minimum: 0
required:
- name
AuthorizationCodeAuthMethodEnum:
@ -52625,6 +52637,10 @@ components:
items:
type: string
format: uuid
max_attempts:
type: integer
maximum: 2147483647
minimum: 0
PatchedBlueprintInstanceRequest:
type: object
description: Info about a single blueprint instance file

View File

@ -2,6 +2,7 @@ import { RenderFlowOption } from "@goauthentik/admin/flows/utils";
import { BaseStageForm } from "@goauthentik/admin/stages/BaseStageForm";
import { deviceTypeRestrictionPair } from "@goauthentik/admin/stages/authenticator_webauthn/utils";
import { DEFAULT_CONFIG } from "@goauthentik/common/api/config";
import "@goauthentik/components/ak-number-input";
import "@goauthentik/elements/ak-dual-select/ak-dual-select-provider";
import { DataProvision } from "@goauthentik/elements/ak-dual-select/types";
import "@goauthentik/elements/forms/HorizontalFormElement";
@ -165,6 +166,15 @@ export class AuthenticatorWebAuthnStageForm extends BaseStageForm<AuthenticatorW
>
</ak-radio>
</ak-form-element-horizontal>
<ak-number-input
label=${msg("Maximum registration attempts")}
required
name="maxAttempts"
value="${this.instance?.maxAttempts || 0}"
help=${msg(
"Maximum allowed registration attempts. When set to 0 attempts, attempts are not limited.",
)}
></ak-number-input>
<ak-form-element-horizontal
label=${msg("Device type restrictions")}
name="deviceTypeRestrictions"