Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2025-03-27 14:24:56 +01:00
parent af5b894e62
commit a3ebfd9bbd
10 changed files with 139 additions and 34 deletions

View File

@ -1,8 +1,5 @@
from datetime import timedelta
import dramatiq
from dramatiq.encoder import PickleEncoder
from dramatiq.middleware import AgeLimit, Retries, TimeLimit
from authentik.blueprints.apps import ManagedAppConfig
@ -21,9 +18,6 @@ class AuthentikTasksConfig(ManagedAppConfig):
broker = PostgresBroker()
broker.add_middleware(FullyQualifiedActorName())
# broker.add_middleware(Prometheus())
broker.add_middleware(AgeLimit(max_age=timedelta(days=30).total_seconds() * 1000))
broker.add_middleware(TimeLimit())
broker.add_middleware(Retries(max_retries=3))
broker.add_middleware(CurrentTask())
dramatiq.set_broker(broker)
return super().ready()

View File

@ -21,7 +21,7 @@ from dramatiq.broker import Broker, Consumer, MessageProxy
from dramatiq.common import compute_backoff, current_millis, dq_name, xq_name
from dramatiq.errors import ConnectionError, QueueJoinTimeout
from dramatiq.message import Message
from dramatiq.middleware import Middleware
from dramatiq.middleware import Middleware, Prometheus, default_middleware
from dramatiq.results import Results
from pglock.core import _cast_lock_id
from psycopg import Notify, sql
@ -80,10 +80,18 @@ class PostgresBroker(Broker):
self.logger = get_logger().bind()
self.queues = set()
self.actor_options = {
"schedule_uid",
}
self.db_alias = db_alias
self.middleware = []
self.add_middleware(DbConnectionMiddleware())
self.add_middleware(TenantMiddleware())
for middleware in default_middleware:
if middleware == Prometheus:
pass
self.add_middleware(middleware())
if results:
self.backend = PostgresBackend()
self.add_middleware(Results(backend=self.backend))
@ -160,6 +168,7 @@ class PostgresBroker(Broker):
"actor_name": message.actor_name,
"state": TaskState.QUEUED,
"message": message.encode(),
"schedule_uid": message.options.get("schedule_uid", ""),
}
create_defaults = {
**query,

View File

@ -0,0 +1,18 @@
# Generated by Django 5.0.13 on 2025-03-27 13:24
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks", "0001_initial"),
]
operations = [
migrations.AddField(
model_name="task",
name="schedule_uid",
field=models.TextField(blank=True),
),
]

View File

@ -59,8 +59,9 @@ class Task(models.Model):
result = models.BinaryField(null=True, help_text=_("Task result"))
result_expiry = models.DateTimeField(null=True, help_text=_("Result expiry time"))
# Probably only have one `logs` field
schedule_uid = models.TextField(blank=True)
uid = models.TextField(blank=True)
# Probably only have one `logs` field
description = models.TextField(blank=True)
status = models.TextField(blank=True, choices=TaskStatus.choices)
messages = models.JSONField(default=list)

View File

@ -1,5 +1,7 @@
# Generated by Django 5.0.13 on 2025-03-25 17:22
# Generated by Django 5.0.13 on 2025-03-27 11:20
import authentik.tasks.schedules.models
import uuid
from django.db import migrations, models
@ -13,7 +15,13 @@ class Migration(migrations.Migration):
migrations.CreateModel(
name="Schedule",
fields=[
("id", models.TextField(editable=False, primary_key=True, serialize=False)),
(
"id",
models.UUIDField(
default=uuid.uuid4, editable=False, primary_key=True, serialize=False
),
),
("uid", models.TextField(editable=False, unique=True)),
("name", models.TextField(editable=False, help_text="Schedule display name")),
(
"actor_name",
@ -21,12 +29,18 @@ class Migration(migrations.Migration):
),
("args", models.BinaryField(help_text="Args to send to the actor")),
("kwargs", models.BinaryField(help_text="Kwargs to send to the actor")),
("crontab", models.TextField()),
(
"crontab",
models.TextField(
validators=[authentik.tasks.schedules.models.validate_crontab]
),
),
("next_run", models.DateTimeField(auto_now_add=True)),
],
options={
"verbose_name": "Schedule",
"verbose_name_plural": "Schedules",
"default_permissions": ("change", "view"),
},
),
]

View File

@ -1,21 +0,0 @@
# Generated by Django 5.0.13 on 2025-03-25 17:31
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks_schedules", "0001_initial"),
]
operations = [
migrations.AlterModelOptions(
name="schedule",
options={
"default_permissions": ("change", "view"),
"verbose_name": "Schedule",
"verbose_name_plural": "Schedules",
},
),
]

View File

