From e6614a0705756b93922b4a70df01ea385ebca0a9 Mon Sep 17 00:00:00 2001 From: Marc 'risson' Schmitt Date: Thu, 19 Jun 2025 18:21:59 +0200 Subject: [PATCH] move schedules to its package Signed-off-by: Marc 'risson' Schmitt --- authentik/root/settings.py | 5 +- authentik/tasks/migrations/0001_initial.py | 6 +- ...otify_enqueueing_task_notify_enqueueing.py | 36 ------- authentik/tasks/models.py | 13 ++- .../schedules/migrations/0001_initial.py | 38 +++++-- .../0002_schedule_set_next_run_on_paused.py | 30 ------ authentik/tasks/schedules/models.py | 74 +------------- authentik/tasks/schedules/scheduler.py | 49 ++------- .../django_dramatiq_postgres/models.py | 99 ++++++++++++++++++- .../django_dramatiq_postgres/scheduler.py | 50 ++++++++++ 10 files changed, 205 insertions(+), 195 deletions(-) delete mode 100644 authentik/tasks/migrations/0002_remove_task_notify_enqueueing_task_notify_enqueueing.py delete mode 100644 authentik/tasks/schedules/migrations/0002_schedule_set_next_run_on_paused.py create mode 100644 packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py diff --git a/authentik/root/settings.py b/authentik/root/settings.py index 8cc93187f1..9f3b42fc77 100644 --- a/authentik/root/settings.py +++ b/authentik/root/settings.py @@ -123,6 +123,7 @@ TENANT_APPS = [ "authentik.stages.user_login", "authentik.stages.user_logout", "authentik.stages.user_write", + "authentik.tasks.schedules", "authentik.brands", "authentik.blueprints", "guardian", @@ -526,9 +527,5 @@ for _app in set(SHARED_APPS + TENANT_APPS): _update_settings(f"{_app}.settings") _update_settings("data.user_settings") -# Import schedules after other apps since it relies on tasks and ScheduledModel being -# registered for its startup. -TENANT_APPS.append("authentik.tasks.schedules") - SHARED_APPS = list(OrderedDict.fromkeys(SHARED_APPS + TENANT_APPS)) INSTALLED_APPS = list(OrderedDict.fromkeys(SHARED_APPS + TENANT_APPS)) diff --git a/authentik/tasks/migrations/0001_initial.py b/authentik/tasks/migrations/0001_initial.py index 825b3e2448..cc4fe0b425 100644 --- a/authentik/tasks/migrations/0001_initial.py +++ b/authentik/tasks/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.11 on 2025-06-16 13:59 +# Generated by Django 5.1.11 on 2025-06-19 16:21 import django.db.models.deletion import django.utils.timezone @@ -92,8 +92,8 @@ class Migration(migrations.Migration): sql=pgtrigger.compiler.UpsertTriggerSql( condition="WHEN (NEW.\"state\" = 'queued')", constraint="CONSTRAINT", - func="\n PERFORM pg_notify(\n 'dramatiq.tasks.tasks.' || NEW.queue_name || '.enqueue',\n NEW.message_id::text\n );\n RETURN NEW;\n ", - hash="e53355a30f439252c7250e5296947f59de6ca57e", + func="\n PERFORM pg_notify(\n 'authentik.tasks.' || NEW.queue_name || '.enqueue',\n NEW.message_id::text\n );\n RETURN NEW;\n ", + hash="0a9ee3db61e4d63fd72b31322fbb821706dd8a78", operation="INSERT OR UPDATE", pgid="pgtrigger_notify_enqueueing_0bc94", table="authentik_tasks_task", diff --git a/authentik/tasks/migrations/0002_remove_task_notify_enqueueing_task_notify_enqueueing.py b/authentik/tasks/migrations/0002_remove_task_notify_enqueueing_task_notify_enqueueing.py deleted file mode 100644 index e452e466d4..0000000000 --- a/authentik/tasks/migrations/0002_remove_task_notify_enqueueing_task_notify_enqueueing.py +++ /dev/null @@ -1,36 +0,0 @@ -# Generated by Django 5.1.11 on 2025-06-18 15:02 - -import pgtrigger.compiler -import pgtrigger.migrations -from django.db import migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ("authentik_tasks", "0001_initial"), - ] - - operations = [ - pgtrigger.migrations.RemoveTrigger( - model_name="task", - name="notify_enqueueing", - ), - pgtrigger.migrations.AddTrigger( - model_name="task", - trigger=pgtrigger.compiler.Trigger( - name="notify_enqueueing", - sql=pgtrigger.compiler.UpsertTriggerSql( - condition="WHEN (NEW.\"state\" = 'queued')", - constraint="CONSTRAINT", - func="\n PERFORM pg_notify(\n 'authentik.tasks.' || NEW.queue_name || '.enqueue',\n NEW.message_id::text\n );\n RETURN NEW;\n ", - hash="0a9ee3db61e4d63fd72b31322fbb821706dd8a78", - operation="INSERT OR UPDATE", - pgid="pgtrigger_notify_enqueueing_0bc94", - table="authentik_tasks_task", - timing="DEFERRABLE INITIALLY DEFERRED", - when="AFTER", - ), - ), - ), - ] diff --git a/authentik/tasks/models.py b/authentik/tasks/models.py index f735d7ff40..1bb12f5076 100644 --- a/authentik/tasks/models.py +++ b/authentik/tasks/models.py @@ -2,7 +2,7 @@ from enum import StrEnum, auto from uuid import UUID import pgtrigger -from django.contrib.contenttypes.fields import ContentType, GenericForeignKey +from django.contrib.contenttypes.fields import ContentType, GenericForeignKey, GenericRelation from django.db import models from django.utils.translation import gettext_lazy as _ from django_dramatiq_postgres.models import TaskBase @@ -113,3 +113,14 @@ class Task(SerializerModel, TaskBase): def error(self, message: str | Exception, save: bool = False, **attributes): self.log("error", message, save=save, **attributes) + + +class TasksModel(models.Model): + tasks = GenericRelation( + Task, + content_type_field="rel_obj_content_type", + object_id_field="rel_obj_id", + ) + + class Meta: + abstract = True diff --git a/authentik/tasks/schedules/migrations/0001_initial.py b/authentik/tasks/schedules/migrations/0001_initial.py index 9375216dd0..c5fc306390 100644 --- a/authentik/tasks/schedules/migrations/0001_initial.py +++ b/authentik/tasks/schedules/migrations/0001_initial.py @@ -1,7 +1,9 @@ -# Generated by Django 5.1.9 on 2025-06-05 16:39 +# Generated by Django 5.1.11 on 2025-06-19 16:21 -import authentik.tasks.schedules.models import django.db.models.deletion +import django_dramatiq_postgres.models +import pgtrigger.compiler +import pgtrigger.migrations import uuid from django.db import migrations, models @@ -24,12 +26,6 @@ class Migration(migrations.Migration): default=uuid.uuid4, editable=False, primary_key=True, serialize=False ), ), - ( - "uid", - models.TextField( - editable=False, help_text="Unique schedule identifier", unique=True - ), - ), ( "actor_name", models.TextField(editable=False, help_text="Dramatiq actor to call"), @@ -37,16 +33,22 @@ class Migration(migrations.Migration): ("args", models.BinaryField(help_text="Args to send to the actor")), ("kwargs", models.BinaryField(help_text="Kwargs to send to the actor")), ("options", models.BinaryField(help_text="Options to send to the actor")), - ("rel_obj_id", models.TextField(null=True)), ( "crontab", models.TextField( help_text="When to schedule tasks", - validators=[authentik.tasks.schedules.models.validate_crontab], + validators=[django_dramatiq_postgres.models.validate_crontab], ), ), ("paused", models.BooleanField(default=False, help_text="Pause this schedule")), ("next_run", models.DateTimeField(auto_now_add=True)), + ( + "uid", + models.TextField( + editable=False, help_text="Unique schedule identifier", unique=True + ), + ), + ("rel_obj_id", models.TextField(null=True)), ( "rel_obj_content_type", models.ForeignKey( @@ -60,6 +62,7 @@ class Migration(migrations.Migration): "verbose_name": "Schedule", "verbose_name_plural": "Schedules", "permissions": [("send_schedule", "Manually trigger a schedule")], + "abstract": False, "default_permissions": ("change", "view"), "indexes": [ models.Index( @@ -69,4 +72,19 @@ class Migration(migrations.Migration): ], }, ), + pgtrigger.migrations.AddTrigger( + model_name="schedule", + trigger=pgtrigger.compiler.Trigger( + name="set_next_run_on_paused", + sql=pgtrigger.compiler.UpsertTriggerSql( + condition='WHEN (NEW."paused" AND NOT OLD."paused")', + func="\n NEW.next_run = to_timestamp(0);\n RETURN NEW;\n ", + hash="7fe580a86de70723522cfcbac712785984000f92", + operation="UPDATE", + pgid="pgtrigger_set_next_run_on_paused_95c6d", + table="authentik_tasks_schedules_schedule", + when="BEFORE", + ), + ), + ), ] diff --git a/authentik/tasks/schedules/migrations/0002_schedule_set_next_run_on_paused.py b/authentik/tasks/schedules/migrations/0002_schedule_set_next_run_on_paused.py deleted file mode 100644 index e8ab6dcee2..0000000000 --- a/authentik/tasks/schedules/migrations/0002_schedule_set_next_run_on_paused.py +++ /dev/null @@ -1,30 +0,0 @@ -# Generated by Django 5.1.10 on 2025-06-11 12:57 - -import pgtrigger.compiler -import pgtrigger.migrations -from django.db import migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ("authentik_tasks_schedules", "0001_initial"), - ] - - operations = [ - pgtrigger.migrations.AddTrigger( - model_name="schedule", - trigger=pgtrigger.compiler.Trigger( - name="set_next_run_on_paused", - sql=pgtrigger.compiler.UpsertTriggerSql( - condition='WHEN (NEW."paused" AND NOT OLD."paused")', - func="\n NEW.next_run = to_timestamp(0);\n RETURN NEW;\n ", - hash="7fe580a86de70723522cfcbac712785984000f92", - operation="UPDATE", - pgid="pgtrigger_set_next_run_on_paused_95c6d", - table="authentik_tasks_schedules_schedule", - when="BEFORE", - ), - ), - ), - ] diff --git a/authentik/tasks/schedules/models.py b/authentik/tasks/schedules/models.py index e2d4f16d3d..740efc9bd0 100644 --- a/authentik/tasks/schedules/models.py +++ b/authentik/tasks/schedules/models.py @@ -1,54 +1,22 @@ -import pickle # nosec -from uuid import uuid4 - -import pgtrigger -from cron_converter import Cron from django.apps import apps from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation from django.contrib.contenttypes.models import ContentType -from django.core.exceptions import ValidationError from django.db import models -from django.utils.timezone import datetime from django.utils.translation import gettext_lazy as _ -from dramatiq.actor import Actor -from dramatiq.broker import Broker, get_broker -from dramatiq.message import Message +from django_dramatiq_postgres.models import ScheduleBase from authentik.lib.models import SerializerModel from authentik.tasks.schedules.lib import ScheduleSpec -def validate_crontab(value): - try: - Cron(value) - except ValueError as exc: - raise ValidationError( - _("%(value)s is not a valid crontab"), - params={"value": value}, - ) from exc - - -class Schedule(SerializerModel): - id = models.UUIDField(primary_key=True, default=uuid4, editable=False) +class Schedule(SerializerModel, ScheduleBase): uid = models.TextField(unique=True, editable=False, help_text=_("Unique schedule identifier")) - actor_name = models.TextField(editable=False, help_text=_("Dramatiq actor to call")) - args = models.BinaryField(editable=False, help_text=_("Args to send to the actor")) - kwargs = models.BinaryField(editable=False, help_text=_("Kwargs to send to the actor")) - options = models.BinaryField(editable=False, help_text=_("Options to send to the actor")) - rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True) rel_obj_id = models.TextField(null=True) rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id") - crontab = models.TextField(validators=[validate_crontab], help_text=_("When to schedule tasks")) - paused = models.BooleanField(default=False, help_text=_("Pause this schedule")) - - next_run = models.DateTimeField(auto_now_add=True, editable=False) - - class Meta: - verbose_name = _("Schedule") - verbose_name_plural = _("Schedules") + class Meta(ScheduleBase.Meta): default_permissions = ( "change", "view", @@ -57,29 +25,9 @@ class Schedule(SerializerModel): ("send_schedule", _("Manually trigger a schedule")), ] indexes = (models.Index(fields=("rel_obj_content_type", "rel_obj_id")),) - triggers = ( - pgtrigger.Trigger( - name="set_next_run_on_paused", - operation=pgtrigger.Update, - when=pgtrigger.Before, - condition=pgtrigger.Q(new__paused=True) & pgtrigger.Q(old__paused=False), - func=""" - NEW.next_run = to_timestamp(0); - RETURN NEW; - """, - ), - ) def __str__(self): - return self.uid - - @classmethod - def dispatch_by_actor(cls, actor: Actor): - """Dispatch a schedule by looking up its actor. - Only available for schedules without custom arguments.""" - schedule = cls.objects.filter(actor_name=actor.actor_name, paused=False).first() - if schedule: - schedule.send() + return f"Schedule {self.actor_name}:{self.uid}" @property def serializer(self): @@ -87,20 +35,6 @@ class Schedule(SerializerModel): return ScheduleSerializer - def send(self, broker: Broker | None = None) -> Message: - broker = broker or get_broker() - actor: Actor = broker.get_actor(self.actor_name) - return actor.send_with_options( - args=pickle.loads(self.args), # nosec - kwargs=pickle.loads(self.kwargs), # nosec - rel_obj=self, - **pickle.loads(self.options), # nosec - ) - - # TODO: actually do loop here - def calculate_next_run(self, next_run: datetime) -> datetime: - return Cron(self.crontab).schedule(next_run).next() - class ScheduledModel(models.Model): schedules = GenericRelation( diff --git a/authentik/tasks/schedules/scheduler.py b/authentik/tasks/schedules/scheduler.py index dc75c117e1..ee135f4789 100644 --- a/authentik/tasks/schedules/scheduler.py +++ b/authentik/tasks/schedules/scheduler.py @@ -1,53 +1,24 @@ import pglock -from django.db import router, transaction -from django.utils.timezone import now, timedelta -from dramatiq.broker import Broker +from django_dramatiq_postgres.scheduler import Scheduler as SchedulerBase from structlog.stdlib import get_logger -from authentik.tasks.schedules.models import Schedule from authentik.tenants.models import Tenant LOGGER = get_logger() -class Scheduler: - def __init__(self, broker: Broker): - self.broker = broker - - def process_schedule(self, schedule: Schedule): - next_run = schedule.next_run - while True: - next_run = schedule.calculate_next_run(next_run) - if next_run > now(): - break - # Force to calculate the one after - next_run += timedelta(minutes=2) - schedule.next_run = next_run - - schedule.send(self.broker) - - schedule.save() - - def run_per_tenant(self, tenant: Tenant): - with pglock.advisory( - lock_id=f"goauthentik.io/{tenant.schema_name}/tasks/scheduler", +class Scheduler(SchedulerBase): + def _lock(self, tenant: Tenant) -> pglock.advisory: + return pglock.advisory( + lock_id=f"authentik.scheduler/{tenant.schema_name}", side_effect=pglock.Return, timeout=0, - ) as lock_acquired: - if not lock_acquired: - LOGGER.debug( - "Failed to acquire lock for tasks scheduling, skipping", - tenant=tenant.schema_name, - ) - return - with transaction.atomic(using=router.db_for_write(Schedule)): - for schedule in Schedule.objects.select_for_update().filter( - next_run__lt=now(), - paused=False, - ): - self.process_schedule(schedule) + ) def run(self): for tenant in Tenant.objects.filter(ready=True): with tenant: - self.run_per_tenant(tenant) + with self._lock(tenant) as lock_acquired: + if not lock_acquired: + return + self._run() diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py index 5cf09dfac1..a25f3d1f41 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py @@ -1,10 +1,18 @@ +import pickle # nosec from enum import StrEnum, auto from uuid import uuid4 import pgtrigger +from cron_converter import Cron +from django.contrib.contenttypes.fields import GenericForeignKey +from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError from django.db import models -from django.utils import timezone +from django.utils.timezone import datetime, now, timedelta from django.utils.translation import gettext_lazy as _ +from dramatiq.actor import Actor +from dramatiq.broker import Broker, get_broker +from dramatiq.message import Message from django_dramatiq_postgres.conf import Conf @@ -36,7 +44,7 @@ class TaskBase(models.Model): choices=TaskState.choices, help_text=_("Task status"), ) - mtime = models.DateTimeField(default=timezone.now, help_text=_("Task last modified time")) + mtime = models.DateTimeField(default=now, help_text=_("Task last modified time")) result = models.BinaryField(null=True, help_text=_("Task result")) result_expiry = models.DateTimeField(null=True, help_text=_("Result expiry time")) @@ -65,3 +73,90 @@ class TaskBase(models.Model): def __str__(self): return str(self.message_id) + + +def validate_crontab(value): + try: + Cron(value) + except ValueError as exc: + raise ValidationError( + _("%(value)s is not a valid crontab"), + params={"value": value}, + ) from exc + + +class ScheduleBase(models.Model): + id = models.UUIDField(primary_key=True, default=uuid4, editable=False) + + actor_name = models.TextField(editable=False, help_text=_("Dramatiq actor to call")) + args = models.BinaryField(editable=False, help_text=_("Args to send to the actor")) + kwargs = models.BinaryField(editable=False, help_text=_("Kwargs to send to the actor")) + options = models.BinaryField(editable=False, help_text=_("Options to send to the actor")) + + rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True) + rel_obj_id = models.TextField(null=True) + rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id") + + crontab = models.TextField(validators=[validate_crontab], help_text=_("When to schedule tasks")) + paused = models.BooleanField(default=False, help_text=_("Pause this schedule")) + + next_run = models.DateTimeField(auto_now_add=True, editable=False) + + class Meta: + abstract = True + verbose_name = _("Schedule") + verbose_name_plural = _("Schedules") + triggers = ( + pgtrigger.Trigger( + name="set_next_run_on_paused", + operation=pgtrigger.Update, + when=pgtrigger.Before, + condition=pgtrigger.Q(new__paused=True) & pgtrigger.Q(old__paused=False), + func=""" + NEW.next_run = to_timestamp(0); + RETURN NEW; + """, + ), + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._original_crontab = self.crontab + + def __str__(self): + return f"Schedule {self.actor_name} ({self.id})" + + def save(self, *args, **kwargs): + if self.crontab != self._original_crontab: + self.next_run = self.compute_next_run(now()) + + super().save(*args, **kwargs) + + self._original_crontab = self.crontab + + @classmethod + def dispatch_by_actor(cls, actor: Actor): + """Dispatch a schedule by looking up its actor. + Only available for schedules without custom arguments.""" + schedule = cls.objects.filter(actor_name=actor.actor_name, paused=False).first() + if schedule: + schedule.send() + + def send(self, broker: Broker | None = None) -> Message: + broker = broker or get_broker() + actor: Actor = broker.get_actor(self.actor_name) + return actor.send_with_options( + args=pickle.loads(self.args), # nosec + kwargs=pickle.loads(self.kwargs), # nosec + rel_obj=self, + **pickle.loads(self.options), # nosec + ) + + def compute_next_run(self, next_run: datetime | None = None) -> datetime: + next_run: datetime = self.next_run if not next_run else next_run + while True: + next_run = Cron(self.crontab).schedule(next_run).next() + if next_run > now(): + return next_run + # Force to calculate the one after + next_run += timedelta(minutes=1) diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py new file mode 100644 index 0000000000..56c0db78ad --- /dev/null +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py @@ -0,0 +1,50 @@ +import pglock +from django.db import router, transaction +from django.db.models import QuerySet +from django.utils.functional import cached_property +from django.utils.module_loading import import_string +from django.utils.timezone import now +from dramatiq.broker import Broker +from dramatiq.logging import get_logger + +from django_dramatiq_postgres.conf import Conf +from django_dramatiq_postgres.models import ScheduleBase + + +class Scheduler: + def __init__(self, broker: Broker): + self.logger = get_logger(__name__, type(self)) + self.broker = broker + + @cached_property + def model(self) -> type[ScheduleBase]: + return import_string(Conf().task_class) + + @property + def query_set(self) -> QuerySet: + return self.model.objects.filter(paused=False) + + def process_schedule(self, schedule: ScheduleBase): + schedule.next_run = schedule.compute_next_run() + schedule.send(self.broker) + schedule.save() + + def _lock(self) -> pglock.advisory: + return pglock.advisory( + lock_id=f"{Conf().channel_prefix}.scheduler", + side_effect=pglock.Return, + timeout=0, + ) + + def _run(self): + with transaction.atomic(using=router.db_for_write(self.model)): + for schedule in self.query_set.select_for_update().filter( + next_run__lt=now(), + ): + self.process_schedule(schedule) + + def run(self): + with self._lock() as lock_acquired: + if not lock_acquired: + return + self._run()