diff --git a/authentik/admin/apps.py b/authentik/admin/apps.py index ab215937f7..82de9d5d30 100644 --- a/authentik/admin/apps.py +++ b/authentik/admin/apps.py @@ -39,7 +39,7 @@ class AuthentikAdminConfig(ManagedAppConfig): return [ ScheduleSpec( - actor_name=update_latest_version.actor_name, + actor=update_latest_version, crontab=f"{fqdn_rand('admin_latest_version')} * * * *", ), ] diff --git a/authentik/blueprints/apps.py b/authentik/blueprints/apps.py index 6efc65a69d..873e28a729 100644 --- a/authentik/blueprints/apps.py +++ b/authentik/blueprints/apps.py @@ -150,14 +150,16 @@ class AuthentikBlueprintsConfig(ManagedAppConfig): @property def tenant_schedule_specs(self) -> list[ScheduleSpec]: + from authentik.blueprints.v1.tasks import blueprints_discovery, clear_failed_blueprints + return [ ScheduleSpec( - actor_name="authentik.blueprints.v1.tasks.blueprints_discovery", + actor=blueprints_discovery, crontab=f"{fqdn_rand('blueprints_v1_discover')} * * * *", run_on_startup=True, ), ScheduleSpec( - actor_name="authentik.blueprints.v1.tasks.clear_failed_blueprints", + actor=clear_failed_blueprints, crontab=f"{fqdn_rand('blueprints_v1_cleanup')} * * * *", run_on_startup=True, ), diff --git a/authentik/blueprints/v1/tasks.py b/authentik/blueprints/v1/tasks.py index e75ff781c4..da2df8cd65 100644 --- a/authentik/blueprints/v1/tasks.py +++ b/authentik/blueprints/v1/tasks.py @@ -90,13 +90,7 @@ class BlueprintEventHandler(FileSystemEventHandler): LOGGER.debug("new blueprint file created, starting discovery") for tenant in Tenant.objects.filter(ready=True): with tenant: - schedule = Schedule.objects.filter( - actor_name=blueprints_discovery.actor_name, - paused=False, - ).first() - if schedule: - schedule.send() - # Schedule was paused or doesn't exist, no dispatch + Schedule.dispatch_by_actor(blueprints_discovery) def on_modified(self, event: FileSystemEvent): """Process file modification""" diff --git a/authentik/core/apps.py b/authentik/core/apps.py index ac4ba951f3..5ad176cf54 100644 --- a/authentik/core/apps.py +++ b/authentik/core/apps.py @@ -28,13 +28,15 @@ class AuthentikCoreConfig(ManagedAppConfig): @property def tenant_schedule_specs(self) -> list[ScheduleSpec]: + from authentik.core.tasks import clean_expired_models, clean_temporary_users + return [ ScheduleSpec( - actor_name="authentik.core.tasks.clean_expired_models", + actor=clean_expired_models, crontab="2-59/5 * * * *", ), ScheduleSpec( - actor_name="authentik.core.tasks.clean_temporary_users", + actor=clean_temporary_users, crontab="9-59/5 * * * *", ), ] diff --git a/authentik/crypto/apps.py b/authentik/crypto/apps.py index 832c7f1a22..915fc2d889 100644 --- a/authentik/crypto/apps.py +++ b/authentik/crypto/apps.py @@ -72,9 +72,11 @@ class AuthentikCryptoConfig(ManagedAppConfig): @property def tenant_schedule_specs(self) -> list[ScheduleSpec]: + from authentik.crypto.tasks import certificate_discovery + return [ ScheduleSpec( - actor_name="authentik.crypto.tasks.certificate_discovery", + actor=certificate_discovery, crontab=f"{fqdn_rand('crypto_certificate_discovery')} * * * *", ), ] diff --git a/authentik/enterprise/apps.py b/authentik/enterprise/apps.py index d1d9963670..fd6d392829 100644 --- a/authentik/enterprise/apps.py +++ b/authentik/enterprise/apps.py @@ -35,7 +35,7 @@ class AuthentikEnterpriseConfig(EnterpriseConfig): return [ ScheduleSpec( - actor_name=enterprise_update_usage.actor_name, + actor=enterprise_update_usage, crontab=f"{fqdn_rand('enterprise_update_usage')} */2 * * *", ), ] diff --git a/authentik/enterprise/policies/unique_password/apps.py b/authentik/enterprise/policies/unique_password/apps.py index 8834fe0af9..6421129eb7 100644 --- a/authentik/enterprise/policies/unique_password/apps.py +++ b/authentik/enterprise/policies/unique_password/apps.py @@ -20,11 +20,11 @@ class AuthentikEnterprisePoliciesUniquePasswordConfig(EnterpriseConfig): return [ ScheduleSpec( - actor_name=trim_password_histories.actor_name, + actor=trim_password_histories, crontab=f"{fqdn_rand('policies_unique_password_trim')} */12 * * *", ), ScheduleSpec( - actor_name=check_and_purge_password_history.actor_name, + actor=check_and_purge_password_history, crontab=f"{fqdn_rand('policies_unique_password_purge')} */24 * * *", ), ] diff --git a/authentik/enterprise/providers/google_workspace/models.py b/authentik/enterprise/providers/google_workspace/models.py index 306d5f5e82..d22d50bc9a 100644 --- a/authentik/enterprise/providers/google_workspace/models.py +++ b/authentik/enterprise/providers/google_workspace/models.py @@ -7,6 +7,7 @@ from django.db import models from django.db.models import QuerySet from django.templatetags.static import static from django.utils.translation import gettext_lazy as _ +from dramatiq.actor import Actor from google.oauth2.service_account import Credentials from rest_framework.serializers import Serializer @@ -112,8 +113,10 @@ class GoogleWorkspaceProvider(OutgoingSyncProvider, ScheduledModel, BackchannelP ) @property - def sync_task(self) -> str: - return "authentik.enterprise.providers.google_workspace.tasks.google_workspace_sync" + def sync_actor(self) -> Actor: + from authentik.enterprise.providers.google_workspace.tasks import google_workspace_sync + + return google_workspace_sync def client_for_model( self, diff --git a/authentik/enterprise/providers/microsoft_entra/models.py b/authentik/enterprise/providers/microsoft_entra/models.py index 48c42b876e..a7d549fab8 100644 --- a/authentik/enterprise/providers/microsoft_entra/models.py +++ b/authentik/enterprise/providers/microsoft_entra/models.py @@ -8,6 +8,7 @@ from django.db import models from django.db.models import QuerySet from django.templatetags.static import static from django.utils.translation import gettext_lazy as _ +from dramatiq.actor import Actor from rest_framework.serializers import Serializer from authentik.core.models import ( @@ -101,8 +102,10 @@ class MicrosoftEntraProvider(OutgoingSyncProvider, ScheduledModel, BackchannelPr ) @property - def sync_task(self) -> str: - return "authentik.enterprise.providers.microsoft_entra.tasks.microsoft_entra_sync" + def sync_actor(self) -> Actor: + from authentik.enterprise.providers.microsoft_entra.tasks import microsoft_entra_sync + + return microsoft_entra_sync def client_for_model( self, diff --git a/authentik/lib/sync/outgoing/models.py b/authentik/lib/sync/outgoing/models.py index f6c8134d39..1a09fda329 100644 --- a/authentik/lib/sync/outgoing/models.py +++ b/authentik/lib/sync/outgoing/models.py @@ -1,5 +1,6 @@ from typing import Any, Self +from dramatiq.actor import Actor import pglock from django.core.paginator import Paginator from django.db import connection, models @@ -60,14 +61,14 @@ class OutgoingSyncProvider(ScheduledModel, Model): ) @property - def sync_task(self) -> str: + def sync_actor(self) -> Actor: raise NotImplementedError @property def schedule_specs(self) -> list[ScheduleSpec]: return [ ScheduleSpec( - actor_name=self.sync_task, + actor=self.sync_actor, uid=self.pk, args=(self.pk,), options={ diff --git a/authentik/outposts/apps.py b/authentik/outposts/apps.py index 07d0ac8696..0f9d99bc2c 100644 --- a/authentik/outposts/apps.py +++ b/authentik/outposts/apps.py @@ -69,7 +69,7 @@ class AuthentikOutpostConfig(ManagedAppConfig): return [ ScheduleSpec( - actor_name=outpost_token_ensurer.actor_name, + actor=outpost_token_ensurer, crontab=f"{fqdn_rand('outpost_token_ensurer')} */8 * * *", ), ] @@ -80,7 +80,7 @@ class AuthentikOutpostConfig(ManagedAppConfig): return [ ScheduleSpec( - actor_name=outpost_connection_discovery.actor_name, + actor=outpost_connection_discovery, crontab=f"{fqdn_rand('outpost_connection_discovery')} */8 * * *", run_on_startup=True, ), diff --git a/authentik/outposts/models.py b/authentik/outposts/models.py index 871de3cff6..a0b6a513dc 100644 --- a/authentik/outposts/models.py +++ b/authentik/outposts/models.py @@ -169,7 +169,7 @@ class OutpostServiceConnection(ScheduledModel, models.Model): return [ ScheduleSpec( - actor_name=outpost_service_connection_monitor.actor_name, + actor=outpost_service_connection_monitor, uid=self.pk, args=(self.pk,), crontab="3-59/15 * * * *", @@ -321,7 +321,7 @@ class Outpost(ScheduledModel, SerializerModel, ManagedModel): return [ ScheduleSpec( - actor_name=outpost_controller.actor_name, + actor=outpost_controller, uid=self.pk, args=(self.pk,), kwargs={"action": "up", "from_cache": False}, diff --git a/authentik/providers/scim/models.py b/authentik/providers/scim/models.py index 3088dfc7bc..84e4bec300 100644 --- a/authentik/providers/scim/models.py +++ b/authentik/providers/scim/models.py @@ -7,6 +7,7 @@ from django.db import models from django.db.models import QuerySet from django.templatetags.static import static from django.utils.translation import gettext_lazy as _ +from dramatiq.actor import Actor from rest_framework.serializers import Serializer from authentik.core.models import BackchannelProvider, Group, PropertyMapping, User, UserTypes @@ -101,8 +102,10 @@ class SCIMProvider(OutgoingSyncProvider, ScheduledModel, BackchannelProvider): return static("authentik/sources/scim.png") @property - def sync_task(self) -> str: - return "authentik.providers.scim.tasks.scim_sync" + def sync_actor(self) -> Actor: + from authentik.providers.scim.tasks import scim_sync + + return scim_sync def client_for_model( self, model: type[User | Group | SCIMProviderUser | SCIMProviderGroup] diff --git a/authentik/sources/kerberos/models.py b/authentik/sources/kerberos/models.py index c415e37496..c5ebab1492 100644 --- a/authentik/sources/kerberos/models.py +++ b/authentik/sources/kerberos/models.py @@ -140,16 +140,18 @@ class KerberosSource(ScheduledModel, Source): @property def schedule_specs(self) -> list[ScheduleSpec]: + from authentik.sources.kerberos.tasks import kerberos_sync, kerberos_connectivity_check + return [ ScheduleSpec( - actor_name="authentik.sources.kerberos.tasks.kerberos_sync", + actor=kerberos_sync, uid=self.pk, args=(self.pk,), crontab=f"{fqdn_rand('kerberos_sync/' + str(self.pk))} */2 * * *", description=_(f"Sync Kerberos source '{self.name}'"), ), ScheduleSpec( - actor_name="authentik.sources.kerberos.tasks.kerberos_connectivity_check", + actor=kerberos_connectivity_check, uid=self.pk, args=(self.pk,), crontab=f"{fqdn_rand('kerberos_connectivity_check/' + str(self.pk))} * * * *", diff --git a/authentik/sources/ldap/models.py b/authentik/sources/ldap/models.py index f051cb9f3c..c63a14fff6 100644 --- a/authentik/sources/ldap/models.py +++ b/authentik/sources/ldap/models.py @@ -164,16 +164,18 @@ class LDAPSource(ScheduledModel, Source): @property def schedule_specs(self) -> list[ScheduleSpec]: + from authentik.sources.ldap.tasks import ldap_sync, ldap_connectivity_check + return [ ScheduleSpec( - actor_name="authentik.sources.ldap.tasks.ldap_sync", + actor=ldap_sync, uid=self.pk, args=(self.pk,), crontab=f"{fqdn_rand('ldap_sync/' + str(self.pk))} */2 * * *", description=_(f"Sync LDAP source '{self.name}'"), ), ScheduleSpec( - actor_name="authentik.sources.ldap.tasks.ldap_connectivity_check", + actor=ldap_connectivity_check, uid=self.pk, args=(self.pk,), crontab=f"{fqdn_rand('ldap_connectivity_check/' + str(self.pk))} * * * *", diff --git a/authentik/sources/oauth/apps.py b/authentik/sources/oauth/apps.py index f91985c1bb..096958ac6b 100644 --- a/authentik/sources/oauth/apps.py +++ b/authentik/sources/oauth/apps.py @@ -45,9 +45,11 @@ class AuthentikSourceOAuthConfig(ManagedAppConfig): @property def tenant_schedule_specs(self) -> list[ScheduleSpec]: + from authentik.sources.oauth.tasks import update_well_known_jwks + return [ ScheduleSpec( - actor_name="authentik.sources.oauth.tasks.update_well_known_jwks", + actor=update_well_known_jwks, crontab=f"{fqdn_rand('update_well_known_jwks')} */3 * * *", ), ] diff --git a/authentik/sources/plex/models.py b/authentik/sources/plex/models.py index 7e1d5d78ff..db5c7ae12f 100644 --- a/authentik/sources/plex/models.py +++ b/authentik/sources/plex/models.py @@ -77,9 +77,11 @@ class PlexSource(ScheduledModel, Source): @property def schedule_specs(self) -> list[ScheduleSpec]: + from authentik.sources.plex.tasks import check_plex_token + return [ ScheduleSpec( - actor_name="authentik.sources.plex.tasks.check_plex_token", + actor=check_plex_token, uid=self.pk, args=(self.pk,), crontab=f"{fqdn_rand(self.pk)} */3 * * *", diff --git a/authentik/stages/authenticator_webauthn/apps.py b/authentik/stages/authenticator_webauthn/apps.py index 641b22c751..402327c826 100644 --- a/authentik/stages/authenticator_webauthn/apps.py +++ b/authentik/stages/authenticator_webauthn/apps.py @@ -15,9 +15,11 @@ class AuthentikStageAuthenticatorWebAuthnConfig(ManagedAppConfig): @property def tenant_schedule_specs(self) -> list[ScheduleSpec]: + from authentik.stages.authenticator_webauthn.tasks import webauthn_mds_import + return [ ScheduleSpec( - actor_name="authentik.stages.authenticator_webauthn.tasks.webauthn_mds_import", + actor=webauthn_mds_import, crontab=f"{fqdn_rand('webauthn_mds_import')} {fqdn_rand('webauthn_mds_import', 24)} * * {fqdn_rand('webauthn_mds_import', 7)}", # noqa: E501 ), ] diff --git a/authentik/tasks/schedules/lib.py b/authentik/tasks/schedules/lib.py index 83bf4c9520..4881602aa7 100644 --- a/authentik/tasks/schedules/lib.py +++ b/authentik/tasks/schedules/lib.py @@ -3,13 +3,15 @@ from collections.abc import Iterable from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any +from dramatiq.actor import Actor + if TYPE_CHECKING: from authentik.tasks.schedules.models import Schedule @dataclass class ScheduleSpec: - actor_name: str + actor: Actor crontab: str uid: str | None = None @@ -27,8 +29,8 @@ class ScheduleSpec: def get_uid(self) -> str: if self.uid is not None: - return f"{self.actor_name}:{self.uid}" - return self.actor_name + return f"{self.actor.actor_name}:{self.uid}" + return self.actor.actor_name def get_args(self) -> bytes: return pickle.dumps(self.args) @@ -47,7 +49,7 @@ class ScheduleSpec: } defaults = { **query, - "actor_name": self.actor_name, + "actor_name": self.actor.actor_name, "args": self.get_args(), "kwargs": self.get_kwargs(), "options": self.get_options(),