ensure descirptions'
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
@ -33,7 +33,7 @@ def _set_prom_info():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Update latest version info"))
|
@actor(description=_("Update latest version info."))
|
||||||
def update_latest_version():
|
def update_latest_version():
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
if CONFIG.get_bool("disable_update_check"):
|
if CONFIG.get_bool("disable_update_check"):
|
||||||
|
@ -108,7 +108,7 @@ class BlueprintEventHandler(FileSystemEventHandler):
|
|||||||
|
|
||||||
|
|
||||||
@actor(
|
@actor(
|
||||||
description=_("Find blueprints as `blueprints_find` does, but return a safe dict"),
|
description=_("Find blueprints as `blueprints_find` does, but return a safe dict."),
|
||||||
throws=(DatabaseError, ProgrammingError, InternalError),
|
throws=(DatabaseError, ProgrammingError, InternalError),
|
||||||
)
|
)
|
||||||
def blueprints_find_dict():
|
def blueprints_find_dict():
|
||||||
@ -148,7 +148,7 @@ def blueprints_find() -> list[BlueprintFile]:
|
|||||||
|
|
||||||
|
|
||||||
@actor(
|
@actor(
|
||||||
description=_("Find blueprints and check if they need to be created in the database"),
|
description=_("Find blueprints and check if they need to be created in the database."),
|
||||||
throws=(DatabaseError, ProgrammingError, InternalError),
|
throws=(DatabaseError, ProgrammingError, InternalError),
|
||||||
)
|
)
|
||||||
def blueprints_discovery(path: str | None = None):
|
def blueprints_discovery(path: str | None = None):
|
||||||
@ -189,7 +189,7 @@ def check_blueprint_v1_file(blueprint: BlueprintFile):
|
|||||||
apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)
|
apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Apply single blueprint"))
|
@actor(description=_("Apply single blueprint."))
|
||||||
def apply_blueprint(instance_pk: UUID):
|
def apply_blueprint(instance_pk: UUID):
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
self.set_uid(str(instance_pk))
|
self.set_uid(str(instance_pk))
|
||||||
@ -240,7 +240,7 @@ def apply_blueprint(instance_pk: UUID):
|
|||||||
instance.save()
|
instance.save()
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Remove blueprints which couldn't be fetched"))
|
@actor(description=_("Remove blueprints which couldn't be fetched."))
|
||||||
def clear_failed_blueprints():
|
def clear_failed_blueprints():
|
||||||
# Exclude OCI blueprints as those might be temporarily unavailable
|
# Exclude OCI blueprints as those might be temporarily unavailable
|
||||||
for blueprint in BlueprintInstance.objects.exclude(path__startswith=OCI_PREFIX):
|
for blueprint in BlueprintInstance.objects.exclude(path__startswith=OCI_PREFIX):
|
||||||
|
@ -20,7 +20,7 @@ from authentik.tasks.models import Task
|
|||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Remove expired objects"))
|
@actor(description=_("Remove expired objects."))
|
||||||
def clean_expired_models():
|
def clean_expired_models():
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
for cls in ExpiringModel.__subclasses__():
|
for cls in ExpiringModel.__subclasses__():
|
||||||
@ -35,7 +35,7 @@ def clean_expired_models():
|
|||||||
self.info(f"Expired {amount} {cls._meta.verbose_name_plural}")
|
self.info(f"Expired {amount} {cls._meta.verbose_name_plural}")
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Remove temporary users created by SAML Sources"))
|
@actor(description=_("Remove temporary users created by SAML Sources."))
|
||||||
def clean_temporary_users():
|
def clean_temporary_users():
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
_now = datetime.now()
|
_now = datetime.now()
|
||||||
|
@ -37,7 +37,7 @@ def ensure_certificate_valid(body: str):
|
|||||||
return body
|
return body
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Discover, import and update certificates from the filesystem"))
|
@actor(description=_("Discover, import and update certificates from the filesystem."))
|
||||||
def certificate_discovery():
|
def certificate_discovery():
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
certs = {}
|
certs = {}
|
||||||
|
@ -62,7 +62,7 @@ def _check_app_access(stream: Stream, event_data: dict) -> bool:
|
|||||||
return engine.passing
|
return engine.passing
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Send an SSF event"))
|
@actor(description=_("Send an SSF event."))
|
||||||
def _send_ssf_event(stream_uuid: UUID, event_data: dict[str, Any]):
|
def _send_ssf_event(stream_uuid: UUID, event_data: dict[str, Any]):
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
|
|
||||||
|
@ -23,7 +23,7 @@ from authentik.tasks.models import Task
|
|||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Check if policies attached to NotificationRule match event"))
|
@actor(description=_("Check if policies attached to NotificationRule match event."))
|
||||||
def event_trigger_handler(event_uuid: UUID, trigger_name: str):
|
def event_trigger_handler(event_uuid: UUID, trigger_name: str):
|
||||||
"""Check if policies attached to NotificationRule match event"""
|
"""Check if policies attached to NotificationRule match event"""
|
||||||
event: Event = Event.objects.filter(event_uuid=event_uuid).first()
|
event: Event = Event.objects.filter(event_uuid=event_uuid).first()
|
||||||
@ -80,7 +80,7 @@ def event_trigger_handler(event_uuid: UUID, trigger_name: str):
|
|||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Send notification"))
|
@actor(description=_("Send notification."))
|
||||||
def notification_transport(transport_pk: int, event_pk: str, user_pk: int, trigger_pk: str):
|
def notification_transport(transport_pk: int, event_pk: str, user_pk: int, trigger_pk: str):
|
||||||
"""Send notification over specified transport"""
|
"""Send notification over specified transport"""
|
||||||
event = Event.objects.filter(pk=event_pk).first()
|
event = Event.objects.filter(pk=event_pk).first()
|
||||||
@ -101,7 +101,7 @@ def notification_transport(transport_pk: int, event_pk: str, user_pk: int, trigg
|
|||||||
transport.send(notification)
|
transport.send(notification)
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Cleanup events for GDPR compliance"))
|
@actor(description=_("Cleanup events for GDPR compliance."))
|
||||||
def gdpr_cleanup(user_pk: int):
|
def gdpr_cleanup(user_pk: int):
|
||||||
"""cleanup events from gdpr_compliance"""
|
"""cleanup events from gdpr_compliance"""
|
||||||
events = Event.objects.filter(user__pk=user_pk)
|
events = Event.objects.filter(user__pk=user_pk)
|
||||||
@ -109,7 +109,7 @@ def gdpr_cleanup(user_pk: int):
|
|||||||
events.delete()
|
events.delete()
|
||||||
|
|
||||||
|
|
||||||
@actor
|
@actor(description=_("Cleanup seen notifications and notifications whose event expired."))
|
||||||
def notification_cleanup():
|
def notification_cleanup():
|
||||||
"""Cleanup seen notifications and notifications whose event expired."""
|
"""Cleanup seen notifications and notifications whose event expired."""
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
|
@ -84,7 +84,7 @@ def controller_for_outpost(outpost: Outpost) -> type[BaseController] | None:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Update cached state of a service connection"))
|
@actor(description=_("Update cached state of a service connection."))
|
||||||
def outpost_service_connection_monitor(connection_pk: Any):
|
def outpost_service_connection_monitor(connection_pk: Any):
|
||||||
"""Update cached state of a service connection"""
|
"""Update cached state of a service connection"""
|
||||||
connection: OutpostServiceConnection = (
|
connection: OutpostServiceConnection = (
|
||||||
@ -109,7 +109,7 @@ def outpost_service_connection_monitor(connection_pk: Any):
|
|||||||
cache.set(connection.state_key, state, timeout=None)
|
cache.set(connection.state_key, state, timeout=None)
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Create/update/monitor/delete the deployment of an Outpost"))
|
@actor(description=_("Create/update/monitor/delete the deployment of an Outpost."))
|
||||||
def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = False):
|
def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = False):
|
||||||
"""Create/update/monitor/delete the deployment of an Outpost"""
|
"""Create/update/monitor/delete the deployment of an Outpost"""
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
@ -142,7 +142,7 @@ def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = F
|
|||||||
self.info(log)
|
self.info(log)
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Ensure that all Outposts have valid Service Accounts and Tokens"))
|
@actor(description=_("Ensure that all Outposts have valid Service Accounts and Tokens."))
|
||||||
def outpost_token_ensurer():
|
def outpost_token_ensurer():
|
||||||
"""
|
"""
|
||||||
Periodically ensure that all Outposts have valid Service Accounts and Tokens
|
Periodically ensure that all Outposts have valid Service Accounts and Tokens
|
||||||
@ -155,7 +155,7 @@ def outpost_token_ensurer():
|
|||||||
self.info(f"Successfully checked {len(all_outposts)} Outposts.")
|
self.info(f"Successfully checked {len(all_outposts)} Outposts.")
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("If an Outpost is saved, ensure that token is created/updated"))
|
@actor(description=_("If an Outpost is saved, ensure that token is created/updated."))
|
||||||
def outpost_post_save(model_class: str, model_pk: Any):
|
def outpost_post_save(model_class: str, model_pk: Any):
|
||||||
"""If an Outpost is saved, Ensure that token is created/updated
|
"""If an Outpost is saved, Ensure that token is created/updated
|
||||||
|
|
||||||
@ -268,7 +268,7 @@ def outpost_connection_discovery():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Terminate session on all outposts"))
|
@actor(description=_("Terminate session on all outposts."))
|
||||||
def outpost_session_end(session_id: str):
|
def outpost_session_end(session_id: str):
|
||||||
layer = get_channel_layer()
|
layer = get_channel_layer()
|
||||||
hashed_session_id = hash_session_key(session_id)
|
hashed_session_id = hash_session_key(session_id)
|
||||||
|
@ -10,7 +10,7 @@ from authentik.providers.oauth2.id_token import hash_session_key
|
|||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Terminate session on Proxy outpost"))
|
@actor(description=_("Terminate session on Proxy outpost."))
|
||||||
def proxy_on_logout(session_id: str):
|
def proxy_on_logout(session_id: str):
|
||||||
layer = get_channel_layer()
|
layer = get_channel_layer()
|
||||||
hashed_session_id = hash_session_key(session_id)
|
hashed_session_id = hash_session_key(session_id)
|
||||||
|
@ -17,7 +17,7 @@ LOGGER = get_logger()
|
|||||||
CACHE_KEY_STATUS = "goauthentik.io/sources/kerberos/status/"
|
CACHE_KEY_STATUS = "goauthentik.io/sources/kerberos/status/"
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Check connectivity for Kerberos sources"))
|
@actor(description=_("Check connectivity for Kerberos sources."))
|
||||||
def kerberos_connectivity_check(pk: str):
|
def kerberos_connectivity_check(pk: str):
|
||||||
"""Check connectivity for Kerberos Sources"""
|
"""Check connectivity for Kerberos Sources"""
|
||||||
# 2 hour timeout, this task should run every hour
|
# 2 hour timeout, this task should run every hour
|
||||||
@ -31,7 +31,7 @@ def kerberos_connectivity_check(pk: str):
|
|||||||
|
|
||||||
@actor(
|
@actor(
|
||||||
time_limit=(60 * 60 * CONFIG.get_int("sources.kerberos.task_timeout_hours")) * 2.5 * 1000,
|
time_limit=(60 * 60 * CONFIG.get_int("sources.kerberos.task_timeout_hours")) * 2.5 * 1000,
|
||||||
description=_("Sync Kerberos source"),
|
description=_("Sync Kerberos source."),
|
||||||
)
|
)
|
||||||
def kerberos_sync(pk: str):
|
def kerberos_sync(pk: str):
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
|
@ -34,7 +34,7 @@ CACHE_KEY_PREFIX = "goauthentik.io/sources/ldap/page/"
|
|||||||
CACHE_KEY_STATUS = "goauthentik.io/sources/ldap/status/"
|
CACHE_KEY_STATUS = "goauthentik.io/sources/ldap/status/"
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Check connectivity for LDAP sources"))
|
@actor(description=_("Check connectivity for LDAP sources."))
|
||||||
def ldap_connectivity_check(pk: str | None = None):
|
def ldap_connectivity_check(pk: str | None = None):
|
||||||
"""Check connectivity for LDAP Sources"""
|
"""Check connectivity for LDAP Sources"""
|
||||||
timeout = 60 * 60 * 2
|
timeout = 60 * 60 * 2
|
||||||
@ -50,7 +50,7 @@ def ldap_connectivity_check(pk: str | None = None):
|
|||||||
# group in parallel and then membership, then deletions, so 3x is to cover the serial tasks,
|
# group in parallel and then membership, then deletions, so 3x is to cover the serial tasks,
|
||||||
# and 0.5x on top of that to give some more leeway
|
# and 0.5x on top of that to give some more leeway
|
||||||
time_limit=(60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000) * 3.5,
|
time_limit=(60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000) * 3.5,
|
||||||
description=_("Sync LDAP source"),
|
description=_("Sync LDAP source."),
|
||||||
)
|
)
|
||||||
def ldap_sync(source_pk: str):
|
def ldap_sync(source_pk: str):
|
||||||
"""Sync a single source"""
|
"""Sync a single source"""
|
||||||
@ -119,7 +119,7 @@ def ldap_sync_paginator(source: LDAPSource, sync: type[BaseLDAPSynchronizer]) ->
|
|||||||
|
|
||||||
@actor(
|
@actor(
|
||||||
time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000,
|
time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000,
|
||||||
description=_("Sync page for LDAP source"),
|
description=_("Sync page for LDAP source."),
|
||||||
)
|
)
|
||||||
def ldap_sync_page(source_pk: str, sync_class: str, page_cache_key: str):
|
def ldap_sync_page(source_pk: str, sync_class: str, page_cache_key: str):
|
||||||
"""Synchronization of an LDAP Source"""
|
"""Synchronization of an LDAP Source"""
|
||||||
|
@ -17,7 +17,7 @@ LOGGER = get_logger()
|
|||||||
|
|
||||||
@actor(
|
@actor(
|
||||||
description=_(
|
description=_(
|
||||||
"Update OAuth sources' config from well_known, and JWKS info from the configured URL"
|
"Update OAuth sources' config from well_known, and JWKS info from the configured URL."
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
def update_well_known_jwks():
|
def update_well_known_jwks():
|
||||||
|
@ -12,7 +12,7 @@ from authentik.tasks.models import Task
|
|||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Check the validity of a Plex source"))
|
@actor(description=_("Check the validity of a Plex source."))
|
||||||
def check_plex_token(source_pk: str):
|
def check_plex_token(source_pk: str):
|
||||||
"""Check the validity of a Plex source."""
|
"""Check the validity of a Plex source."""
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
|
@ -30,7 +30,7 @@ def mds_ca() -> bytes:
|
|||||||
return _raw_root.read()
|
return _raw_root.read()
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Background task to import FIDO Alliance MDS blob and AAGUIDs into database"))
|
@actor(description=_("Background task to import FIDO Alliance MDS blob and AAGUIDs into database."))
|
||||||
def webauthn_mds_import(force=False):
|
def webauthn_mds_import(force=False):
|
||||||
"""Background task to import FIDO Alliance MDS blob and AAGUIDs into database"""
|
"""Background task to import FIDO Alliance MDS blob and AAGUIDs into database"""
|
||||||
self: Task = CurrentTask.get_task()
|
self: Task = CurrentTask.get_task()
|
||||||
|
@ -49,7 +49,7 @@ def get_email_body(email: EmailMultiAlternatives) -> str:
|
|||||||
return email.body
|
return email.body
|
||||||
|
|
||||||
|
|
||||||
@actor(description=_("Send email"))
|
@actor(description=_("Send email."))
|
||||||
def send_mail(
|
def send_mail(
|
||||||
message: dict[Any, Any],
|
message: dict[Any, Any],
|
||||||
stage_class_path: str | None = None,
|
stage_class_path: str | None = None,
|
||||||
|
@ -16,12 +16,16 @@ from rest_framework.request import Request
|
|||||||
from rest_framework.response import Response
|
from rest_framework.response import Response
|
||||||
from rest_framework.serializers import SerializerMethodField
|
from rest_framework.serializers import SerializerMethodField
|
||||||
from rest_framework.viewsets import GenericViewSet
|
from rest_framework.viewsets import GenericViewSet
|
||||||
|
from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.core.api.utils import ModelSerializer
|
from authentik.core.api.utils import ModelSerializer
|
||||||
from authentik.rbac.decorators import permission_required
|
from authentik.rbac.decorators import permission_required
|
||||||
from authentik.tasks.schedules.models import Schedule
|
from authentik.tasks.schedules.models import Schedule
|
||||||
|
|
||||||
|
|
||||||
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
class ScheduleSerializer(ModelSerializer):
|
class ScheduleSerializer(ModelSerializer):
|
||||||
rel_obj_app_label = ReadOnlyField(source="rel_obj_content_type.app_label")
|
rel_obj_app_label = ReadOnlyField(source="rel_obj_content_type.app_label")
|
||||||
rel_obj_model = ReadOnlyField(source="rel_obj_content_type.model")
|
rel_obj_model = ReadOnlyField(source="rel_obj_content_type.model")
|
||||||
@ -51,9 +55,15 @@ class ScheduleSerializer(ModelSerializer):
|
|||||||
try:
|
try:
|
||||||
actor: Actor = get_broker().get_actor(instance.actor_name)
|
actor: Actor = get_broker().get_actor(instance.actor_name)
|
||||||
except ActorNotFound:
|
except ActorNotFound:
|
||||||
return "FIXME this shouldn't happen"
|
LOGGER.warning("Could not find actor for schedule", schedule=instance)
|
||||||
|
return None
|
||||||
if "description" not in actor.options:
|
if "description" not in actor.options:
|
||||||
return "no doc"
|
LOGGER.warning(
|
||||||
|
"Could not find description for actor",
|
||||||
|
schedule=instance,
|
||||||
|
actor=actor.actor_name,
|
||||||
|
)
|
||||||
|
return None
|
||||||
return actor.options["description"]
|
return actor.options["description"]
|
||||||
|
|
||||||
|
|
||||||
|
9
authentik/tasks/tests.py
Normal file
9
authentik/tasks/tests.py
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
from django.test import TestCase
|
||||||
|
from dramatiq.broker import get_broker
|
||||||
|
|
||||||
|
|
||||||
|
class TestActors(TestCase):
|
||||||
|
def test_all_actors_have_description(self):
|
||||||
|
broker = get_broker()
|
||||||
|
for actor in broker.get_declared_actors():
|
||||||
|
self.assertIn("description", actor.options)
|
Reference in New Issue
Block a user