move schedules to its package

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2025-06-19 18:21:59 +02:00
parent 4c491cf221
commit e6614a0705
10 changed files with 205 additions and 195 deletions

View File

@ -123,6 +123,7 @@ TENANT_APPS = [
"authentik.stages.user_login",
"authentik.stages.user_logout",
"authentik.stages.user_write",
"authentik.tasks.schedules",
"authentik.brands",
"authentik.blueprints",
"guardian",
@ -526,9 +527,5 @@ for _app in set(SHARED_APPS + TENANT_APPS):
_update_settings(f"{_app}.settings")
_update_settings("data.user_settings")
# Import schedules after other apps since it relies on tasks and ScheduledModel being
# registered for its startup.
TENANT_APPS.append("authentik.tasks.schedules")
SHARED_APPS = list(OrderedDict.fromkeys(SHARED_APPS + TENANT_APPS))
INSTALLED_APPS = list(OrderedDict.fromkeys(SHARED_APPS + TENANT_APPS))

View File

@ -1,4 +1,4 @@
# Generated by Django 5.1.11 on 2025-06-16 13:59
# Generated by Django 5.1.11 on 2025-06-19 16:21
import django.db.models.deletion
import django.utils.timezone
@ -92,8 +92,8 @@ class Migration(migrations.Migration):
sql=pgtrigger.compiler.UpsertTriggerSql(
condition="WHEN (NEW.\"state\" = 'queued')",
constraint="CONSTRAINT",
func="\n PERFORM pg_notify(\n 'dramatiq.tasks.tasks.' || NEW.queue_name || '.enqueue',\n NEW.message_id::text\n );\n RETURN NEW;\n ",
hash="e53355a30f439252c7250e5296947f59de6ca57e",
func="\n PERFORM pg_notify(\n 'authentik.tasks.' || NEW.queue_name || '.enqueue',\n NEW.message_id::text\n );\n RETURN NEW;\n ",
hash="0a9ee3db61e4d63fd72b31322fbb821706dd8a78",
operation="INSERT OR UPDATE",
pgid="pgtrigger_notify_enqueueing_0bc94",
table="authentik_tasks_task",

View File

@ -1,36 +0,0 @@
# Generated by Django 5.1.11 on 2025-06-18 15:02
import pgtrigger.compiler
import pgtrigger.migrations
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks", "0001_initial"),
]
operations = [
pgtrigger.migrations.RemoveTrigger(
model_name="task",
name="notify_enqueueing",
),
pgtrigger.migrations.AddTrigger(
model_name="task",
trigger=pgtrigger.compiler.Trigger(
name="notify_enqueueing",
sql=pgtrigger.compiler.UpsertTriggerSql(
condition="WHEN (NEW.\"state\" = 'queued')",
constraint="CONSTRAINT",
func="\n PERFORM pg_notify(\n 'authentik.tasks.' || NEW.queue_name || '.enqueue',\n NEW.message_id::text\n );\n RETURN NEW;\n ",
hash="0a9ee3db61e4d63fd72b31322fbb821706dd8a78",
operation="INSERT OR UPDATE",
pgid="pgtrigger_notify_enqueueing_0bc94",
table="authentik_tasks_task",
timing="DEFERRABLE INITIALLY DEFERRED",
when="AFTER",
),
),
),
]

View File

@ -2,7 +2,7 @@ from enum import StrEnum, auto
from uuid import UUID
import pgtrigger
from django.contrib.contenttypes.fields import ContentType, GenericForeignKey
from django.contrib.contenttypes.fields import ContentType, GenericForeignKey, GenericRelation
from django.db import models
from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.models import TaskBase
@ -113,3 +113,14 @@ class Task(SerializerModel, TaskBase):
def error(self, message: str | Exception, save: bool = False, **attributes):
self.log("error", message, save=save, **attributes)
class TasksModel(models.Model):
tasks = GenericRelation(
Task,
content_type_field="rel_obj_content_type",
object_id_field="rel_obj_id",
)
class Meta:
abstract = True

View File

@ -1,7 +1,9 @@
# Generated by Django 5.1.9 on 2025-06-05 16:39
# Generated by Django 5.1.11 on 2025-06-19 16:21
import authentik.tasks.schedules.models
import django.db.models.deletion
import django_dramatiq_postgres.models
import pgtrigger.compiler
import pgtrigger.migrations
import uuid
from django.db import migrations, models
@ -24,12 +26,6 @@ class Migration(migrations.Migration):
default=uuid.uuid4, editable=False, primary_key=True, serialize=False
),
),
(
"uid",
models.TextField(
editable=False, help_text="Unique schedule identifier", unique=True
),
),
(
"actor_name",
models.TextField(editable=False, help_text="Dramatiq actor to call"),
@ -37,16 +33,22 @@ class Migration(migrations.Migration):
("args", models.BinaryField(help_text="Args to send to the actor")),
("kwargs", models.BinaryField(help_text="Kwargs to send to the actor")),
("options", models.BinaryField(help_text="Options to send to the actor")),
("rel_obj_id", models.TextField(null=True)),
(
"crontab",
models.TextField(
help_text="When to schedule tasks",
validators=[authentik.tasks.schedules.models.validate_crontab],
validators=[django_dramatiq_postgres.models.validate_crontab],
),
),
("paused", models.BooleanField(default=False, help_text="Pause this schedule")),
("next_run", models.DateTimeField(auto_now_add=True)),
(
"uid",
models.TextField(
editable=False, help_text="Unique schedule identifier", unique=True
),
),
("rel_obj_id", models.TextField(null=True)),
(
"rel_obj_content_type",
models.ForeignKey(
@ -60,6 +62,7 @@ class Migration(migrations.Migration):
"verbose_name": "Schedule",
"verbose_name_plural": "Schedules",
"permissions": [("send_schedule", "Manually trigger a schedule")],
"abstract": False,
"default_permissions": ("change", "view"),
"indexes": [
models.Index(
@ -69,4 +72,19 @@ class Migration(migrations.Migration):
],
},
),
pgtrigger.migrations.AddTrigger(
model_name="schedule",
trigger=pgtrigger.compiler.Trigger(
name="set_next_run_on_paused",
sql=pgtrigger.compiler.UpsertTriggerSql(
condition='WHEN (NEW."paused" AND NOT OLD."paused")',
func="\n NEW.next_run = to_timestamp(0);\n RETURN NEW;\n ",
hash="7fe580a86de70723522cfcbac712785984000f92",
operation="UPDATE",
pgid="pgtrigger_set_next_run_on_paused_95c6d",
table="authentik_tasks_schedules_schedule",
when="BEFORE",
),
),
),
]

View File

@ -1,30 +0,0 @@
# Generated by Django 5.1.10 on 2025-06-11 12:57
import pgtrigger.compiler
import pgtrigger.migrations
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks_schedules", "0001_initial"),
]
operations = [
pgtrigger.migrations.AddTrigger(
model_name="schedule",
trigger=pgtrigger.compiler.Trigger(
name="set_next_run_on_paused",
sql=pgtrigger.compiler.UpsertTriggerSql(
condition='WHEN (NEW."paused" AND NOT OLD."paused")',
func="\n NEW.next_run = to_timestamp(0);\n RETURN NEW;\n ",
hash="7fe580a86de70723522cfcbac712785984000f92",
operation="UPDATE",
pgid="pgtrigger_set_next_run_on_paused_95c6d",
table="authentik_tasks_schedules_schedule",
when="BEFORE",
),
),
),
]

