start using things from the package

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2025-06-16 16:19:21 +02:00
parent 5ae69f5987
commit 1cfaddf49d
12 changed files with 36 additions and 224 deletions

View File

@ -6,7 +6,7 @@ PWD = $(shell pwd)
UID = $(shell id -u) UID = $(shell id -u)
GID = $(shell id -g) GID = $(shell id -g)
NPM_VERSION = $(shell python -m scripts.generate_semver) NPM_VERSION = $(shell python -m scripts.generate_semver)
PY_SOURCES = authentik tests scripts lifecycle .github PY_SOURCES = authentik packages tests scripts lifecycle .github
DOCKER_IMAGE ?= "authentik:test" DOCKER_IMAGE ?= "authentik:test"
GEN_API_TS = gen-ts-api GEN_API_TS = gen-ts-api

View File

@ -64,6 +64,7 @@ SHARED_APPS = [
"pgactivity", "pgactivity",
"pglock", "pglock",
"channels", "channels",
"django_dramatiq_postgres",
"authentik.tasks", "authentik.tasks",
] ]
TENANT_APPS = [ TENANT_APPS = [
@ -361,7 +362,8 @@ DRAMATIQ = {
"dramatiq.middleware.time_limit.TimeLimit", "dramatiq.middleware.time_limit.TimeLimit",
{ {
# 5 minutes task timeout by default for all tasks # 5 minutes task timeout by default for all tasks
"time_limit": 600 * 1000, "time_limit": 600
* 1000,
}, },
), ),
("dramatiq.middleware.shutdown.ShutdownNotifications", {}), ("dramatiq.middleware.shutdown.ShutdownNotifications", {}),
@ -371,7 +373,8 @@ DRAMATIQ = {
# TODO: results # TODO: results
("authentik.tasks.middleware.FullyQualifiedActorName", {}), ("authentik.tasks.middleware.FullyQualifiedActorName", {}),
("authentik.tasks.middleware.CurrentTask", {}), ("authentik.tasks.middleware.CurrentTask", {}),
) ),
"task_class": "authentik.tasks.models.Task",
} }

View File

@ -1,4 +1,4 @@
# Generated by Django 5.1.10 on 2025-06-10 13:46 # Generated by Django 5.1.11 on 2025-06-16 13:59
import django.db.models.deletion import django.db.models.deletion
import django.utils.timezone import django.utils.timezone
@ -74,6 +74,7 @@ class Migration(migrations.Migration):
"verbose_name": "Task", "verbose_name": "Task",
"verbose_name_plural": "Tasks", "verbose_name_plural": "Tasks",
"permissions": [("retrigger_task", "Restart failed task")], "permissions": [("retrigger_task", "Restart failed task")],
"abstract": False,
"default_permissions": ("view",), "default_permissions": ("view",),
"indexes": [ "indexes": [
models.Index(fields=["state", "mtime"], name="authentik_t_state_bb4a31_idx"), models.Index(fields=["state", "mtime"], name="authentik_t_state_bb4a31_idx"),
@ -91,8 +92,8 @@ class Migration(migrations.Migration):
sql=pgtrigger.compiler.UpsertTriggerSql( sql=pgtrigger.compiler.UpsertTriggerSql(
condition="WHEN (NEW.\"state\" = 'queued')", condition="WHEN (NEW.\"state\" = 'queued')",
constraint="CONSTRAINT", constraint="CONSTRAINT",
func="\n PERFORM pg_notify(\n 'authentik.tasks.' || NEW.queue_name || '.enqueue',\n NEW.message_id::text\n );\n RETURN NEW;\n ", func="\n PERFORM pg_notify(\n 'dramatiq.tasks.tasks.' || NEW.queue_name || '.enqueue',\n NEW.message_id::text\n );\n RETURN NEW;\n ",
hash="0a9ee3db61e4d63fd72b31322fbb821706dd8a78", hash="e53355a30f439252c7250e5296947f59de6ca57e",
operation="INSERT OR UPDATE", operation="INSERT OR UPDATE",
pgid="pgtrigger_notify_enqueueing_0bc94", pgid="pgtrigger_notify_enqueueing_0bc94",
table="authentik_tasks_task", table="authentik_tasks_task",
@ -101,4 +102,18 @@ class Migration(migrations.Migration):
), ),
), ),
), ),
pgtrigger.migrations.AddTrigger(
model_name="task",
trigger=pgtrigger.compiler.Trigger(
name="update_aggregated_status",
sql=pgtrigger.compiler.UpsertTriggerSql(
func="\n NEW.aggregated_status := CASE\n WHEN NEW.state != 'done' THEN NEW.state\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->>'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->>'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ",
hash="ebc09bc08c1624966c0c58a52f243fe25a842058",
operation="INSERT OR UPDATE",
pgid="pgtrigger_update_aggregated_status_f18c4",
table="authentik_tasks_task",
when="BEFORE",
),
),
),
] ]

