Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2025-03-24 12:51:20 +01:00
parent df15a78aac
commit dbc4a2b730
10 changed files with 19 additions and 49 deletions

View File

@ -1,7 +1,6 @@
"""test admin tasks""" """test admin tasks"""
from django.core.cache import cache from django.core.cache import cache
from django.test import TestCase
from requests_mock import Mocker from requests_mock import Mocker
from authentik.admin.tasks import ( from authentik.admin.tasks import (

View File

@ -1,8 +1,10 @@
from datetime import timedelta from datetime import timedelta
import dramatiq import dramatiq
from dramatiq.middleware import AgeLimit, Callbacks, Prometheus, Retries, TimeLimit from dramatiq.encoder import PickleEncoder
from dramatiq.middleware import AgeLimit, Callbacks, Retries, TimeLimit
from authentik.blueprints.apps import ManagedAppConfig from authentik.blueprints.apps import ManagedAppConfig
from authentik.tasks.encoder import JSONPickleEncoder
class AuthentikTasksConfig(ManagedAppConfig): class AuthentikTasksConfig(ManagedAppConfig):
@ -15,7 +17,7 @@ class AuthentikTasksConfig(ManagedAppConfig):
from authentik.tasks.broker import PostgresBroker from authentik.tasks.broker import PostgresBroker
from authentik.tasks.middleware import CurrentTask from authentik.tasks.middleware import CurrentTask
dramatiq.set_encoder(JSONPickleEncoder()) dramatiq.set_encoder(PickleEncoder())
broker = PostgresBroker() broker = PostgresBroker()
# broker.add_middleware(Prometheus()) # broker.add_middleware(Prometheus())
broker.add_middleware(AgeLimit(max_age=timedelta(days=30).total_seconds() * 1000)) broker.add_middleware(AgeLimit(max_age=timedelta(days=30).total_seconds() * 1000))

View File

@ -1,5 +1,3 @@
from dramatiq.middleware import Middleware
from psycopg import sql
import functools import functools
import logging import logging
import time import time
@ -24,13 +22,14 @@ from dramatiq.broker import Broker, Consumer, MessageProxy
from dramatiq.common import compute_backoff, current_millis, dq_name, xq_name from dramatiq.common import compute_backoff, current_millis, dq_name, xq_name
from dramatiq.errors import ConnectionError, QueueJoinTimeout from dramatiq.errors import ConnectionError, QueueJoinTimeout
from dramatiq.message import Message from dramatiq.message import Message
from dramatiq.middleware import Middleware
from dramatiq.results import Results from dramatiq.results import Results
from pglock.core import _cast_lock_id from pglock.core import _cast_lock_id
from psycopg import Notify from psycopg import Notify, sql
from psycopg.errors import AdminShutdown from psycopg.errors import AdminShutdown
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
from authentik.tasks.models import Task, CHANNEL_PREFIX, ChannelIdentifier, TaskState from authentik.tasks.models import CHANNEL_PREFIX, ChannelIdentifier, Task, TaskState
from authentik.tasks.results import PostgresBackend from authentik.tasks.results import PostgresBackend
from authentik.tenants.models import Tenant from authentik.tenants.models import Tenant
from authentik.tenants.utils import get_current_tenant from authentik.tenants.utils import get_current_tenant

View File

@ -1,32 +0,0 @@
import jsonpickle
import dramatiq.encoder
from typing import Any
from dramatiq.encoder import MessageData
import orjson
class OrjsonBackend(jsonpickle.JSONBackend):
def encode(self, obj: Any, indent=None, separators=None) -> str:
return orjson.dumps(obj, option=orjson.OPT_NON_STR_KEYS).decode("utf-8")
def decode(self, string: str) -> Any:
return orjson.loads(string)
class JSONPickleEncoder(dramatiq.encoder.Encoder):
def encode(self, data: MessageData) -> bytes:
return jsonpickle.encode(
data,
backend=OrjsonBackend(),
keys=True,
warn=True,
use_base85=True,
).encode()
def decode(self, data: bytes) -> MessageData:
return jsonpickle.decode(
data.decode(),
backend=OrjsonBackend(),
keys=True,
on_missing="warn",
)

View File

@ -1,10 +1,10 @@
import os import os
from django.utils.module_loading import module_has_submodule
from authentik.lib.utils.reflection import get_apps
import sys import sys
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.utils.module_loading import module_has_submodule
from authentik.lib.utils.reflection import get_apps
class Command(BaseCommand): class Command(BaseCommand):

View File

@ -1,6 +1,8 @@
import contextvars import contextvars
from dramatiq.message import Message from dramatiq.message import Message
from dramatiq.middleware import Middleware from dramatiq.middleware import Middleware
from authentik.tasks.models import Task from authentik.tasks.models import Task

View File

@ -1,10 +1,10 @@
from django.utils.translation import gettext_lazy as _
from enum import StrEnum, auto from enum import StrEnum, auto
from uuid import uuid4 from uuid import uuid4
import pgtrigger
import pgtrigger
from django.db import models from django.db import models
from django.utils import timezone from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from authentik.lib.models import SerializerModel from authentik.lib.models import SerializerModel
from authentik.tenants.models import Tenant from authentik.tenants.models import Tenant

View File

@ -4,7 +4,7 @@ from django.utils import timezone
from dramatiq.message import Message, get_encoder from dramatiq.message import Message, get_encoder
from dramatiq.results.backend import Missing, MResult, Result, ResultBackend from dramatiq.results.backend import Missing, MResult, Result, ResultBackend
from authentik.tasks.models import Task, TaskState from authentik.tasks.models import Task
class PostgresBackend(ResultBackend): class PostgresBackend(ResultBackend):

View File

@ -2,11 +2,11 @@ import os
import sys import sys
import warnings import warnings
from authentik.lib.config import CONFIG
from cryptography.hazmat.backends.openssl.backend import backend from cryptography.hazmat.backends.openssl.backend import backend
from defusedxml import defuse_stdlib from defusedxml import defuse_stdlib
from django.utils.autoreload import DJANGO_AUTORELOAD_ENV from django.utils.autoreload import DJANGO_AUTORELOAD_ENV
from authentik.lib.config import CONFIG
from lifecycle.migrate import run_migrations from lifecycle.migrate import run_migrations
from lifecycle.wait_for_db import wait_for_db from lifecycle.wait_for_db import wait_for_db
@ -40,6 +40,6 @@ if (
): ):
run_migrations() run_migrations()
import django import django # noqa: E402
django.setup() django.setup()

View File

@ -1,5 +1,5 @@
from dramatiq import Worker, get_broker
from django.test import TransactionTestCase from django.test import TransactionTestCase
from dramatiq import Worker, get_broker
class TaskTestCase(TransactionTestCase): class TaskTestCase(TransactionTestCase):