View File

@ -1,54 +1,22 @@
import pickle # nosec
from uuid import uuid4
import pgtrigger
from cron_converter import Cron
from django.apps import apps
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import models
from django.utils.timezone import datetime
from django.utils.translation import gettext_lazy as _
from dramatiq.actor import Actor
from dramatiq.broker import Broker, get_broker
from dramatiq.message import Message
from django_dramatiq_postgres.models import ScheduleBase
from authentik.lib.models import SerializerModel
from authentik.tasks.schedules.lib import ScheduleSpec
def validate_crontab(value):
try:
Cron(value)
except ValueError as exc:
raise ValidationError(
_("%(value)s is not a valid crontab"),
params={"value": value},
) from exc
class Schedule(SerializerModel):
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
class Schedule(SerializerModel, ScheduleBase):
uid = models.TextField(unique=True, editable=False, help_text=_("Unique schedule identifier"))
actor_name = models.TextField(editable=False, help_text=_("Dramatiq actor to call"))
args = models.BinaryField(editable=False, help_text=_("Args to send to the actor"))
kwargs = models.BinaryField(editable=False, help_text=_("Kwargs to send to the actor"))
options = models.BinaryField(editable=False, help_text=_("Options to send to the actor"))
rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True)
rel_obj_id = models.TextField(null=True)
rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id")
crontab = models.TextField(validators=[validate_crontab], help_text=_("When to schedule tasks"))
paused = models.BooleanField(default=False, help_text=_("Pause this schedule"))
next_run = models.DateTimeField(auto_now_add=True, editable=False)
class Meta:
verbose_name = _("Schedule")
verbose_name_plural = _("Schedules")
class Meta(ScheduleBase.Meta):
default_permissions = (
"change",
"view",
@ -57,29 +25,9 @@ class Schedule(SerializerModel):
("send_schedule", _("Manually trigger a schedule")),
]
indexes = (models.Index(fields=("rel_obj_content_type", "rel_obj_id")),)
triggers = (
pgtrigger.Trigger(
name="set_next_run_on_paused",
operation=pgtrigger.Update,
when=pgtrigger.Before,
condition=pgtrigger.Q(new__paused=True) & pgtrigger.Q(old__paused=False),
func="""
NEW.next_run = to_timestamp(0);
RETURN NEW;
""",
),
)
def __str__(self):
return self.uid
@classmethod
def dispatch_by_actor(cls, actor: Actor):
"""Dispatch a schedule by looking up its actor.
Only available for schedules without custom arguments."""
schedule = cls.objects.filter(actor_name=actor.actor_name, paused=False).first()
if schedule:
schedule.send()
return f"Schedule {self.actor_name}:{self.uid}"
@property
def serializer(self):
@ -87,20 +35,6 @@ class Schedule(SerializerModel):
return ScheduleSerializer
def send(self, broker: Broker | None = None) -> Message:
broker = broker or get_broker()
actor: Actor = broker.get_actor(self.actor_name)
return actor.send_with_options(
args=pickle.loads(self.args), # nosec
kwargs=pickle.loads(self.kwargs), # nosec
rel_obj=self,
**pickle.loads(self.options), # nosec
)
# TODO: actually do loop here
def calculate_next_run(self, next_run: datetime) -> datetime:
return Cron(self.crontab).schedule(next_run).next()
class ScheduledModel(models.Model):
schedules = GenericRelation(

View File

@ -1,53 +1,24 @@
import pglock
from django.db import router, transaction
from django.utils.timezone import now, timedelta
from dramatiq.broker import Broker
from django_dramatiq_postgres.scheduler import Scheduler as SchedulerBase
from structlog.stdlib import get_logger
from authentik.tasks.schedules.models import Schedule
from authentik.tenants.models import Tenant
LOGGER = get_logger()
class Scheduler:
def __init__(self, broker: Broker):
self.broker = broker
def process_schedule(self, schedule: Schedule):
next_run = schedule.next_run
while True:
next_run = schedule.calculate_next_run(next_run)
if next_run > now():
break
# Force to calculate the one after
next_run += timedelta(minutes=2)
schedule.next_run = next_run
schedule.send(self.broker)
schedule.save()
def run_per_tenant(self, tenant: Tenant):
with pglock.advisory(
lock_id=f"goauthentik.io/{tenant.schema_name}/tasks/scheduler",
class Scheduler(SchedulerBase):
def _lock(self, tenant: Tenant) -> pglock.advisory:
return pglock.advisory(
lock_id=f"authentik.scheduler/{tenant.schema_name}",
side_effect=pglock.Return,
timeout=0,
) as lock_acquired:
if not lock_acquired:
LOGGER.debug(
"Failed to acquire lock for tasks scheduling, skipping",
tenant=tenant.schema_name,
)
return
with transaction.atomic(using=router.db_for_write(Schedule)):
for schedule in Schedule.objects.select_for_update().filter(
next_run__lt=now(),
paused=False,
):
self.process_schedule(schedule)
def run(self):
for tenant in Tenant.objects.filter(ready=True):
with tenant:
self.run_per_tenant(tenant)
with self._lock(tenant) as lock_acquired:
if not lock_acquired:
return
self._run()

View File

@ -1,10 +1,18 @@
import pickle # nosec
from enum import StrEnum, auto
from uuid import uuid4
import pgtrigger
from cron_converter import Cron
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import models
from django.utils import timezone
from django.utils.timezone import datetime, now, timedelta
from django.utils.translation import gettext_lazy as _
from dramatiq.actor import Actor
from dramatiq.broker import Broker, get_broker
from dramatiq.message import Message
from django_dramatiq_postgres.conf import Conf
@ -36,7 +44,7 @@ class TaskBase(models.Model):
choices=TaskState.choices,
help_text=_("Task status"),
)
mtime = models.DateTimeField(default=timezone.now, help_text=_("Task last modified time"))
mtime = models.DateTimeField(default=now, help_text=_("Task last modified time"))
result = models.BinaryField(null=True, help_text=_("Task result"))
result_expiry = models.DateTimeField(null=True, help_text=_("Result expiry time"))
@ -65,3 +73,90 @@ class TaskBase(models.Model):
def __str__(self):
return str(self.message_id)
def validate_crontab(value):
try:
Cron(value)
except ValueError as exc:
raise ValidationError(
_("%(value)s is not a valid crontab"),
params={"value": value},
) from exc
class ScheduleBase(models.Model):
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
actor_name = models.TextField(editable=False, help_text=_("Dramatiq actor to call"))
args = models.BinaryField(editable=False, help_text=_("Args to send to the actor"))
kwargs = models.BinaryField(editable=False, help_text=_("Kwargs to send to the actor"))
options = models.BinaryField(editable=False, help_text=_("Options to send to the actor"))
rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True)
rel_obj_id = models.TextField(null=True)
rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id")
crontab = models.TextField(validators=[validate_crontab], help_text=_("When to schedule tasks"))
paused = models.BooleanField(default=False, help_text=_("Pause this schedule"))
next_run = models.DateTimeField(auto_now_add=True, editable=False)
class Meta:
abstract = True
verbose_name = _("Schedule")
verbose_name_plural = _("Schedules")
triggers = (
pgtrigger.Trigger(
name="set_next_run_on_paused",
operation=pgtrigger.Update,
when=pgtrigger.Before,
condition=pgtrigger.Q(new__paused=True) & pgtrigger.Q(old__paused=False),
func="""
NEW.next_run = to_timestamp(0);
RETURN NEW;
""",
),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._original_crontab = self.crontab
def __str__(self):
return f"Schedule {self.actor_name} ({self.id})"
def save(self, *args, **kwargs):
if self.crontab != self._original_crontab:
self.next_run = self.compute_next_run(now())
super().save(*args, **kwargs)
self._original_crontab = self.crontab
@classmethod
def dispatch_by_actor(cls, actor: Actor):
"""Dispatch a schedule by looking up its actor.
Only available for schedules without custom arguments."""
schedule = cls.objects.filter(actor_name=actor.actor_name, paused=False).first()
if schedule:
schedule.send()
def send(self, broker: Broker | None = None) -> Message:
broker = broker or get_broker()
actor: Actor = broker.get_actor(self.actor_name)
return actor.send_with_options(
args=pickle.loads(self.args), # nosec
kwargs=pickle.loads(self.kwargs), # nosec
rel_obj=self,
**pickle.loads(self.options), # nosec
)
def compute_next_run(self, next_run: datetime | None = None) -> datetime:
next_run: datetime = self.next_run if not next_run else next_run
while True:
next_run = Cron(self.crontab).schedule(next_run).next()
if next_run > now():
return next_run
# Force to calculate the one after
next_run += timedelta(minutes=1)

View File

@ -0,0 +1,50 @@
import pglock
from django.db import router, transaction
from django.db.models import QuerySet
from django.utils.functional import cached_property
from django.utils.module_loading import import_string
from django.utils.timezone import now
from dramatiq.broker import Broker
from dramatiq.logging import get_logger
from django_dramatiq_postgres.conf import Conf
from django_dramatiq_postgres.models import ScheduleBase
class Scheduler:
def __init__(self, broker: Broker):
self.logger = get_logger(__name__, type(self))
self.broker = broker
@cached_property
def model(self) -> type[ScheduleBase]:
return import_string(Conf().task_class)
@property
def query_set(self) -> QuerySet:
return self.model.objects.filter(paused=False)
def process_schedule(self, schedule: ScheduleBase):
schedule.next_run = schedule.compute_next_run()
schedule.send(self.broker)
schedule.save()
def _lock(self) -> pglock.advisory:
return pglock.advisory(
lock_id=f"{Conf().channel_prefix}.scheduler",
side_effect=pglock.Return,
timeout=0,
)
def _run(self):
with transaction.atomic(using=router.db_for_write(self.model)):
for schedule in self.query_set.select_for_update().filter(
next_run__lt=now(),
):
self.process_schedule(schedule)
def run(self):
with self._lock() as lock_acquired:
if not lock_acquired:
return
self._run()