From a3ebfd9bbd19f05fd154cb071db769209d71f533 Mon Sep 17 00:00:00 2001 From: Marc 'risson' Schmitt Date: Thu, 27 Mar 2025 14:24:56 +0100 Subject: [PATCH] wip Signed-off-by: Marc 'risson' Schmitt --- authentik/tasks/apps.py | 6 -- authentik/tasks/broker.py | 11 +++- .../migrations/0002_task_schedule_uid.py | 18 ++++++ authentik/tasks/models.py | 3 +- .../schedules/migrations/0001_initial.py | 20 ++++++- .../migrations/0002_alter_schedule_options.py | 21 ------- authentik/tasks/schedules/models.py | 23 +++++++- authentik/tasks/schedules/scheduler.py | 56 +++++++++++++++++++ pyproject.toml | 1 + uv.lock | 14 +++++ 10 files changed, 139 insertions(+), 34 deletions(-) create mode 100644 authentik/tasks/migrations/0002_task_schedule_uid.py delete mode 100644 authentik/tasks/schedules/migrations/0002_alter_schedule_options.py create mode 100644 authentik/tasks/schedules/scheduler.py diff --git a/authentik/tasks/apps.py b/authentik/tasks/apps.py index 0db967d213..96eb430cdd 100644 --- a/authentik/tasks/apps.py +++ b/authentik/tasks/apps.py @@ -1,8 +1,5 @@ -from datetime import timedelta - import dramatiq from dramatiq.encoder import PickleEncoder -from dramatiq.middleware import AgeLimit, Retries, TimeLimit from authentik.blueprints.apps import ManagedAppConfig @@ -21,9 +18,6 @@ class AuthentikTasksConfig(ManagedAppConfig): broker = PostgresBroker() broker.add_middleware(FullyQualifiedActorName()) # broker.add_middleware(Prometheus()) - broker.add_middleware(AgeLimit(max_age=timedelta(days=30).total_seconds() * 1000)) - broker.add_middleware(TimeLimit()) - broker.add_middleware(Retries(max_retries=3)) broker.add_middleware(CurrentTask()) dramatiq.set_broker(broker) return super().ready() diff --git a/authentik/tasks/broker.py b/authentik/tasks/broker.py index d45bf006ca..461ddbb182 100644 --- a/authentik/tasks/broker.py +++ b/authentik/tasks/broker.py @@ -21,7 +21,7 @@ from dramatiq.broker import Broker, Consumer, MessageProxy from dramatiq.common import compute_backoff, current_millis, dq_name, xq_name from dramatiq.errors import ConnectionError, QueueJoinTimeout from dramatiq.message import Message -from dramatiq.middleware import Middleware +from dramatiq.middleware import Middleware, Prometheus, default_middleware from dramatiq.results import Results from pglock.core import _cast_lock_id from psycopg import Notify, sql @@ -80,10 +80,18 @@ class PostgresBroker(Broker): self.logger = get_logger().bind() self.queues = set() + self.actor_options = { + "schedule_uid", + } self.db_alias = db_alias + self.middleware = [] self.add_middleware(DbConnectionMiddleware()) self.add_middleware(TenantMiddleware()) + for middleware in default_middleware: + if middleware == Prometheus: + pass + self.add_middleware(middleware()) if results: self.backend = PostgresBackend() self.add_middleware(Results(backend=self.backend)) @@ -160,6 +168,7 @@ class PostgresBroker(Broker): "actor_name": message.actor_name, "state": TaskState.QUEUED, "message": message.encode(), + "schedule_uid": message.options.get("schedule_uid", ""), } create_defaults = { **query, diff --git a/authentik/tasks/migrations/0002_task_schedule_uid.py b/authentik/tasks/migrations/0002_task_schedule_uid.py new file mode 100644 index 0000000000..43d4f9fb57 --- /dev/null +++ b/authentik/tasks/migrations/0002_task_schedule_uid.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0.13 on 2025-03-27 13:24 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("authentik_tasks", "0001_initial"), + ] + + operations = [ + migrations.AddField( + model_name="task", + name="schedule_uid", + field=models.TextField(blank=True), + ), + ] diff --git a/authentik/tasks/models.py b/authentik/tasks/models.py index 68b8909d60..aed2d06206 100644 --- a/authentik/tasks/models.py +++ b/authentik/tasks/models.py @@ -59,8 +59,9 @@ class Task(models.Model): result = models.BinaryField(null=True, help_text=_("Task result")) result_expiry = models.DateTimeField(null=True, help_text=_("Result expiry time")) - # Probably only have one `logs` field + schedule_uid = models.TextField(blank=True) uid = models.TextField(blank=True) + # Probably only have one `logs` field description = models.TextField(blank=True) status = models.TextField(blank=True, choices=TaskStatus.choices) messages = models.JSONField(default=list) diff --git a/authentik/tasks/schedules/migrations/0001_initial.py b/authentik/tasks/schedules/migrations/0001_initial.py index 142d3d3c48..acf79f7753 100644 --- a/authentik/tasks/schedules/migrations/0001_initial.py +++ b/authentik/tasks/schedules/migrations/0001_initial.py @@ -1,5 +1,7 @@ -# Generated by Django 5.0.13 on 2025-03-25 17:22 +# Generated by Django 5.0.13 on 2025-03-27 11:20 +import authentik.tasks.schedules.models +import uuid from django.db import migrations, models @@ -13,7 +15,13 @@ class Migration(migrations.Migration): migrations.CreateModel( name="Schedule", fields=[ - ("id", models.TextField(editable=False, primary_key=True, serialize=False)), + ( + "id", + models.UUIDField( + default=uuid.uuid4, editable=False, primary_key=True, serialize=False + ), + ), + ("uid", models.TextField(editable=False, unique=True)), ("name", models.TextField(editable=False, help_text="Schedule display name")), ( "actor_name", @@ -21,12 +29,18 @@ class Migration(migrations.Migration): ), ("args", models.BinaryField(help_text="Args to send to the actor")), ("kwargs", models.BinaryField(help_text="Kwargs to send to the actor")), - ("crontab", models.TextField()), + ( + "crontab", + models.TextField( + validators=[authentik.tasks.schedules.models.validate_crontab] + ), + ), ("next_run", models.DateTimeField(auto_now_add=True)), ], options={ "verbose_name": "Schedule", "verbose_name_plural": "Schedules", + "default_permissions": ("change", "view"), }, ), ] diff --git a/authentik/tasks/schedules/migrations/0002_alter_schedule_options.py b/authentik/tasks/schedules/migrations/0002_alter_schedule_options.py deleted file mode 100644 index 023f368589..0000000000 --- a/authentik/tasks/schedules/migrations/0002_alter_schedule_options.py +++ /dev/null @@ -1,21 +0,0 @@ -# Generated by Django 5.0.13 on 2025-03-25 17:31 - -from django.db import migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ("authentik_tasks_schedules", "0001_initial"), - ] - - operations = [ - migrations.AlterModelOptions( - name="schedule", - options={ - "default_permissions": ("change", "view"), - "verbose_name": "Schedule", - "verbose_name_plural": "Schedules", - }, - ), - ] diff --git a/authentik/tasks/schedules/models.py b/authentik/tasks/schedules/models.py index 6d76969b27..77c55bc685 100644 --- a/authentik/tasks/schedules/models.py +++ b/authentik/tasks/schedules/models.py @@ -1,11 +1,27 @@ +from uuid import uuid4 +from cron_converter import Cron + +from django.core.exceptions import ValidationError from django.db import models from django.utils.translation import gettext_lazy as _ +from django.utils.timezone import datetime from authentik.lib.models import SerializerModel +def validate_crontab(value): + try: + Cron(value) + except ValueError as exc: + raise ValidationError( + _("%(value)s is not a valid crontab"), + params={"value": value}, + ) from exc + + class Schedule(SerializerModel): - id = models.TextField(primary_key=True, editable=False) + id = models.UUIDField(primary_key=True, default=uuid4, editable=False) + uid = models.TextField(unique=True, editable=False) name = models.TextField(editable=False, help_text=_("Schedule display name")) @@ -13,7 +29,7 @@ class Schedule(SerializerModel): args = models.BinaryField(editable=False, help_text=_("Args to send to the actor")) kwargs = models.BinaryField(editable=False, help_text=_("Kwargs to send to the actor")) - crontab = models.TextField() + crontab = models.TextField(validators=[validate_crontab]) next_run = models.DateTimeField(auto_now_add=True) @@ -33,3 +49,6 @@ class Schedule(SerializerModel): from authentik.tasks.schedules.api import ScheduleSerializer return ScheduleSerializer + + def calculate_next_run(self, next_run: datetime) -> datetime: + return Cron(self.crontab).schedule(next_run).next() diff --git a/authentik/tasks/schedules/scheduler.py b/authentik/tasks/schedules/scheduler.py new file mode 100644 index 0000000000..bf8d79c8de --- /dev/null +++ b/authentik/tasks/schedules/scheduler.py @@ -0,0 +1,56 @@ +from django.db import router, transaction +from structlog.stdlib import get_logger +from authentik.tasks.schedules.models import Schedule +from django.utils.timezone import now +from dramatiq.broker import Broker +import pickle + +from authentik.tenants.models import Tenant +import pglock + + +LOGGER = get_logger() + + +class Scheduler: + def __init__(self, broker: Broker): + self.broker = broker + + def process_schedule(self, schedule: Schedule): + next_run = schedule.next_run + while True: + next_run = schedule.calculate_next_run(next_run) + if next_run > now(): + break + schedule.next_run = next_run + + actor = self.broker.get_actor(schedule.actor_name) + actor.send_with_options( + args=pickle.loads(schedule.args), + kwargs=pickle.loads(schedule.kwargs), + options={ + "schedule_uid": schedule.uid, + }, + ) + + schedule.save() + + def run_per_tenant(self, tenant: Tenant): + with pglock.advisory( + lock_id=f"goauthentik.io/{tenant.schema_name}/tasks/scheduler", + side_effect=pglock.Return, + timeout=0, + ) as lock_acquired: + if not lock_acquired: + LOGGER.debug( + "Failed to acquire lock for tasks scheduling, skipping", + tenant=tenant.schema_name, + ) + with transaction.atomic(using=router.db_for_write(Schedule)): + for schedule in Schedule.objects.select_for_update().filter(next_run__lt=now()): + self.process_schedule(schedule) + + def run(self): + for tenant in Tenant.objects.filter(enabled=True): + with tenant: + self.run_per_tenant(tenant) diff --git a/pyproject.toml b/pyproject.toml index 8db497c1f7..434bfd2ce7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "celery", "channels", "channels-redis", + "cron-converter", "cryptography", "dacite", "deepmerge", diff --git a/uv.lock b/uv.lock index 2ec1933157..2afa008cc5 100644 --- a/uv.lock +++ b/uv.lock @@ -169,6 +169,7 @@ dependencies = [ { name = "celery" }, { name = "channels" }, { name = "channels-redis" }, + { name = "cron-converter" }, { name = "cryptography" }, { name = "dacite" }, { name = "deepmerge" }, @@ -270,6 +271,7 @@ requires-dist = [ { name = "celery" }, { name = "channels" }, { name = "channels-redis" }, + { name = "cron-converter" }, { name = "cryptography" }, { name = "dacite" }, { name = "deepmerge" }, @@ -844,6 +846,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fb/b2/f655700e1024dec98b10ebaafd0cedbc25e40e4abe62a3c8e2ceef4f8f0a/coverage-7.6.12-py3-none-any.whl", hash = "sha256:eb8668cfbc279a536c633137deeb9435d2962caec279c3f8cf8b91fff6ff8953", size = 200552 }, ] +[[package]] +name = "cron-converter" +version = "1.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "python-dateutil" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c1/45/549d071e7bde4d3bb6a566b1a116e3b79803df916c3499d27509b214a965/cron_converter-1.2.1.tar.gz", hash = "sha256:6766c6ba44b8236201ac03030f314fd655343c1c4848ce216458e8d340066c59", size = 14313 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2e/76/2a477e17b7c5c49e81bdc711aab7ba9a2a661c54b7c5021e0c1c01abb0e0/cron_converter-1.2.1-py3-none-any.whl", hash = "sha256:4604e356c15a8fbe76a86bb42508f611ad3cade7dd65e2d6f601c2e0d5226ffc", size = 13338 }, +] + [[package]] name = "cryptography" version = "44.0.2"