diff --git a/authentik/sources/ldap/tasks.py b/authentik/sources/ldap/tasks.py index 0c23c2e913..addbbdf4e7 100644 --- a/authentik/sources/ldap/tasks.py +++ b/authentik/sources/ldap/tasks.py @@ -103,7 +103,6 @@ def ldap_sync_paginator(source: LDAPSource, sync: type[BaseLDAPSynchronizer], ** return signatures -# Need to store results to be able to wait for the task above @actor(time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000) def ldap_sync_page(source_pk: str, sync_class: str, page_cache_key: str): """Synchronization of an LDAP Source""" diff --git a/authentik/tasks/apps.py b/authentik/tasks/apps.py index b53d29c45a..2cebfe0202 100644 --- a/authentik/tasks/apps.py +++ b/authentik/tasks/apps.py @@ -1,6 +1,16 @@ import dramatiq from dramatiq.broker import Broker, get_broker from dramatiq.encoder import PickleEncoder +from dramatiq.middleware import ( + AgeLimit, + Callbacks, + Pipelines, + # Prometheus, + Retries, + ShutdownNotifications, + TimeLimit, +) +from dramatiq.results.middleware import Results from authentik.blueprints.apps import ManagedAppConfig @@ -13,9 +23,19 @@ class AuthentikTasksConfig(ManagedAppConfig): def _set_dramatiq_middlewares(self, broker: Broker) -> None: from authentik.tasks.middleware import CurrentTask, FullyQualifiedActorName + from authentik.tasks.results import PostgresBackend - broker.add_middleware(FullyQualifiedActorName()) + # TODO: fixme # broker.add_middleware(Prometheus()) + broker.add_middleware(AgeLimit()) + # Task timeout, 5 minutes by default for all tasks + broker.add_middleware(TimeLimit(time_limit=600 * 1000)) + broker.add_middleware(ShutdownNotifications()) + broker.add_middleware(Callbacks()) + broker.add_middleware(Pipelines()) + broker.add_middleware(Retries()) + broker.add_middleware(Results(backend=PostgresBackend(), store_results=True)) + broker.add_middleware(FullyQualifiedActorName()) broker.add_middleware(CurrentTask()) def ready(self) -> None: diff --git a/authentik/tasks/broker.py b/authentik/tasks/broker.py index c9ffebffca..ec2d2d1d1d 100644 --- a/authentik/tasks/broker.py +++ b/authentik/tasks/broker.py @@ -26,18 +26,17 @@ from dramatiq.middleware import ( Callbacks, Middleware, Pipelines, + Prometheus, Retries, ShutdownNotifications, TimeLimit, ) -from dramatiq.results import Results from pglock.core import _cast_lock_id from psycopg import Notify, sql from psycopg.errors import AdminShutdown from structlog.stdlib import get_logger from authentik.tasks.models import CHANNEL_PREFIX, ChannelIdentifier, Task, TaskState -from authentik.tasks.results import PostgresBackend from authentik.tasks.schedules.scheduler import Scheduler from authentik.tenants.models import Tenant from authentik.tenants.utils import get_current_tenant @@ -87,7 +86,13 @@ class TenantMiddleware(Middleware): class PostgresBroker(Broker): - def __init__(self, *args, db_alias: str = DEFAULT_DB_ALIAS, results: bool = True, **kwargs): + def __init__( + self, + *args, + middleware: list[Middleware] | None = None, + db_alias: str = DEFAULT_DB_ALIAS, + **kwargs, + ): super().__init__(*args, middleware=[], **kwargs) self.logger = get_logger().bind() @@ -100,18 +105,19 @@ class PostgresBroker(Broker): self.middleware = [] self.add_middleware(DbConnectionMiddleware()) self.add_middleware(TenantMiddleware()) - for middleware in ( - AgeLimit, - TimeLimit, - ShutdownNotifications, - Callbacks, - Pipelines, - Retries, - ): - self.add_middleware(middleware()) - if results: - self.backend = PostgresBackend() - self.add_middleware(Results(backend=self.backend)) + if middleware is None: + for m in ( + Prometheus, + AgeLimit, + TimeLimit, + ShutdownNotifications, + Callbacks, + Pipelines, + Retries, + ): + self.add_middleware(m()) + for m in middleware or []: + self.add_middleware(m) @property def connection(self) -> DatabaseWrapper: diff --git a/authentik/tasks/results.py b/authentik/tasks/results.py index 38c1e8a342..f397637532 100644 --- a/authentik/tasks/results.py +++ b/authentik/tasks/results.py @@ -1,7 +1,7 @@ from django.db import DEFAULT_DB_ALIAS from django.db.models import QuerySet from django.utils import timezone -from dramatiq.message import Message, get_encoder +from dramatiq.message import Message from dramatiq.results.backend import Missing, MResult, Result, ResultBackend from authentik.tasks.models import Task @@ -29,9 +29,8 @@ class PostgresBackend(ResultBackend): return self.encoder.decode(data) def _store(self, message_key: str, result: Result, ttl: int) -> None: - encoder = get_encoder() self.query_set.filter(message_id=message_key).update( mtime=timezone.now(), - result=encoder.encode(result), + result=self.encoder.encode(result), result_expiry=timezone.now() + timezone.timedelta(milliseconds=ttl), )