From 1cfaddf49d99637b1ad1286686df1c7b619c1fb7 Mon Sep 17 00:00:00 2001 From: Marc 'risson' Schmitt Date: Mon, 16 Jun 2025 16:19:21 +0200 Subject: [PATCH] start using things from the package Signed-off-by: Marc 'risson' Schmitt --- Makefile | 2 +- authentik/root/settings.py | 7 ++- authentik/tasks/migrations/0001_initial.py | 21 +++++++-- .../0002_task_update_aggregated_status.py | 32 ------------- ..._task_update_aggregated_status_and_more.py | 36 --------------- ..._task_update_aggregated_status_and_more.py | 36 --------------- ..._task_update_aggregated_status_and_more.py | 35 -------------- ..._task_update_aggregated_status_and_more.py | 33 ------------- authentik/tasks/models.py | 46 +++---------------- .../django_dramatiq_postgres/apps.py | 2 +- .../django_dramatiq_postgres/broker.py | 8 ++-- .../django_dramatiq_postgres/models.py | 2 +- 12 files changed, 36 insertions(+), 224 deletions(-) delete mode 100644 authentik/tasks/migrations/0002_task_update_aggregated_status.py delete mode 100644 authentik/tasks/migrations/0003_remove_task_update_aggregated_status_and_more.py delete mode 100644 authentik/tasks/migrations/0004_remove_task_update_aggregated_status_and_more.py delete mode 100644 authentik/tasks/migrations/0005_remove_task_update_aggregated_status_and_more.py delete mode 100644 authentik/tasks/migrations/0006_remove_task_update_aggregated_status_and_more.py diff --git a/Makefile b/Makefile index a8ef77e592..e7d5a8dc00 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ PWD = $(shell pwd) UID = $(shell id -u) GID = $(shell id -g) NPM_VERSION = $(shell python -m scripts.generate_semver) -PY_SOURCES = authentik tests scripts lifecycle .github +PY_SOURCES = authentik packages tests scripts lifecycle .github DOCKER_IMAGE ?= "authentik:test" GEN_API_TS = gen-ts-api diff --git a/authentik/root/settings.py b/authentik/root/settings.py index a80e8974f4..4a27306de5 100644 --- a/authentik/root/settings.py +++ b/authentik/root/settings.py @@ -64,6 +64,7 @@ SHARED_APPS = [ "pgactivity", "pglock", "channels", + "django_dramatiq_postgres", "authentik.tasks", ] TENANT_APPS = [ @@ -361,7 +362,8 @@ DRAMATIQ = { "dramatiq.middleware.time_limit.TimeLimit", { # 5 minutes task timeout by default for all tasks - "time_limit": 600 * 1000, + "time_limit": 600 + * 1000, }, ), ("dramatiq.middleware.shutdown.ShutdownNotifications", {}), @@ -371,7 +373,8 @@ DRAMATIQ = { # TODO: results ("authentik.tasks.middleware.FullyQualifiedActorName", {}), ("authentik.tasks.middleware.CurrentTask", {}), - ) + ), + "task_class": "authentik.tasks.models.Task", } diff --git a/authentik/tasks/migrations/0001_initial.py b/authentik/tasks/migrations/0001_initial.py index 6c2526a005..825b3e2448 100644 --- a/authentik/tasks/migrations/0001_initial.py +++ b/authentik/tasks/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.10 on 2025-06-10 13:46 +# Generated by Django 5.1.11 on 2025-06-16 13:59 import django.db.models.deletion import django.utils.timezone @@ -74,6 +74,7 @@ class Migration(migrations.Migration): "verbose_name": "Task", "verbose_name_plural": "Tasks", "permissions": [("retrigger_task", "Restart failed task")], + "abstract": False, "default_permissions": ("view",), "indexes": [ models.Index(fields=["state", "mtime"], name="authentik_t_state_bb4a31_idx"), @@ -91,8 +92,8 @@ class Migration(migrations.Migration): sql=pgtrigger.compiler.UpsertTriggerSql( condition="WHEN (NEW.\"state\" = 'queued')", constraint="CONSTRAINT", - func="\n PERFORM pg_notify(\n 'authentik.tasks.' || NEW.queue_name || '.enqueue',\n NEW.message_id::text\n );\n RETURN NEW;\n ", - hash="0a9ee3db61e4d63fd72b31322fbb821706dd8a78", + func="\n PERFORM pg_notify(\n 'dramatiq.tasks.tasks.' || NEW.queue_name || '.enqueue',\n NEW.message_id::text\n );\n RETURN NEW;\n ", + hash="e53355a30f439252c7250e5296947f59de6ca57e", operation="INSERT OR UPDATE", pgid="pgtrigger_notify_enqueueing_0bc94", table="authentik_tasks_task", @@ -101,4 +102,18 @@ class Migration(migrations.Migration): ), ), ), + pgtrigger.migrations.AddTrigger( + model_name="task", + trigger=pgtrigger.compiler.Trigger( + name="update_aggregated_status", + sql=pgtrigger.compiler.UpsertTriggerSql( + func="\n NEW.aggregated_status := CASE\n WHEN NEW.state != 'done' THEN NEW.state\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->>'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->>'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ", + hash="ebc09bc08c1624966c0c58a52f243fe25a842058", + operation="INSERT OR UPDATE", + pgid="pgtrigger_update_aggregated_status_f18c4", + table="authentik_tasks_task", + when="BEFORE", + ), + ), + ), ] diff --git a/authentik/tasks/migrations/0002_task_update_aggregated_status.py b/authentik/tasks/migrations/0002_task_update_aggregated_status.py deleted file mode 100644 index 4528a48f03..0000000000 --- a/authentik/tasks/migrations/0002_task_update_aggregated_status.py +++ /dev/null @@ -1,32 +0,0 @@ -# Generated by Django 5.1.10 on 2025-06-10 14:17 - -import pgtrigger.compiler -import pgtrigger.migrations -from django.db import migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ("authentik_tasks", "0001_initial"), - ] - - operations = [ - pgtrigger.migrations.AddTrigger( - model_name="task", - trigger=pgtrigger.compiler.Trigger( - name="update_aggregated_status", - sql=pgtrigger.compiler.UpsertTriggerSql( - constraint="CONSTRAINT", - declare="DECLARE aggregated_status TEXT; max_log_level TEXT;", - func="\n NEW.aggregated_status := CASE\n WHEN NEW.status != 'done' THEN NEW.status\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ", - hash="9f97b7e85dd6428402da79d4f748740f0a3ac88c", - operation="INSERT OR UPDATE", - pgid="pgtrigger_update_aggregated_status_f18c4", - table="authentik_tasks_task", - timing="DEFERRABLE INITIALLY IMMEDIATE", - when="AFTER", - ), - ), - ), - ] diff --git a/authentik/tasks/migrations/0003_remove_task_update_aggregated_status_and_more.py b/authentik/tasks/migrations/0003_remove_task_update_aggregated_status_and_more.py deleted file mode 100644 index be716959f7..0000000000 --- a/authentik/tasks/migrations/0003_remove_task_update_aggregated_status_and_more.py +++ /dev/null @@ -1,36 +0,0 @@ -# Generated by Django 5.1.10 on 2025-06-10 14:17 - -import pgtrigger.compiler -import pgtrigger.migrations -from django.db import migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ("authentik_tasks", "0002_task_update_aggregated_status"), - ] - - operations = [ - pgtrigger.migrations.RemoveTrigger( - model_name="task", - name="update_aggregated_status", - ), - pgtrigger.migrations.AddTrigger( - model_name="task", - trigger=pgtrigger.compiler.Trigger( - name="update_aggregated_status", - sql=pgtrigger.compiler.UpsertTriggerSql( - constraint="CONSTRAINT", - declare="DECLARE aggregated_status TEXT; max_log_level TEXT;", - func="\n NEW.aggregated_status := CASE\n WHEN NEW.state != 'done' THEN NEW.state\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ", - hash="7187c511fa7d22f8c34f2068c37b9bf2e51b9e40", - operation="INSERT OR UPDATE", - pgid="pgtrigger_update_aggregated_status_f18c4", - table="authentik_tasks_task", - timing="DEFERRABLE INITIALLY IMMEDIATE", - when="AFTER", - ), - ), - ), - ] diff --git a/authentik/tasks/migrations/0004_remove_task_update_aggregated_status_and_more.py b/authentik/tasks/migrations/0004_remove_task_update_aggregated_status_and_more.py deleted file mode 100644 index e0584ad25e..0000000000 --- a/authentik/tasks/migrations/0004_remove_task_update_aggregated_status_and_more.py +++ /dev/null @@ -1,36 +0,0 @@ -# Generated by Django 5.1.10 on 2025-06-10 14:19 - -import pgtrigger.compiler -import pgtrigger.migrations -from django.db import migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ("authentik_tasks", "0003_remove_task_update_aggregated_status_and_more"), - ] - - operations = [ - pgtrigger.migrations.RemoveTrigger( - model_name="task", - name="update_aggregated_status", - ), - pgtrigger.migrations.AddTrigger( - model_name="task", - trigger=pgtrigger.compiler.Trigger( - name="update_aggregated_status", - sql=pgtrigger.compiler.UpsertTriggerSql( - constraint="CONSTRAINT", - declare="DECLARE aggregated_status TEXT; max_log_level TEXT;", - func="\n NEW.aggregated_status := CASE\n WHEN NEW.state != 'done' THEN NEW.state\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->>'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->>'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ", - hash="6f01e43ff57b11081bff98b7e7b296ccaaa7cf7f", - operation="INSERT OR UPDATE", - pgid="pgtrigger_update_aggregated_status_f18c4", - table="authentik_tasks_task", - timing="DEFERRABLE INITIALLY IMMEDIATE", - when="AFTER", - ), - ), - ), - ] diff --git a/authentik/tasks/migrations/0005_remove_task_update_aggregated_status_and_more.py b/authentik/tasks/migrations/0005_remove_task_update_aggregated_status_and_more.py deleted file mode 100644 index aab319618b..0000000000 --- a/authentik/tasks/migrations/0005_remove_task_update_aggregated_status_and_more.py +++ /dev/null @@ -1,35 +0,0 @@ -# Generated by Django 5.1.10 on 2025-06-10 14:26 - -import pgtrigger.compiler -import pgtrigger.migrations -from django.db import migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ("authentik_tasks", "0004_remove_task_update_aggregated_status_and_more"), - ] - - operations = [ - pgtrigger.migrations.RemoveTrigger( - model_name="task", - name="update_aggregated_status", - ), - pgtrigger.migrations.AddTrigger( - model_name="task", - trigger=pgtrigger.compiler.Trigger( - name="update_aggregated_status", - sql=pgtrigger.compiler.UpsertTriggerSql( - constraint="CONSTRAINT", - func="\n NEW.aggregated_status := CASE\n WHEN NEW.state != 'done' THEN NEW.state\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->>'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->>'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ", - hash="328d7b7a131530f4ecc68f1cadb0146cf948fd03", - operation="INSERT OR UPDATE", - pgid="pgtrigger_update_aggregated_status_f18c4", - table="authentik_tasks_task", - timing="DEFERRABLE INITIALLY IMMEDIATE", - when="AFTER", - ), - ), - ), - ] diff --git a/authentik/tasks/migrations/0006_remove_task_update_aggregated_status_and_more.py b/authentik/tasks/migrations/0006_remove_task_update_aggregated_status_and_more.py deleted file mode 100644 index a484ab80de..0000000000 --- a/authentik/tasks/migrations/0006_remove_task_update_aggregated_status_and_more.py +++ /dev/null @@ -1,33 +0,0 @@ -# Generated by Django 5.1.10 on 2025-06-10 14:33 - -import pgtrigger.compiler -import pgtrigger.migrations -from django.db import migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ("authentik_tasks", "0005_remove_task_update_aggregated_status_and_more"), - ] - - operations = [ - pgtrigger.migrations.RemoveTrigger( - model_name="task", - name="update_aggregated_status", - ), - pgtrigger.migrations.AddTrigger( - model_name="task", - trigger=pgtrigger.compiler.Trigger( - name="update_aggregated_status", - sql=pgtrigger.compiler.UpsertTriggerSql( - func="\n NEW.aggregated_status := CASE\n WHEN NEW.state != 'done' THEN NEW.state\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->>'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->>'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ", - hash="ebc09bc08c1624966c0c58a52f243fe25a842058", - operation="INSERT OR UPDATE", - pgid="pgtrigger_update_aggregated_status_f18c4", - table="authentik_tasks_task", - when="BEFORE", - ), - ), - ), - ] diff --git a/authentik/tasks/models.py b/authentik/tasks/models.py index c3d5822bf1..f735d7ff40 100644 --- a/authentik/tasks/models.py +++ b/authentik/tasks/models.py @@ -1,11 +1,11 @@ from enum import StrEnum, auto -from uuid import UUID, uuid4 +from uuid import UUID import pgtrigger from django.contrib.contenttypes.fields import ContentType, GenericForeignKey from django.db import models -from django.utils import timezone from django.utils.translation import gettext_lazy as _ +from django_dramatiq_postgres.models import TaskBase from authentik.events.logs import LogEvent from authentik.events.utils import sanitize_item @@ -30,26 +30,12 @@ class TaskState(models.TextChoices): DONE = "done" -class Task(SerializerModel): - message_id = models.UUIDField(primary_key=True, default=uuid4) - queue_name = models.TextField(default="default", help_text=_("Queue name")) - +class Task(SerializerModel, TaskBase): tenant = models.ForeignKey( Tenant, on_delete=models.CASCADE, help_text=_("Tenant this task belongs to"), ) - actor_name = models.TextField(help_text=_("Dramatiq actor name")) - message = models.BinaryField(null=True, help_text=_("Message body")) - state = models.CharField( - default=TaskState.QUEUED, - choices=TaskState.choices, - help_text=_("Task status"), - ) - mtime = models.DateTimeField(default=timezone.now, help_text=_("Task last modified time")) - - result = models.BinaryField(null=True, help_text=_("Task result")) - result_expiry = models.DateTimeField(null=True, help_text=_("Result expiry time")) rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True) rel_obj_id = models.TextField(null=True) @@ -60,32 +46,15 @@ class Task(SerializerModel): aggregated_status = models.TextField() - class Meta: - verbose_name = _("Task") - verbose_name_plural = _("Tasks") + class Meta(TaskBase.Meta): default_permissions = ("view",) permissions = [ ("retrigger_task", _("Restart failed task")), ] - indexes = ( - models.Index(fields=("state", "mtime")), + indexes = TaskBase.Meta.indexes + ( models.Index(fields=("rel_obj_content_type", "rel_obj_id")), ) - triggers = ( - pgtrigger.Trigger( - name="notify_enqueueing", - operation=pgtrigger.Insert | pgtrigger.Update, - when=pgtrigger.After, - condition=pgtrigger.Q(new__state=TaskState.QUEUED), - timing=pgtrigger.Deferred, - func=f""" - PERFORM pg_notify( - '{CHANNEL_PREFIX}.' || NEW.queue_name || '.{ChannelIdentifier.ENQUEUE.value}', - NEW.message_id::text - ); - RETURN NEW; - """, # noqa: E501 - ), + triggers = TaskBase.Meta.triggers + ( pgtrigger.Trigger( name="update_aggregated_status", operation=pgtrigger.Insert | pgtrigger.Update, @@ -109,9 +78,6 @@ class Task(SerializerModel): ), ) - def __str__(self): - return str(self.message_id) - @property def uid(self) -> str: uid = str(self.actor_name) diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/apps.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/apps.py index f8bfab0ae3..f6c870813c 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/apps.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/apps.py @@ -32,7 +32,7 @@ class DjangoDramatiqPostgres(AppConfig): **broker_kwargs, ) - for middleware_class, middleware_kwargs in Conf.middlewares.items(): + for middleware_class, middleware_kwargs in Conf.middlewares: middleware: dramatiq.middleware.middleware.Middleware = import_string(middleware_class)( **middleware_kwargs, ) diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py index 3592855c40..e7a9afb215 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py @@ -6,8 +6,6 @@ from queue import Empty, Queue from random import randint from typing import Any -from django.utils.functional import cached_property -from django.utils.module_loading import import_string import tenacity from django.db import ( DEFAULT_DB_ALIAS, @@ -19,6 +17,8 @@ from django.db import ( from django.db.backends.postgresql.base import DatabaseWrapper from django.db.models import QuerySet from django.utils import timezone +from django.utils.functional import cached_property +from django.utils.module_loading import import_string from dramatiq.broker import Broker, Consumer, MessageProxy from dramatiq.common import compute_backoff, current_millis, dq_name, xq_name from dramatiq.errors import ConnectionError, QueueJoinTimeout @@ -40,7 +40,7 @@ from structlog.stdlib import get_logger from django_dramatiq_postgres.conf import Conf from django_dramatiq_postgres.middleware import DbConnectionMiddleware -from django_dramatiq_postgres.models import Task, ChannelIdentifier, TaskState, CHANNEL_PREFIX +from django_dramatiq_postgres.models import CHANNEL_PREFIX, ChannelIdentifier, TaskBase, TaskState LOGGER = get_logger() @@ -99,7 +99,7 @@ class PostgresBroker(Broker): return _PostgresConsumer @cached_property - def model(self) -> type[Task]: + def model(self) -> type[TaskBase]: return import_string(Conf.task_class) @property diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py index 7df934c3a5..984408611d 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py @@ -25,7 +25,7 @@ class TaskState(models.TextChoices): DONE = "done" -class Task(models.Model): +class TaskBase(models.Model): message_id = models.UUIDField(primary_key=True, default=uuid4) queue_name = models.TextField(default="default", help_text=_("Queue name"))