View File

@ -1,32 +0,0 @@
# Generated by Django 5.1.10 on 2025-06-10 14:17
import pgtrigger.compiler
import pgtrigger.migrations
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks", "0001_initial"),
]
operations = [
pgtrigger.migrations.AddTrigger(
model_name="task",
trigger=pgtrigger.compiler.Trigger(
name="update_aggregated_status",
sql=pgtrigger.compiler.UpsertTriggerSql(
constraint="CONSTRAINT",
declare="DECLARE aggregated_status TEXT; max_log_level TEXT;",
func="\n NEW.aggregated_status := CASE\n WHEN NEW.status != 'done' THEN NEW.status\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ",
hash="9f97b7e85dd6428402da79d4f748740f0a3ac88c",
operation="INSERT OR UPDATE",
pgid="pgtrigger_update_aggregated_status_f18c4",
table="authentik_tasks_task",
timing="DEFERRABLE INITIALLY IMMEDIATE",
when="AFTER",
),
),
),
]

View File

@ -1,36 +0,0 @@
# Generated by Django 5.1.10 on 2025-06-10 14:17
import pgtrigger.compiler
import pgtrigger.migrations
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks", "0002_task_update_aggregated_status"),
]
operations = [
pgtrigger.migrations.RemoveTrigger(
model_name="task",
name="update_aggregated_status",
),
pgtrigger.migrations.AddTrigger(
model_name="task",
trigger=pgtrigger.compiler.Trigger(
name="update_aggregated_status",
sql=pgtrigger.compiler.UpsertTriggerSql(
constraint="CONSTRAINT",
declare="DECLARE aggregated_status TEXT; max_log_level TEXT;",
func="\n NEW.aggregated_status := CASE\n WHEN NEW.state != 'done' THEN NEW.state\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ",
hash="7187c511fa7d22f8c34f2068c37b9bf2e51b9e40",
operation="INSERT OR UPDATE",
pgid="pgtrigger_update_aggregated_status_f18c4",
table="authentik_tasks_task",
timing="DEFERRABLE INITIALLY IMMEDIATE",
when="AFTER",
),
),
),
]

View File

@ -1,36 +0,0 @@
# Generated by Django 5.1.10 on 2025-06-10 14:19
import pgtrigger.compiler
import pgtrigger.migrations
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks", "0003_remove_task_update_aggregated_status_and_more"),
]
operations = [
pgtrigger.migrations.RemoveTrigger(
model_name="task",
name="update_aggregated_status",
),
pgtrigger.migrations.AddTrigger(
model_name="task",
trigger=pgtrigger.compiler.Trigger(
name="update_aggregated_status",
sql=pgtrigger.compiler.UpsertTriggerSql(
constraint="CONSTRAINT",
declare="DECLARE aggregated_status TEXT; max_log_level TEXT;",
func="\n NEW.aggregated_status := CASE\n WHEN NEW.state != 'done' THEN NEW.state\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->>'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->>'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ",
hash="6f01e43ff57b11081bff98b7e7b296ccaaa7cf7f",
operation="INSERT OR UPDATE",
pgid="pgtrigger_update_aggregated_status_f18c4",
table="authentik_tasks_task",
timing="DEFERRABLE INITIALLY IMMEDIATE",
when="AFTER",
),
),
),
]

View File

