add database aware scheduler with tenant support

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens Langhammer
2024-04-18 01:15:27 +02:00
parent 4af415f3fd
commit 3c8232b9a5
8 changed files with 86 additions and 5 deletions

View File

@ -0,0 +1,12 @@
"""Reporting app config"""
from authentik.enterprise.apps import EnterpriseConfig
class AuthentikEnterpriseReporting(EnterpriseConfig):
"""authentik enterprise reporting app config"""
name = "authentik.enterprise.reporting"
label = "authentik_reporting"
verbose_name = "authentik Enterprise.Reporting"
default = True

View File

@ -0,0 +1,18 @@
from uuid import uuid4
from django.db import models
class Report(models.Model):
report_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
name = models.TextField()
schedule = models.TextField()
def __str__(self) -> str:
return self.name
def do_the_thing(self):
pass

View File

@ -0,0 +1,35 @@
from json import dumps
from celery.schedules import crontab
from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from django_celery_beat.models import CrontabSchedule, PeriodicTask
from authentik.enterprise.reporting.models import Report
@receiver(post_save, sender=Report)
def report_post_save(sender, instance: Report, **_):
schedule = CrontabSchedule.from_schedule(crontab())
schedule.save()
PeriodicTask.objects.update_or_create(
name=str(instance.pk),
defaults={
"crontab": schedule,
"task": "authentik.enterprise.reporting.tasks.process_report",
"queue": "authentik_reporting",
"description": f"Report {instance.name}",
"kwargs": dumps(
{
"report_uuid": str(instance.pk),
}
),
},
)
@receiver(pre_delete, sender=Report)
def report_pre_delete(sender, instance: Report, **_):
PeriodicTask.objects.filter(name=str(instance.pk)).delete()
# Cleanup schedules without any tasks
CrontabSchedule.objects.filter(periodictask__isnull=True).delete()

View File

@ -0,0 +1,10 @@
from authentik.enterprise.reporting.models import Report
from authentik.root.celery import CELERY_APP
@CELERY_APP.task()
def process_report(report_uuid: str):
report = Report.objects.filter(pk=report_uuid).first()
if not report:
return
report.do_the_thing()

View File

@ -17,6 +17,7 @@ TENANT_APPS = [
"authentik.enterprise.providers.google_workspace",
"authentik.enterprise.providers.microsoft_entra",
"authentik.enterprise.providers.ssf",
"authentik.enterprise.reporting",
"authentik.enterprise.stages.authenticator_endpoint_gdtc",
"authentik.enterprise.stages.source",
]

View File

@ -125,6 +125,7 @@ TENANT_APPS = [
"authentik.brands",
"authentik.blueprints",
"guardian",
"django_celery_beat",
]
TENANT_MODEL = "authentik_tenants.Tenant"

View File

@ -1,14 +1,18 @@
"""Tenant-aware Celery beat scheduler"""
from tenant_schemas_celery.scheduler import (
TenantAwarePersistentScheduler as BaseTenantAwarePersistentScheduler,
)
from tenant_schemas_celery.scheduler import TenantAwareScheduleEntry
from django_celery_beat.schedulers import DatabaseScheduler, ModelEntry
from tenant_schemas_celery.scheduler import TenantAwareScheduleEntry, TenantAwareSchedulerMixin
class TenantAwarePersistentScheduler(BaseTenantAwarePersistentScheduler):
class SchedulerEntry(ModelEntry, TenantAwareScheduleEntry):
pass
class TenantAwarePersistentScheduler(TenantAwareSchedulerMixin, DatabaseScheduler):
"""Tenant-aware Celery beat scheduler"""
Entry = SchedulerEntry
@classmethod
def get_queryset(cls):
return super().get_queryset().filter(ready=True)