diff --git a/authentik/admin/apps.py b/authentik/admin/apps.py index bc9748a6f9..afd998ccda 100644 --- a/authentik/admin/apps.py +++ b/authentik/admin/apps.py @@ -4,6 +4,7 @@ from prometheus_client import Info from authentik.blueprints.apps import ManagedAppConfig from authentik.lib.utils.time import fqdn_rand +from authentik.tasks.schedules.lib import ScheduleSpec PROM_INFO = Info("authentik_version", "Currently running authentik version") @@ -16,10 +17,10 @@ class AuthentikAdminConfig(ManagedAppConfig): verbose_name = "authentik Admin" default = True - def get_tenant_schedules(self): + def get_tenant_schedule_specs(self) -> list[ScheduleSpec]: return [ - { - "actor_name": "authentik.admin.tasks.update_latest_version", - "crontab": f"{fqdn_rand('admin_latest_version')} * * * *", - }, + ScheduleSpec( + actor_name="authentik.admin.tasks.update_latest_version", + crontab=f"{fqdn_rand('admin_latest_version')} * * * *", + ), ] diff --git a/authentik/admin/tasks.py b/authentik/admin/tasks.py index 841aa48d9d..ae8ad3a60c 100644 --- a/authentik/admin/tasks.py +++ b/authentik/admin/tasks.py @@ -48,8 +48,8 @@ def clear_update_notifications(): @actor def update_latest_version(): - self: Task = CurrentTask.get_task() """Update latest version info""" + self: Task = CurrentTask.get_task() if CONFIG.get_bool("disable_update_check"): cache.set(VERSION_CACHE_KEY, VERSION_NULL, VERSION_CACHE_TIMEOUT) self.set_status(TaskStatus.WARNING, "Version check disabled.") diff --git a/authentik/blueprints/apps.py b/authentik/blueprints/apps.py index 30080b3484..6cfe1e77db 100644 --- a/authentik/blueprints/apps.py +++ b/authentik/blueprints/apps.py @@ -1,16 +1,15 @@ """authentik Blueprints app""" -import pickle # nosec from collections.abc import Callable from importlib import import_module from inspect import ismethod -from typing import Any from django.apps import AppConfig from django.db import DatabaseError, InternalError, ProgrammingError from structlog.stdlib import BoundLogger, get_logger from authentik.root.signals import startup +from authentik.tasks.schedules.lib import ScheduleSpec class ManagedAppConfig(AppConfig): @@ -82,34 +81,17 @@ class ManagedAppConfig(AppConfig): func._authentik_managed_reconcile = ManagedAppConfig.RECONCILE_GLOBAL_CATEGORY return func - def get_tenant_schedules(self) -> list[dict[str, Any]]: + def get_tenant_schedule_specs(self) -> list[ScheduleSpec]: + """Get a list of schedule specs that must exist in each tenant""" return [] - def get_global_schedules(self) -> list[dict[str, Any]]: + def get_global_schedule_specs(self) -> list[ScheduleSpec]: + """Get a list of schedule specs that must exist in the default tenant""" return [] - def _reconcile_schedules(self, schedules: list[dict[str, Any]]): - from authentik.tasks.schedules.models import Schedule - + def _reconcile_schedules(self, schedules: list[ScheduleSpec]): for schedule in schedules: - query = { - "uid": schedule.get("uid", schedule["actor_name"]), - } - defaults = { - **query, - "actor_name": schedule["actor_name"], - "args": pickle.dumps(schedule.get("args", ())), - "kwargs": pickle.dumps(schedule.get("kwargs", {})), - } - create_defaults = { - **defaults, - "crontab": schedule["crontab"], - } - Schedule.objects.update_or_create( - **query, - defaults=defaults, - create_defaults=create_defaults, - ) + schedule.update_or_create() def _reconcile_tenant(self) -> None: """reconcile ourselves for tenanted methods""" @@ -123,7 +105,7 @@ class ManagedAppConfig(AppConfig): for tenant in tenants: with tenant: self._reconcile(self.RECONCILE_TENANT_CATEGORY) - self._reconcile_schedules(self.get_tenant_schedules()) + self._reconcile_schedules(self.get_tenant_schedule_specs()) def _reconcile_global(self) -> None: """ @@ -134,7 +116,7 @@ class ManagedAppConfig(AppConfig): with schema_context(get_public_schema_name()): self._reconcile(self.RECONCILE_GLOBAL_CATEGORY) - self._reconcile_schedules(self.get_global_schedules()) + self._reconcile_schedules(self.get_global_schedule_specs()) class AuthentikBlueprintsConfig(ManagedAppConfig): diff --git a/authentik/crypto/apps.py b/authentik/crypto/apps.py index cdb01b3a1b..1cf33acf82 100644 --- a/authentik/crypto/apps.py +++ b/authentik/crypto/apps.py @@ -4,6 +4,8 @@ from datetime import UTC, datetime from authentik.blueprints.apps import ManagedAppConfig from authentik.lib.generators import generate_id +from authentik.lib.utils.time import fqdn_rand +from authentik.tasks.schedules.lib import ScheduleSpec MANAGED_KEY = "goauthentik.io/crypto/jwt-managed" @@ -67,3 +69,11 @@ class AuthentikCryptoConfig(ManagedAppConfig): "key_data": builder.private_key, }, ) + + def get_tenant_schedule_specs(self) -> list[ScheduleSpec]: + return [ + ScheduleSpec( + actor_name="authentik.crypto.tasks.certificate_discovery", + crontab=f"{fqdn_rand('crypto_certificate_discovery')} * * * *", + ), + ] diff --git a/authentik/crypto/settings.py b/authentik/crypto/settings.py deleted file mode 100644 index 8316e9b84d..0000000000 --- a/authentik/crypto/settings.py +++ /dev/null @@ -1,13 +0,0 @@ -"""Crypto task Settings""" - -from celery.schedules import crontab - -from authentik.lib.utils.time import fqdn_rand - -CELERY_BEAT_SCHEDULE = { - "crypto_certificate_discovery": { - "task": "authentik.crypto.tasks.certificate_discovery", - "schedule": crontab(minute=fqdn_rand("crypto_certificate_discovery"), hour="*"), - "options": {"queue": "authentik_scheduled"}, - }, -} diff --git a/authentik/crypto/tasks.py b/authentik/crypto/tasks.py index bce3f998c7..4d5a0af4ba 100644 --- a/authentik/crypto/tasks.py +++ b/authentik/crypto/tasks.py @@ -7,13 +7,13 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.x509.base import load_pem_x509_certificate from django.utils.translation import gettext_lazy as _ +from dramatiq.actor import actor from structlog.stdlib import get_logger from authentik.crypto.models import CertificateKeyPair -from authentik.events.models import TaskStatus -from authentik.events.system_tasks import SystemTask, prefill_task from authentik.lib.config import CONFIG -from authentik.root.celery import CELERY_APP +from authentik.tasks.middleware import CurrentTask +from authentik.tasks.models import Task, TaskStatus LOGGER = get_logger() @@ -36,10 +36,10 @@ def ensure_certificate_valid(body: str): return body -@CELERY_APP.task(bind=True, base=SystemTask) -@prefill_task -def certificate_discovery(self: SystemTask): +@actor +def certificate_discovery(): """Discover, import and update certificates from the filesystem""" + self: Task = CurrentTask.get_task() certs = {} private_keys = {} discovered = 0 diff --git a/authentik/outposts/models.py b/authentik/outposts/models.py index 4032892fe8..a8a4efbf26 100644 --- a/authentik/outposts/models.py +++ b/authentik/outposts/models.py @@ -37,6 +37,8 @@ from authentik.lib.models import InheritanceForeignKey, SerializerModel from authentik.lib.sentry import SentryIgnoredException from authentik.lib.utils.errors import exception_to_string from authentik.outposts.controllers.k8s.utils import get_namespace +from authentik.tasks.schedules.lib import ScheduleSpec +from authentik.tasks.schedules.models import ScheduledModel OUR_VERSION = parse(__version__) OUTPOST_HELLO_INTERVAL = 10 @@ -113,7 +115,7 @@ class OutpostServiceConnectionState: healthy: bool -class OutpostServiceConnection(models.Model): +class OutpostServiceConnection(ScheduledModel, models.Model): """Connection details for an Outpost Controller, like Docker or Kubernetes""" uuid = models.UUIDField(default=uuid4, editable=False, primary_key=True) @@ -143,11 +145,11 @@ class OutpostServiceConnection(models.Model): @property def state(self) -> OutpostServiceConnectionState: """Get state of service connection""" - from authentik.outposts.tasks import outpost_service_connection_state + from authentik.outposts.tasks import outpost_service_connection_monitor state = cache.get(self.state_key, None) if not state: - outpost_service_connection_state.delay(self.pk) + outpost_service_connection_monitor.send(self.pk) return OutpostServiceConnectionState("", False) return state @@ -158,6 +160,18 @@ class OutpostServiceConnection(models.Model): # since the response doesn't use the correct inheritance return "" + @property + def schedule_specs(self) -> list[ScheduleSpec]: + return [ + ScheduleSpec( + uid=self.pk, + actor_name="authentik.outposts.tasks.outpost_service_connection_monitor", + args=(self.pk,), + crontab="3-59/15 * * * *", + description=_(f"Update cached state of service connection {self.name}"), + ), + ] + class DockerServiceConnection(SerializerModel, OutpostServiceConnection): """Service Connection to a Docker endpoint""" diff --git a/authentik/outposts/settings.py b/authentik/outposts/settings.py index c29f9f64ab..06f903f8ae 100644 --- a/authentik/outposts/settings.py +++ b/authentik/outposts/settings.py @@ -10,11 +10,6 @@ CELERY_BEAT_SCHEDULE = { "schedule": crontab(minute=fqdn_rand("outposts_controller"), hour="*/4"), "options": {"queue": "authentik_scheduled"}, }, - "outposts_service_connection_check": { - "task": "authentik.outposts.tasks.outpost_service_connection_monitor", - "schedule": crontab(minute="3-59/15"), - "options": {"queue": "authentik_scheduled"}, - }, "outpost_token_ensurer": { "task": "authentik.outposts.tasks.outpost_token_ensurer", "schedule": crontab(minute=fqdn_rand("outpost_token_ensurer"), hour="*/8"), diff --git a/authentik/outposts/tasks.py b/authentik/outposts/tasks.py index e09dcf769f..a497bc7b2a 100644 --- a/authentik/outposts/tasks.py +++ b/authentik/outposts/tasks.py @@ -13,6 +13,7 @@ from django.db import DatabaseError, InternalError, ProgrammingError from django.db.models.base import Model from django.utils.text import slugify from docker.constants import DEFAULT_UNIX_SOCKET +from dramatiq.actor import actor from kubernetes.config.incluster_config import SERVICE_TOKEN_FILENAME from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION from structlog.stdlib import get_logger @@ -77,8 +78,8 @@ def controller_for_outpost(outpost: Outpost) -> type[BaseController] | None: return None -@CELERY_APP.task() -def outpost_service_connection_state(connection_pk: Any): +@actor +def outpost_service_connection_monitor(connection_pk: Any): """Update cached state of a service connection""" connection: OutpostServiceConnection = ( OutpostServiceConnection.objects.filter(pk=connection_pk).select_subclasses().first() @@ -102,23 +103,6 @@ def outpost_service_connection_state(connection_pk: Any): cache.set(connection.state_key, state, timeout=None) -@CELERY_APP.task( - bind=True, - base=SystemTask, - throws=(DatabaseError, ProgrammingError, InternalError), -) -@prefill_task -def outpost_service_connection_monitor(self: SystemTask): - """Regularly check the state of Outpost Service Connections""" - connections = OutpostServiceConnection.objects.all() - for connection in connections.iterator(): - outpost_service_connection_state.delay(connection.pk) - self.set_status( - TaskStatus.SUCCESSFUL, - f"Successfully updated {len(connections)} connections.", - ) - - @CELERY_APP.task( throws=(DatabaseError, ProgrammingError, InternalError), ) @@ -198,7 +182,7 @@ def outpost_post_save(model_class: str, model_pk: Any): if isinstance(instance, OutpostServiceConnection): LOGGER.debug("triggering ServiceConnection state update", instance=instance) - outpost_service_connection_state.delay(str(instance.pk)) + outpost_service_connection_monitor.send(str(instance.pk)) for field in instance._meta.get_fields(): # Each field is checked if it has a `related_model` attribute (when ForeginKeys or M2Ms) diff --git a/authentik/tasks/schedules/api.py b/authentik/tasks/schedules/api.py index 614b369226..a986a64dba 100644 --- a/authentik/tasks/schedules/api.py +++ b/authentik/tasks/schedules/api.py @@ -1,8 +1,12 @@ +from dramatiq.actor import Actor +from dramatiq.broker import get_broker +from dramatiq.errors import ActorNotFound from rest_framework.mixins import ( ListModelMixin, RetrieveModelMixin, UpdateModelMixin, ) +from rest_framework.serializers import SerializerMethodField from rest_framework.viewsets import GenericViewSet from authentik.core.api.utils import ModelSerializer @@ -10,6 +14,8 @@ from authentik.tasks.schedules.models import Schedule class ScheduleSerializer(ModelSerializer): + description = SerializerMethodField() + class Meta: model = Schedule fields = [ @@ -18,8 +24,20 @@ class ScheduleSerializer(ModelSerializer): "actor_name", "crontab", "next_run", + "description", ] + def get_description(self, instance: Schedule) -> str | None: + if instance.rel_obj: + for spec in instance.rel_obj.schedule_specs: + if instance.uid == spec.get_uid(): + return spec.description + try: + actor: Actor = get_broker().get_actor(instance.actor_name) + except ActorNotFound: + return None + return actor.fn.__doc__ + class ScheduleViewSet( RetrieveModelMixin, diff --git a/authentik/tasks/schedules/apps.py b/authentik/tasks/schedules/apps.py index 0b806efe42..52a013d6fe 100644 --- a/authentik/tasks/schedules/apps.py +++ b/authentik/tasks/schedules/apps.py @@ -6,3 +6,14 @@ class AuthentikTasksSchedulesConfig(ManagedAppConfig): label = "authentik_tasks_schedules" verbose_name = "authentik Tasks Schedules" default = True + + def get_tenant_schedule_specs(self): + from authentik.tasks.schedules.models import ScheduledModel + + schedules = [] + for Model in ScheduledModel.__subclasses__(): + for obj in Model.objects.all(): + for spec in obj.schedule_specs: + spec.rel_obj = obj + schedules.append(spec) + return schedules diff --git a/authentik/tasks/schedules/lib.py b/authentik/tasks/schedules/lib.py new file mode 100644 index 0000000000..6956ea3122 --- /dev/null +++ b/authentik/tasks/schedules/lib.py @@ -0,0 +1,53 @@ +import pickle # nosec +from collections.abc import Iterable +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class ScheduleSpec: + actor_name: str + crontab: str + uid: str | None = None + + args: Iterable[Any] = field(default_factory=tuple) + kwargs: dict[str, Any] = field(default_factory=dict) + + rel_obj: Any | None = None + + description: Any | str | None = None + + def get_uid(self) -> str: + if self.uid is not None: + return f"{self.actor_name}:{self.uid}" + return self.actor_name + + def get_args(self) -> bytes: + return pickle.dumps(self.args) + + def get_kwargs(self) -> bytes: + return pickle.dumps(self.kwargs) + + def update_or_create(self): + from authentik.tasks.schedules.models import Schedule + + query = { + "uid": self.get_uid(), + } + defaults = { + **query, + "actor_name": self.actor_name, + "args": self.get_args(), + "kwargs": self.get_kwargs(), + } + create_defaults = { + **defaults, + "crontab": self.crontab, + "rel_obj": self.rel_obj, + } + + Schedule.objects.update_or_create( + **query, + defaults=defaults, + create_defaults=create_defaults, + ) diff --git a/authentik/tasks/schedules/migrations/0002_schedule_rel_obj_content_type_schedule_rel_obj_id_and_more.py b/authentik/tasks/schedules/migrations/0002_schedule_rel_obj_content_type_schedule_rel_obj_id_and_more.py new file mode 100644 index 0000000000..b900e347d3 --- /dev/null +++ b/authentik/tasks/schedules/migrations/0002_schedule_rel_obj_content_type_schedule_rel_obj_id_and_more.py @@ -0,0 +1,35 @@ +# Generated by Django 5.0.13 on 2025-03-27 16:15 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("authentik_tasks_schedules", "0001_initial"), + ("contenttypes", "0002_remove_content_type_name"), + ] + + operations = [ + migrations.AddField( + model_name="schedule", + name="rel_obj_content_type", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.CASCADE, + to="contenttypes.contenttype", + ), + ), + migrations.AddField( + model_name="schedule", + name="rel_obj_id", + field=models.TextField(null=True), + ), + migrations.AddIndex( + model_name="schedule", + index=models.Index( + fields=["rel_obj_content_type", "rel_obj_id"], name="authentik_t_rel_obj_575af2_idx" + ), + ), + ] diff --git a/authentik/tasks/schedules/models.py b/authentik/tasks/schedules/models.py index 67b7653786..5c386253ec 100644 --- a/authentik/tasks/schedules/models.py +++ b/authentik/tasks/schedules/models.py @@ -1,12 +1,15 @@ from uuid import uuid4 from cron_converter import Cron +from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation +from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError from django.db import models from django.utils.timezone import datetime from django.utils.translation import gettext_lazy as _ from authentik.lib.models import SerializerModel +from authentik.tasks.schedules.lib import ScheduleSpec def validate_crontab(value): @@ -27,6 +30,10 @@ class Schedule(SerializerModel): args = models.BinaryField(editable=False, help_text=_("Args to send to the actor")) kwargs = models.BinaryField(editable=False, help_text=_("Kwargs to send to the actor")) + rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True) + rel_obj_id = models.TextField(null=True) + rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id") + crontab = models.TextField(validators=[validate_crontab], help_text=_("When to schedule tasks")) next_run = models.DateTimeField(auto_now_add=True, editable=False) @@ -38,6 +45,7 @@ class Schedule(SerializerModel): "change", "view", ) + indexes = (models.Index(fields=("rel_obj_content_type", "rel_obj_id")),) def __str__(self): return self.uid @@ -50,3 +58,16 @@ class Schedule(SerializerModel): def calculate_next_run(self, next_run: datetime) -> datetime: return Cron(self.crontab).schedule(next_run).next() + + +class ScheduledModel(models.Model): + schedules = GenericRelation( + Schedule, content_type_field="rel_obj_content_type", object_id_field="rel_obj_id" + ) + + class Meta: + abstract = True + + @property + def schedule_specs(self) -> list[ScheduleSpec]: + raise NotImplementedError diff --git a/authentik/tasks/schedules/signals.py b/authentik/tasks/schedules/signals.py new file mode 100644 index 0000000000..d97cb21731 --- /dev/null +++ b/authentik/tasks/schedules/signals.py @@ -0,0 +1,13 @@ +from django.db.models.signals import post_save +from django.dispatch import receiver + +from authentik.tasks.schedules.models import ScheduledModel + + +@receiver(post_save) +def post_save_schedule_mixin(sender, instance: ScheduledModel, **_): + if not isinstance(instance, ScheduledModel): + return + for spec in instance.schedule_specs: + spec.rel_obj = instance + spec.update_or_create()