@ -1,35 +0,0 @@
# Generated by Django 5.1.10 on 2025-06-10 14:26
import pgtrigger.compiler
import pgtrigger.migrations
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks", "0004_remove_task_update_aggregated_status_and_more"),
]
operations = [
pgtrigger.migrations.RemoveTrigger(
model_name="task",
name="update_aggregated_status",
),
pgtrigger.migrations.AddTrigger(
model_name="task",
trigger=pgtrigger.compiler.Trigger(
name="update_aggregated_status",
sql=pgtrigger.compiler.UpsertTriggerSql(
constraint="CONSTRAINT",
func="\n NEW.aggregated_status := CASE\n WHEN NEW.state != 'done' THEN NEW.state\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->>'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->>'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ",
hash="328d7b7a131530f4ecc68f1cadb0146cf948fd03",
operation="INSERT OR UPDATE",
pgid="pgtrigger_update_aggregated_status_f18c4",
table="authentik_tasks_task",
timing="DEFERRABLE INITIALLY IMMEDIATE",
when="AFTER",
),
),
),
]

View File

@ -1,33 +0,0 @@
# Generated by Django 5.1.10 on 2025-06-10 14:33
import pgtrigger.compiler
import pgtrigger.migrations
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks", "0005_remove_task_update_aggregated_status_and_more"),
]
operations = [
pgtrigger.migrations.RemoveTrigger(
model_name="task",
name="update_aggregated_status",
),
pgtrigger.migrations.AddTrigger(
model_name="task",
trigger=pgtrigger.compiler.Trigger(
name="update_aggregated_status",
sql=pgtrigger.compiler.UpsertTriggerSql(
func="\n NEW.aggregated_status := CASE\n WHEN NEW.state != 'done' THEN NEW.state\n ELSE COALESCE((\n SELECT CASE\n WHEN bool_or(msg->>'log_level' = 'error') THEN 'error'\n WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning'\n WHEN bool_or(msg->>'log_level' = 'info') THEN 'info'\n ELSE 'done'\n END\n FROM jsonb_array_elements(NEW._messages) AS msg\n ), 'done')\n END;\n\n RETURN NEW;\n ",
hash="ebc09bc08c1624966c0c58a52f243fe25a842058",
operation="INSERT OR UPDATE",
pgid="pgtrigger_update_aggregated_status_f18c4",
table="authentik_tasks_task",
when="BEFORE",
),
),
),
]

View File

@ -1,11 +1,11 @@
from enum import StrEnum, auto from enum import StrEnum, auto
from uuid import UUID, uuid4 from uuid import UUID
import pgtrigger import pgtrigger
from django.contrib.contenttypes.fields import ContentType, GenericForeignKey from django.contrib.contenttypes.fields import ContentType, GenericForeignKey
from django.db import models from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.models import TaskBase
from authentik.events.logs import LogEvent from authentik.events.logs import LogEvent
from authentik.events.utils import sanitize_item from authentik.events.utils import sanitize_item
@ -30,26 +30,12 @@ class TaskState(models.TextChoices):
DONE = "done" DONE = "done"
class Task(SerializerModel): class Task(SerializerModel, TaskBase):
message_id = models.UUIDField(primary_key=True, default=uuid4)
queue_name = models.TextField(default="default", help_text=_("Queue name"))
tenant = models.ForeignKey( tenant = models.ForeignKey(
Tenant, Tenant,
on_delete=models.CASCADE, on_delete=models.CASCADE,
help_text=_("Tenant this task belongs to"), help_text=_("Tenant this task belongs to"),
) )
actor_name = models.TextField(help_text=_("Dramatiq actor name"))
message = models.BinaryField(null=True, help_text=_("Message body"))
state = models.CharField(
default=TaskState.QUEUED,
choices=TaskState.choices,
help_text=_("Task status"),
)
mtime = models.DateTimeField(default=timezone.now, help_text=_("Task last modified time"))
result = models.BinaryField(null=True, help_text=_("Task result"))
result_expiry = models.DateTimeField(null=True, help_text=_("Result expiry time"))
rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True) rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True)
rel_obj_id = models.TextField(null=True) rel_obj_id = models.TextField(null=True)
@ -60,32 +46,15 @@ class Task(SerializerModel):
aggregated_status = models.TextField() aggregated_status = models.TextField()
class Meta: class Meta(TaskBase.Meta):
verbose_name = _("Task")
verbose_name_plural = _("Tasks")
default_permissions = ("view",) default_permissions = ("view",)
permissions = [ permissions = [
("retrigger_task", _("Restart failed task")), ("retrigger_task", _("Restart failed task")),
] ]
indexes = ( indexes = TaskBase.Meta.indexes + (
models.Index(fields=("state", "mtime")),
models.Index(fields=("rel_obj_content_type", "rel_obj_id")), models.Index(fields=("rel_obj_content_type", "rel_obj_id")),
) )
triggers = ( triggers = TaskBase.Meta.triggers + (
pgtrigger.Trigger(
name="notify_enqueueing",
operation=pgtrigger.Insert | pgtrigger.Update,
when=pgtrigger.After,
condition=pgtrigger.Q(new__state=TaskState.QUEUED),
timing=pgtrigger.Deferred,
func=f"""
PERFORM pg_notify(
'{CHANNEL_PREFIX}.' || NEW.queue_name || '.{ChannelIdentifier.ENQUEUE.value}',
NEW.message_id::text
);
RETURN NEW;
""", # noqa: E501
),
pgtrigger.Trigger( pgtrigger.Trigger(
name="update_aggregated_status", name="update_aggregated_status",
operation=pgtrigger.Insert | pgtrigger.Update, operation=pgtrigger.Insert | pgtrigger.Update,
@ -109,9 +78,6 @@ class Task(SerializerModel):
), ),
) )
def __str__(self):
return str(self.message_id)
@property @property
def uid(self) -> str: def uid(self) -> str:
uid = str(self.actor_name) uid = str(self.actor_name)

