ldap source: finish migrate tasks

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2025-03-31 18:06:11 +02:00
parent 78180e376f
commit 39f769b150
4 changed files with 44 additions and 20 deletions

View File

@ -103,7 +103,6 @@ def ldap_sync_paginator(source: LDAPSource, sync: type[BaseLDAPSynchronizer], **
return signatures
# Need to store results to be able to wait for the task above
@actor(time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000)
def ldap_sync_page(source_pk: str, sync_class: str, page_cache_key: str):
"""Synchronization of an LDAP Source"""

View File

@ -1,6 +1,16 @@
import dramatiq
from dramatiq.broker import Broker, get_broker
from dramatiq.encoder import PickleEncoder
from dramatiq.middleware import (
AgeLimit,
Callbacks,
Pipelines,
# Prometheus,
Retries,
ShutdownNotifications,
TimeLimit,
)
from dramatiq.results.middleware import Results
from authentik.blueprints.apps import ManagedAppConfig
@ -13,9 +23,19 @@ class AuthentikTasksConfig(ManagedAppConfig):
def _set_dramatiq_middlewares(self, broker: Broker) -> None:
from authentik.tasks.middleware import CurrentTask, FullyQualifiedActorName
from authentik.tasks.results import PostgresBackend
broker.add_middleware(FullyQualifiedActorName())
# TODO: fixme
# broker.add_middleware(Prometheus())
broker.add_middleware(AgeLimit())
# Task timeout, 5 minutes by default for all tasks
broker.add_middleware(TimeLimit(time_limit=600 * 1000))
broker.add_middleware(ShutdownNotifications())
broker.add_middleware(Callbacks())
broker.add_middleware(Pipelines())
broker.add_middleware(Retries())
broker.add_middleware(Results(backend=PostgresBackend(), store_results=True))
broker.add_middleware(FullyQualifiedActorName())
broker.add_middleware(CurrentTask())
def ready(self) -> None:

View File

@ -26,18 +26,17 @@ from dramatiq.middleware import (
Callbacks,
Middleware,
Pipelines,
Prometheus,
Retries,
ShutdownNotifications,
TimeLimit,
)
from dramatiq.results import Results
from pglock.core import _cast_lock_id
from psycopg import Notify, sql
from psycopg.errors import AdminShutdown
from structlog.stdlib import get_logger
from authentik.tasks.models import CHANNEL_PREFIX, ChannelIdentifier, Task, TaskState
from authentik.tasks.results import PostgresBackend
from authentik.tasks.schedules.scheduler import Scheduler
from authentik.tenants.models import Tenant
from authentik.tenants.utils import get_current_tenant
@ -87,7 +86,13 @@ class TenantMiddleware(Middleware):
class PostgresBroker(Broker):
def __init__(self, *args, db_alias: str = DEFAULT_DB_ALIAS, results: bool = True, **kwargs):
def __init__(
self,
*args,
middleware: list[Middleware] | None = None,
db_alias: str = DEFAULT_DB_ALIAS,
**kwargs,
):
super().__init__(*args, middleware=[], **kwargs)
self.logger = get_logger().bind()
@ -100,18 +105,19 @@ class PostgresBroker(Broker):
self.middleware = []
self.add_middleware(DbConnectionMiddleware())
self.add_middleware(TenantMiddleware())
for middleware in (
AgeLimit,
TimeLimit,
ShutdownNotifications,
Callbacks,
Pipelines,
Retries,
):
self.add_middleware(middleware())
if results:
self.backend = PostgresBackend()
self.add_middleware(Results(backend=self.backend))
if middleware is None:
for m in (
Prometheus,
AgeLimit,
TimeLimit,
ShutdownNotifications,
Callbacks,
Pipelines,
Retries,
):
self.add_middleware(m())
for m in middleware or []:
self.add_middleware(m)
@property
def connection(self) -> DatabaseWrapper:

View File

@ -1,7 +1,7 @@
from django.db import DEFAULT_DB_ALIAS
from django.db.models import QuerySet
from django.utils import timezone
from dramatiq.message import Message, get_encoder
from dramatiq.message import Message
from dramatiq.results.backend import Missing, MResult, Result, ResultBackend
from authentik.tasks.models import Task
@ -29,9 +29,8 @@ class PostgresBackend(ResultBackend):
return self.encoder.decode(data)
def _store(self, message_key: str, result: Result, ttl: int) -> None:
encoder = get_encoder()
self.query_set.filter(message_id=message_key).update(
mtime=timezone.now(),
result=encoder.encode(result),
result=self.encoder.encode(result),
result_expiry=timezone.now() + timezone.timedelta(milliseconds=ttl),
)