@ -1,11 +1,27 @@
from uuid import uuid4
from cron_converter import Cron
from django.core.exceptions import ValidationError
from django.db import models
from django.utils.translation import gettext_lazy as _
from django.utils.timezone import datetime
from authentik.lib.models import SerializerModel
def validate_crontab(value):
try:
Cron(value)
except ValueError as exc:
raise ValidationError(
_("%(value)s is not a valid crontab"),
params={"value": value},
) from exc
class Schedule(SerializerModel):
id = models.TextField(primary_key=True, editable=False)
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
uid = models.TextField(unique=True, editable=False)
name = models.TextField(editable=False, help_text=_("Schedule display name"))
@ -13,7 +29,7 @@ class Schedule(SerializerModel):
args = models.BinaryField(editable=False, help_text=_("Args to send to the actor"))
kwargs = models.BinaryField(editable=False, help_text=_("Kwargs to send to the actor"))
crontab = models.TextField()
crontab = models.TextField(validators=[validate_crontab])
next_run = models.DateTimeField(auto_now_add=True)
@ -33,3 +49,6 @@ class Schedule(SerializerModel):
from authentik.tasks.schedules.api import ScheduleSerializer
return ScheduleSerializer
def calculate_next_run(self, next_run: datetime) -> datetime:
return Cron(self.crontab).schedule(next_run).next()

View File

@ -0,0 +1,56 @@
from django.db import router, transaction
from structlog.stdlib import get_logger
from authentik.tasks.schedules.models import Schedule
from django.utils.timezone import now
from dramatiq.broker import Broker
import pickle
from authentik.tenants.models import Tenant
import pglock
LOGGER = get_logger()
class Scheduler:
def __init__(self, broker: Broker):
self.broker = broker
def process_schedule(self, schedule: Schedule):
next_run = schedule.next_run
while True:
next_run = schedule.calculate_next_run(next_run)
if next_run > now():
break
schedule.next_run = next_run
actor = self.broker.get_actor(schedule.actor_name)
actor.send_with_options(
args=pickle.loads(schedule.args),
kwargs=pickle.loads(schedule.kwargs),
options={
"schedule_uid": schedule.uid,
},
)
schedule.save()
def run_per_tenant(self, tenant: Tenant):
with pglock.advisory(
lock_id=f"goauthentik.io/{tenant.schema_name}/tasks/scheduler",
side_effect=pglock.Return,
timeout=0,
) as lock_acquired:
if not lock_acquired:
LOGGER.debug(
"Failed to acquire lock for tasks scheduling, skipping",
tenant=tenant.schema_name,
)
with transaction.atomic(using=router.db_for_write(Schedule)):
for schedule in Schedule.objects.select_for_update().filter(next_run__lt=now()):
self.process_schedule(schedule)
def run(self):
for tenant in Tenant.objects.filter(enabled=True):
with tenant:
self.run_per_tenant(tenant)

View File

@ -9,6 +9,7 @@ dependencies = [
"celery",
"channels",
"channels-redis",
"cron-converter",
"cryptography",
"dacite",
"deepmerge",

14
uv.lock generated
View File

@ -169,6 +169,7 @@ dependencies = [
{ name = "celery" },
{ name = "channels" },
{ name = "channels-redis" },
{ name = "cron-converter" },
{ name = "cryptography" },
{ name = "dacite" },
{ name = "deepmerge" },
@ -270,6 +271,7 @@ requires-dist = [
{ name = "celery" },
{ name = "channels" },
{ name = "channels-redis" },
{ name = "cron-converter" },
{ name = "cryptography" },
{ name = "dacite" },
{ name = "deepmerge" },
@ -844,6 +846,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/fb/b2/f655700e1024dec98b10ebaafd0cedbc25e40e4abe62a3c8e2ceef4f8f0a/coverage-7.6.12-py3-none-any.whl", hash = "sha256:eb8668cfbc279a536c633137deeb9435d2962caec279c3f8cf8b91fff6ff8953", size = 200552 },
]
[[package]]
name = "cron-converter"
version = "1.2.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "python-dateutil" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c1/45/549d071e7bde4d3bb6a566b1a116e3b79803df916c3499d27509b214a965/cron_converter-1.2.1.tar.gz", hash = "sha256:6766c6ba44b8236201ac03030f314fd655343c1c4848ce216458e8d340066c59", size = 14313 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/76/2a477e17b7c5c49e81bdc711aab7ba9a2a661c54b7c5021e0c1c01abb0e0/cron_converter-1.2.1-py3-none-any.whl", hash = "sha256:4604e356c15a8fbe76a86bb42508f611ad3cade7dd65e2d6f601c2e0d5226ffc", size = 13338 },
]
[[package]]
name = "cryptography"
version = "44.0.2"