View File

@ -32,7 +32,7 @@ class DjangoDramatiqPostgres(AppConfig):
**broker_kwargs, **broker_kwargs,
) )
for middleware_class, middleware_kwargs in Conf.middlewares.items(): for middleware_class, middleware_kwargs in Conf.middlewares:
middleware: dramatiq.middleware.middleware.Middleware = import_string(middleware_class)( middleware: dramatiq.middleware.middleware.Middleware = import_string(middleware_class)(
**middleware_kwargs, **middleware_kwargs,
) )

View File

@ -6,8 +6,6 @@ from queue import Empty, Queue
from random import randint from random import randint
from typing import Any from typing import Any
from django.utils.functional import cached_property
from django.utils.module_loading import import_string
import tenacity import tenacity
from django.db import ( from django.db import (
DEFAULT_DB_ALIAS, DEFAULT_DB_ALIAS,
@ -19,6 +17,8 @@ from django.db import (
from django.db.backends.postgresql.base import DatabaseWrapper from django.db.backends.postgresql.base import DatabaseWrapper
from django.db.models import QuerySet from django.db.models import QuerySet
from django.utils import timezone from django.utils import timezone
from django.utils.functional import cached_property
from django.utils.module_loading import import_string
from dramatiq.broker import Broker, Consumer, MessageProxy from dramatiq.broker import Broker, Consumer, MessageProxy
from dramatiq.common import compute_backoff, current_millis, dq_name, xq_name from dramatiq.common import compute_backoff, current_millis, dq_name, xq_name
from dramatiq.errors import ConnectionError, QueueJoinTimeout from dramatiq.errors import ConnectionError, QueueJoinTimeout
@ -40,7 +40,7 @@ from structlog.stdlib import get_logger
from django_dramatiq_postgres.conf import Conf from django_dramatiq_postgres.conf import Conf
from django_dramatiq_postgres.middleware import DbConnectionMiddleware from django_dramatiq_postgres.middleware import DbConnectionMiddleware
from django_dramatiq_postgres.models import Task, ChannelIdentifier, TaskState, CHANNEL_PREFIX from django_dramatiq_postgres.models import CHANNEL_PREFIX, ChannelIdentifier, TaskBase, TaskState
LOGGER = get_logger() LOGGER = get_logger()
@ -99,7 +99,7 @@ class PostgresBroker(Broker):
return _PostgresConsumer return _PostgresConsumer
@cached_property @cached_property
def model(self) -> type[Task]: def model(self) -> type[TaskBase]:
return import_string(Conf.task_class) return import_string(Conf.task_class)
@property @property

View File

@ -25,7 +25,7 @@ class TaskState(models.TextChoices):
DONE = "done" DONE = "done"
class Task(models.Model): class TaskBase(models.Model):
message_id = models.UUIDField(primary_key=True, default=uuid4) message_id = models.UUIDField(primary_key=True, default=uuid4)
queue_name = models.TextField(default="default", help_text=_("Queue name")) queue_name = models.TextField(default="default", help_text=_("Queue name"))