@ -3,6 +3,7 @@
|
|||||||
from prometheus_client import Info
|
from prometheus_client import Info
|
||||||
|
|
||||||
from authentik.blueprints.apps import ManagedAppConfig
|
from authentik.blueprints.apps import ManagedAppConfig
|
||||||
|
from authentik.lib.utils.time import fqdn_rand
|
||||||
|
|
||||||
PROM_INFO = Info("authentik_version", "Currently running authentik version")
|
PROM_INFO = Info("authentik_version", "Currently running authentik version")
|
||||||
|
|
||||||
@ -14,3 +15,11 @@ class AuthentikAdminConfig(ManagedAppConfig):
|
|||||||
label = "authentik_admin"
|
label = "authentik_admin"
|
||||||
verbose_name = "authentik Admin"
|
verbose_name = "authentik Admin"
|
||||||
default = True
|
default = True
|
||||||
|
|
||||||
|
def get_tenant_schedules(self):
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
"actor_name": "authentik.admin.tasks.update_latest_version",
|
||||||
|
"crontab": f"{fqdn_rand('admin_latest_version')} * * * *",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
"""authentik Blueprints app"""
|
"""authentik Blueprints app"""
|
||||||
|
|
||||||
|
import pickle # nosec
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from importlib import import_module
|
from importlib import import_module
|
||||||
from inspect import ismethod
|
from inspect import ismethod
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from django.apps import AppConfig
|
from django.apps import AppConfig
|
||||||
from django.db import DatabaseError, InternalError, ProgrammingError
|
from django.db import DatabaseError, InternalError, ProgrammingError
|
||||||
@ -80,6 +82,35 @@ class ManagedAppConfig(AppConfig):
|
|||||||
func._authentik_managed_reconcile = ManagedAppConfig.RECONCILE_GLOBAL_CATEGORY
|
func._authentik_managed_reconcile = ManagedAppConfig.RECONCILE_GLOBAL_CATEGORY
|
||||||
return func
|
return func
|
||||||
|
|
||||||
|
def get_tenant_schedules(self) -> list[dict[str, Any]]:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def get_global_schedules(self) -> list[dict[str, Any]]:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def _reconcile_schedules(self, schedules: list[dict[str, Any]]):
|
||||||
|
from authentik.tasks.schedules.models import Schedule
|
||||||
|
|
||||||
|
for schedule in schedules:
|
||||||
|
query = {
|
||||||
|
"uid": schedule.get("uid", schedule["actor_name"]),
|
||||||
|
}
|
||||||
|
defaults = {
|
||||||
|
**query,
|
||||||
|
"actor_name": schedule["actor_name"],
|
||||||
|
"args": pickle.dumps(schedule.get("args", ())),
|
||||||
|
"kwargs": pickle.dumps(schedule.get("kwargs", {})),
|
||||||
|
}
|
||||||
|
create_defaults = {
|
||||||
|
**defaults,
|
||||||
|
"crontab": schedule["crontab"],
|
||||||
|
}
|
||||||
|
Schedule.objects.update_or_create(
|
||||||
|
**query,
|
||||||
|
defaults=defaults,
|
||||||
|
create_defaults=create_defaults,
|
||||||
|
)
|
||||||
|
|
||||||
def _reconcile_tenant(self) -> None:
|
def _reconcile_tenant(self) -> None:
|
||||||
"""reconcile ourselves for tenanted methods"""
|
"""reconcile ourselves for tenanted methods"""
|
||||||
from authentik.tenants.models import Tenant
|
from authentik.tenants.models import Tenant
|
||||||
@ -92,6 +123,7 @@ class ManagedAppConfig(AppConfig):
|
|||||||
for tenant in tenants:
|
for tenant in tenants:
|
||||||
with tenant:
|
with tenant:
|
||||||
self._reconcile(self.RECONCILE_TENANT_CATEGORY)
|
self._reconcile(self.RECONCILE_TENANT_CATEGORY)
|
||||||
|
self._reconcile_schedules(self.get_tenant_schedules())
|
||||||
|
|
||||||
def _reconcile_global(self) -> None:
|
def _reconcile_global(self) -> None:
|
||||||
"""
|
"""
|
||||||
@ -102,6 +134,7 @@ class ManagedAppConfig(AppConfig):
|
|||||||
|
|
||||||
with schema_context(get_public_schema_name()):
|
with schema_context(get_public_schema_name()):
|
||||||
self._reconcile(self.RECONCILE_GLOBAL_CATEGORY)
|
self._reconcile(self.RECONCILE_GLOBAL_CATEGORY)
|
||||||
|
self._reconcile_schedules(self.get_global_schedules())
|
||||||
|
|
||||||
|
|
||||||
class AuthentikBlueprintsConfig(ManagedAppConfig):
|
class AuthentikBlueprintsConfig(ManagedAppConfig):
|
||||||
|
@ -30,9 +30,9 @@ from structlog.stdlib import get_logger
|
|||||||
|
|
||||||
from authentik.tasks.models import CHANNEL_PREFIX, ChannelIdentifier, Task, TaskState
|
from authentik.tasks.models import CHANNEL_PREFIX, ChannelIdentifier, Task, TaskState
|
||||||
from authentik.tasks.results import PostgresBackend
|
from authentik.tasks.results import PostgresBackend
|
||||||
|
from authentik.tasks.schedules.scheduler import Scheduler
|
||||||
from authentik.tenants.models import Tenant
|
from authentik.tenants.models import Tenant
|
||||||
from authentik.tenants.utils import get_current_tenant
|
from authentik.tenants.utils import get_current_tenant
|
||||||
from authentik.tasks.schedules.scheduler import Scheduler
|
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
@ -176,7 +176,7 @@ class PostgresBroker(Broker):
|
|||||||
**query,
|
**query,
|
||||||
**defaults,
|
**defaults,
|
||||||
}
|
}
|
||||||
self.query_set.update_or_create(
|
obj, created = self.query_set.update_or_create(
|
||||||
**query,
|
**query,
|
||||||
defaults=defaults,
|
defaults=defaults,
|
||||||
create_defaults=create_defaults,
|
create_defaults=create_defaults,
|
||||||
|
@ -107,7 +107,7 @@ class Task(SerializerModel):
|
|||||||
self.messages = list(messages)
|
self.messages = list(messages)
|
||||||
for idx, msg in enumerate(self.messages):
|
for idx, msg in enumerate(self.messages):
|
||||||
if not isinstance(msg, LogEvent):
|
if not isinstance(msg, LogEvent):
|
||||||
self.messages[idx] = LogEvent(msg, logger=self.__name__, log_level="info")
|
self.messages[idx] = LogEvent(msg, logger=str(self), log_level="info")
|
||||||
self.messages = sanitize_item(self.messages)
|
self.messages = sanitize_item(self.messages)
|
||||||
|
|
||||||
def set_error(self, exception: Exception, *messages: LogEvent | str):
|
def set_error(self, exception: Exception, *messages: LogEvent | str):
|
||||||
@ -115,6 +115,6 @@ class Task(SerializerModel):
|
|||||||
self.status = TaskStatus.ERROR
|
self.status = TaskStatus.ERROR
|
||||||
self.messages = list(messages)
|
self.messages = list(messages)
|
||||||
self.messages.extend(
|
self.messages.extend(
|
||||||
[LogEvent(exception_to_string(exception), logger=self.__name__, log_level="error")]
|
[LogEvent(exception_to_string(exception), logger=str(self), log_level="error")]
|
||||||
)
|
)
|
||||||
self.messages = sanitize_item(self.messages)
|
self.messages = sanitize_item(self.messages)
|
||||||
|
@ -14,7 +14,8 @@ class ScheduleSerializer(ModelSerializer):
|
|||||||
model = Schedule
|
model = Schedule
|
||||||
fields = [
|
fields = [
|
||||||
"id",
|
"id",
|
||||||
"name",
|
"uid",
|
||||||
|
"actor_name",
|
||||||
"crontab",
|
"crontab",
|
||||||
"next_run",
|
"next_run",
|
||||||
]
|
]
|
||||||
@ -30,6 +31,6 @@ class ScheduleViewSet(
|
|||||||
serializer_class = ScheduleSerializer
|
serializer_class = ScheduleSerializer
|
||||||
search_fields = (
|
search_fields = (
|
||||||
"id",
|
"id",
|
||||||
"name",
|
"uid",
|
||||||
)
|
)
|
||||||
ordering = ("id",)
|
ordering = ("-next_run", "uid")
|
||||||
|
@ -0,0 +1,17 @@
|
|||||||
|
# Generated by Django 5.0.13 on 2025-03-27 14:00
|
||||||
|
|
||||||
|
from django.db import migrations
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
("authentik_tasks_schedules", "0001_initial"),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.RemoveField(
|
||||||
|
model_name="schedule",
|
||||||
|
name="name",
|
||||||
|
),
|
||||||
|
]
|
@ -21,17 +21,15 @@ def validate_crontab(value):
|
|||||||
|
|
||||||
class Schedule(SerializerModel):
|
class Schedule(SerializerModel):
|
||||||
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
|
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
|
||||||
uid = models.TextField(unique=True, editable=False)
|
uid = models.TextField(unique=True, editable=False, help_text=_("Unique schedule identifier"))
|
||||||
|
|
||||||
name = models.TextField(editable=False, help_text=_("Schedule display name"))
|
|
||||||
|
|
||||||
actor_name = models.TextField(editable=False, help_text=_("Dramatiq actor to call"))
|
actor_name = models.TextField(editable=False, help_text=_("Dramatiq actor to call"))
|
||||||
args = models.BinaryField(editable=False, help_text=_("Args to send to the actor"))
|
args = models.BinaryField(editable=False, help_text=_("Args to send to the actor"))
|
||||||
kwargs = models.BinaryField(editable=False, help_text=_("Kwargs to send to the actor"))
|
kwargs = models.BinaryField(editable=False, help_text=_("Kwargs to send to the actor"))
|
||||||
|
|
||||||
crontab = models.TextField(validators=[validate_crontab])
|
crontab = models.TextField(validators=[validate_crontab], help_text=_("When to schedule tasks"))
|
||||||
|
|
||||||
next_run = models.DateTimeField(auto_now_add=True)
|
next_run = models.DateTimeField(auto_now_add=True, editable=False)
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
verbose_name = _("Schedule")
|
verbose_name = _("Schedule")
|
||||||
@ -42,7 +40,7 @@ class Schedule(SerializerModel):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.name
|
return self.uid
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def serializer(self):
|
def serializer(self):
|
||||||
|
@ -2,7 +2,8 @@ import pickle # nosec
|
|||||||
|
|
||||||
import pglock
|
import pglock
|
||||||
from django.db import router, transaction
|
from django.db import router, transaction
|
||||||
from django.utils.timezone import now
|
from django.utils.timezone import now, timedelta
|
||||||
|
from dramatiq.actor import Actor
|
||||||
from dramatiq.broker import Broker
|
from dramatiq.broker import Broker
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import get_logger
|
||||||
|
|
||||||
@ -22,15 +23,15 @@ class Scheduler:
|
|||||||
next_run = schedule.calculate_next_run(next_run)
|
next_run = schedule.calculate_next_run(next_run)
|
||||||
if next_run > now():
|
if next_run > now():
|
||||||
break
|
break
|
||||||
|
# Force to calculate the one after
|
||||||
|
next_run += timedelta(minutes=2)
|
||||||
schedule.next_run = next_run
|
schedule.next_run = next_run
|
||||||
|
|
||||||
actor = self.broker.get_actor(schedule.actor_name)
|
actor: Actor = self.broker.get_actor(schedule.actor_name)
|
||||||
actor.send_with_options(
|
actor.send_with_options(
|
||||||
args=pickle.loads(schedule.args), # nosec
|
args=pickle.loads(schedule.args), # nosec
|
||||||
kwargs=pickle.loads(schedule.kwargs), # nosec
|
kwargs=pickle.loads(schedule.kwargs), # nosec
|
||||||
options={
|
schedule_uid=schedule.uid,
|
||||||
"schedule_uid": schedule.uid,
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
schedule.save()
|
schedule.save()
|
||||||
|
Reference in New Issue
Block a user