stages webauthn: migrate tasks
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
@ -1,6 +1,8 @@
|
||||
"""authentik webauthn app config"""
|
||||
|
||||
from authentik.blueprints.apps import ManagedAppConfig
|
||||
from authentik.lib.utils.time import fqdn_rand
|
||||
from authentik.tasks.schedules.lib import ScheduleSpec
|
||||
|
||||
|
||||
class AuthentikStageAuthenticatorWebAuthnConfig(ManagedAppConfig):
|
||||
@ -10,3 +12,11 @@ class AuthentikStageAuthenticatorWebAuthnConfig(ManagedAppConfig):
|
||||
label = "authentik_stages_authenticator_webauthn"
|
||||
verbose_name = "authentik Stages.Authenticator.WebAuthn"
|
||||
default = True
|
||||
|
||||
def get_tenant_schedule_specs(self) -> list[ScheduleSpec]:
|
||||
return [
|
||||
ScheduleSpec(
|
||||
actor_name="authentik.stages.authenticator_webauthn.tasks.webauthn_mds_import",
|
||||
crontab=f"{fqdn_rand('webauthn_mds_import')} {fqdn_rand('webauthn_mds_import', 24)} * * {fqdn_rand('webauthn_mds_import', 7)}", # noqa: E501
|
||||
),
|
||||
]
|
||||
|
@ -1,17 +0,0 @@
|
||||
"""Stage authenticator webauthn Settings"""
|
||||
|
||||
from celery.schedules import crontab
|
||||
|
||||
from authentik.lib.utils.time import fqdn_rand
|
||||
|
||||
CELERY_BEAT_SCHEDULE = {
|
||||
"stages_authenticator_webauthn_import_mds": {
|
||||
"task": "authentik.stages.authenticator_webauthn.tasks.webauthn_mds_import",
|
||||
"schedule": crontab(
|
||||
minute=fqdn_rand("webauthn_mds_import"),
|
||||
hour=fqdn_rand("webauthn_mds_import", 24),
|
||||
day_of_week=fqdn_rand("webauthn_mds_import", 7),
|
||||
),
|
||||
"options": {"queue": "authentik_scheduled"},
|
||||
},
|
||||
}
|
@ -6,15 +6,15 @@ from pathlib import Path
|
||||
|
||||
from django.core.cache import cache
|
||||
from django.db.transaction import atomic
|
||||
from dramatiq.actor import actor
|
||||
from fido2.mds3 import filter_revoked, parse_blob
|
||||
|
||||
from authentik.events.models import TaskStatus
|
||||
from authentik.events.system_tasks import SystemTask, prefill_task
|
||||
from authentik.root.celery import CELERY_APP
|
||||
from authentik.stages.authenticator_webauthn.models import (
|
||||
UNKNOWN_DEVICE_TYPE_AAGUID,
|
||||
WebAuthnDeviceType,
|
||||
)
|
||||
from authentik.tasks.middleware import CurrentTask
|
||||
from authentik.tasks.models import Task, TaskStatus
|
||||
|
||||
CACHE_KEY_MDS_NO = "goauthentik.io/stages/authenticator_webauthn/mds_no"
|
||||
AAGUID_BLOB_PATH = Path(__file__).parent / "mds" / "aaguid.json"
|
||||
@ -29,13 +29,10 @@ def mds_ca() -> bytes:
|
||||
return _raw_root.read()
|
||||
|
||||
|
||||
@CELERY_APP.task(
|
||||
bind=True,
|
||||
base=SystemTask,
|
||||
)
|
||||
@prefill_task
|
||||
def webauthn_mds_import(self: SystemTask, force=False):
|
||||
@actor
|
||||
def webauthn_mds_import(force=False):
|
||||
"""Background task to import FIDO Alliance MDS blob and AAGUIDs into database"""
|
||||
self: Task = CurrentTask.get_task()
|
||||
with open(MDS_BLOB_PATH, mode="rb") as _raw_blob:
|
||||
blob = parse_blob(_raw_blob.read(), mds_ca())
|
||||
to_create_update = [
|
||||
|
@ -139,7 +139,7 @@ class TestAuthenticatorWebAuthnStage(FlowTestCase):
|
||||
|
||||
def test_register_restricted_device_type_deny(self):
|
||||
"""Test registration with restricted devices (fail)"""
|
||||
webauthn_mds_import.delay(force=True).get()
|
||||
webauthn_mds_import.send(force=True)
|
||||
self.stage.device_type_restrictions.set(
|
||||
WebAuthnDeviceType.objects.filter(
|
||||
description="Android Authenticator with SafetyNet Attestation"
|
||||
@ -204,7 +204,7 @@ class TestAuthenticatorWebAuthnStage(FlowTestCase):
|
||||
|
||||
def test_register_restricted_device_type_allow(self):
|
||||
"""Test registration with restricted devices (allow)"""
|
||||
webauthn_mds_import.delay(force=True).get()
|
||||
webauthn_mds_import.send(force=True)
|
||||
self.stage.device_type_restrictions.set(
|
||||
WebAuthnDeviceType.objects.filter(description="iCloud Keychain")
|
||||
)
|
||||
@ -253,7 +253,7 @@ class TestAuthenticatorWebAuthnStage(FlowTestCase):
|
||||
|
||||
def test_register_restricted_device_type_allow_unknown(self):
|
||||
"""Test registration with restricted devices (allow, unknown device type)"""
|
||||
webauthn_mds_import.delay(force=True).get()
|
||||
webauthn_mds_import.send(force=True)
|
||||
WebAuthnDeviceType.objects.filter(description="iCloud Keychain").delete()
|
||||
self.stage.device_type_restrictions.set(
|
||||
WebAuthnDeviceType.objects.filter(aaguid=UNKNOWN_DEVICE_TYPE_AAGUID)
|
||||
|
Reference in New Issue
Block a user