events: fix SystemTask timestamps and scheduling (#8435)
* events: fix SystemTask timestamps Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix error during prefill Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix prefill not running per tenants Signed-off-by: Jens Langhammer <jens@goauthentik.io> * run scheduled tasks on startup when needed Signed-off-by: Jens Langhammer <jens@goauthentik.io> * remove some explicit startup tasks Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix unrelated crypto warning Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix import loop on reputation policy Signed-off-by: Jens Langhammer <jens@goauthentik.io> * pass correct task params Signed-off-by: Jens Langhammer <jens@goauthentik.io> * make enterprise task monitored Signed-off-by: Jens Langhammer <jens@goauthentik.io> * slightly different formatting for task list Signed-off-by: Jens Langhammer <jens@goauthentik.io> * also pre-squash migrations Signed-off-by: Jens Langhammer <jens@goauthentik.io> --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
@ -1,6 +1,6 @@
|
||||
"""authentik crypto app config"""
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
from authentik.blueprints.apps import ManagedAppConfig
|
||||
@ -47,9 +47,9 @@ class AuthentikCryptoConfig(ManagedAppConfig):
|
||||
cert: Optional[CertificateKeyPair] = CertificateKeyPair.objects.filter(
|
||||
managed=MANAGED_KEY
|
||||
).first()
|
||||
now = datetime.now()
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
if not cert or (
|
||||
now < cert.certificate.not_valid_before or now > cert.certificate.not_valid_after
|
||||
now < cert.certificate.not_valid_after_utc or now > cert.certificate.not_valid_after_utc
|
||||
):
|
||||
self._create_update_cert()
|
||||
|
||||
|
@ -1,10 +1,11 @@
|
||||
"""Enterprise tasks"""
|
||||
|
||||
from authentik.enterprise.models import LicenseKey
|
||||
from authentik.events.system_tasks import SystemTask
|
||||
from authentik.root.celery import CELERY_APP
|
||||
|
||||
|
||||
@CELERY_APP.task()
|
||||
@CELERY_APP.task(base=SystemTask)
|
||||
def calculate_license():
|
||||
"""Calculate licensing status"""
|
||||
LicenseKey.get_total().record_usage()
|
||||
|
@ -1,6 +1,5 @@
|
||||
"""Tasks API"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from importlib import import_module
|
||||
|
||||
from django.contrib import messages
|
||||
@ -8,7 +7,14 @@ from django.utils.translation import gettext_lazy as _
|
||||
from drf_spectacular.types import OpenApiTypes
|
||||
from drf_spectacular.utils import OpenApiResponse, extend_schema
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.fields import CharField, ChoiceField, ListField, SerializerMethodField
|
||||
from rest_framework.fields import (
|
||||
CharField,
|
||||
ChoiceField,
|
||||
DateTimeField,
|
||||
FloatField,
|
||||
ListField,
|
||||
SerializerMethodField,
|
||||
)
|
||||
from rest_framework.request import Request
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.serializers import ModelSerializer
|
||||
@ -28,9 +34,9 @@ class SystemTaskSerializer(ModelSerializer):
|
||||
full_name = SerializerMethodField()
|
||||
uid = CharField(required=False)
|
||||
description = CharField()
|
||||
start_timestamp = SerializerMethodField()
|
||||
finish_timestamp = SerializerMethodField()
|
||||
duration = SerializerMethodField()
|
||||
start_timestamp = DateTimeField(read_only=True)
|
||||
finish_timestamp = DateTimeField(read_only=True)
|
||||
duration = FloatField(read_only=True)
|
||||
|
||||
status = ChoiceField(choices=[(x.value, x.name) for x in TaskStatus])
|
||||
messages = ListField(child=CharField())
|
||||
@ -41,18 +47,6 @@ class SystemTaskSerializer(ModelSerializer):
|
||||
return f"{instance.name}:{instance.uid}"
|
||||
return instance.name
|
||||
|
||||
def get_start_timestamp(self, instance: SystemTask) -> datetime:
|
||||
"""Timestamp when the task started"""
|
||||
return datetime.fromtimestamp(instance.start_timestamp, tz=timezone.utc)
|
||||
|
||||
def get_finish_timestamp(self, instance: SystemTask) -> datetime:
|
||||
"""Timestamp when the task finished"""
|
||||
return datetime.fromtimestamp(instance.finish_timestamp, tz=timezone.utc)
|
||||
|
||||
def get_duration(self, instance: SystemTask) -> float:
|
||||
"""Get the duration a task took to run"""
|
||||
return max(instance.finish_timestamp - instance.start_timestamp, 0)
|
||||
|
||||
class Meta:
|
||||
model = SystemTask
|
||||
fields = [
|
||||
|
@ -1,9 +1,12 @@
|
||||
"""authentik events app"""
|
||||
|
||||
from celery.schedules import crontab
|
||||
from prometheus_client import Gauge, Histogram
|
||||
|
||||
from authentik.blueprints.apps import ManagedAppConfig
|
||||
from authentik.lib.config import CONFIG, ENV_PREFIX
|
||||
from authentik.lib.utils.reflection import path_to_class
|
||||
from authentik.root.celery import CELERY_APP
|
||||
|
||||
# TODO: Deprecated metric - remove in 2024.2 or later
|
||||
GAUGE_TASKS = Gauge(
|
||||
@ -57,7 +60,7 @@ class AuthentikEventsConfig(ManagedAppConfig):
|
||||
message=msg,
|
||||
).save()
|
||||
|
||||
def reconcile_prefill_tasks(self):
|
||||
def reconcile_tenant_prefill_tasks(self):
|
||||
"""Prefill tasks"""
|
||||
from authentik.events.models import SystemTask
|
||||
from authentik.events.system_tasks import _prefill_tasks
|
||||
@ -67,3 +70,28 @@ class AuthentikEventsConfig(ManagedAppConfig):
|
||||
continue
|
||||
task.save()
|
||||
self.logger.debug("prefilled task", task_name=task.name)
|
||||
|
||||
def reconcile_tenant_run_scheduled_tasks(self):
|
||||
"""Run schedule tasks which are behind schedule (only applies
|
||||
to tasks of which we keep metrics)"""
|
||||
from authentik.events.models import TaskStatus
|
||||
from authentik.events.system_tasks import SystemTask as CelerySystemTask
|
||||
|
||||
for task in CELERY_APP.conf["beat_schedule"].values():
|
||||
schedule = task["schedule"]
|
||||
if not isinstance(schedule, crontab):
|
||||
continue
|
||||
task_class: CelerySystemTask = path_to_class(task["task"])
|
||||
if not isinstance(task_class, CelerySystemTask):
|
||||
continue
|
||||
db_task = task_class.db()
|
||||
if not db_task:
|
||||
continue
|
||||
due, _ = schedule.is_due(db_task.finish_timestamp)
|
||||
if due or db_task.status == TaskStatus.UNKNOWN:
|
||||
self.logger.debug("Running past-due scheduled task", task=task["task"])
|
||||
task_class.apply_async(
|
||||
args=task.get("args", None),
|
||||
kwargs=task.get("kwargs", None),
|
||||
**task.get("options", {}),
|
||||
)
|
||||
|
@ -0,0 +1,68 @@
|
||||
# Generated by Django 5.0.1 on 2024-02-07 15:42
|
||||
|
||||
import uuid
|
||||
|
||||
import django.utils.timezone
|
||||
from django.db import migrations, models
|
||||
|
||||
import authentik.core.models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
replaces = [
|
||||
("authentik_events", "0004_systemtask"),
|
||||
("authentik_events", "0005_remove_systemtask_finish_timestamp_and_more"),
|
||||
]
|
||||
|
||||
dependencies = [
|
||||
("authentik_events", "0003_rename_tenant_event_brand"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="SystemTask",
|
||||
fields=[
|
||||
(
|
||||
"expires",
|
||||
models.DateTimeField(default=authentik.core.models.default_token_duration),
|
||||
),
|
||||
("expiring", models.BooleanField(default=True)),
|
||||
(
|
||||
"uuid",
|
||||
models.UUIDField(
|
||||
default=uuid.uuid4, editable=False, primary_key=True, serialize=False
|
||||
),
|
||||
),
|
||||
("name", models.TextField()),
|
||||
("uid", models.TextField(null=True)),
|
||||
(
|
||||
"status",
|
||||
models.TextField(
|
||||
choices=[
|
||||
("unknown", "Unknown"),
|
||||
("successful", "Successful"),
|
||||
("warning", "Warning"),
|
||||
("error", "Error"),
|
||||
]
|
||||
),
|
||||
),
|
||||
("description", models.TextField(null=True)),
|
||||
("messages", models.JSONField()),
|
||||
("task_call_module", models.TextField()),
|
||||
("task_call_func", models.TextField()),
|
||||
("task_call_args", models.JSONField(default=list)),
|
||||
("task_call_kwargs", models.JSONField(default=dict)),
|
||||
("duration", models.FloatField(default=0)),
|
||||
("finish_timestamp", models.DateTimeField(default=django.utils.timezone.now)),
|
||||
("start_timestamp", models.DateTimeField(default=django.utils.timezone.now)),
|
||||
],
|
||||
options={
|
||||
"verbose_name": "System Task",
|
||||
"verbose_name_plural": "System Tasks",
|
||||
"permissions": [("run_task", "Run task")],
|
||||
"default_permissions": ["view"],
|
||||
"unique_together": {("name", "uid")},
|
||||
},
|
||||
),
|
||||
]
|
@ -0,0 +1,40 @@
|
||||
# Generated by Django 5.0.1 on 2024-02-06 18:02
|
||||
|
||||
import django.utils.timezone
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("authentik_events", "0004_systemtask"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RemoveField(
|
||||
model_name="systemtask",
|
||||
name="finish_timestamp",
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name="systemtask",
|
||||
name="start_timestamp",
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name="systemtask",
|
||||
name="duration",
|
||||
field=models.FloatField(default=0),
|
||||
preserve_default=False,
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name="systemtask",
|
||||
name="finish_timestamp",
|
||||
field=models.DateTimeField(default=django.utils.timezone.now),
|
||||
preserve_default=False,
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name="systemtask",
|
||||
name="start_timestamp",
|
||||
field=models.DateTimeField(default=django.utils.timezone.now),
|
||||
preserve_default=False,
|
||||
),
|
||||
]
|
@ -620,8 +620,9 @@ class SystemTask(SerializerModel, ExpiringModel):
|
||||
name = models.TextField()
|
||||
uid = models.TextField(null=True)
|
||||
|
||||
start_timestamp = models.FloatField()
|
||||
finish_timestamp = models.FloatField()
|
||||
start_timestamp = models.DateTimeField()
|
||||
finish_timestamp = models.DateTimeField()
|
||||
duration = models.FloatField()
|
||||
|
||||
status = models.TextField(choices=TaskStatus.choices)
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
"""Monitored tasks"""
|
||||
|
||||
from datetime import timedelta
|
||||
from timeit import default_timer
|
||||
from datetime import datetime, timedelta
|
||||
from time import perf_counter
|
||||
from typing import Any, Optional
|
||||
|
||||
from django.utils.timezone import now
|
||||
@ -28,7 +28,9 @@ class SystemTask(TenantTask):
|
||||
_messages: list[str]
|
||||
|
||||
_uid: Optional[str]
|
||||
_start: Optional[float] = None
|
||||
# Precise start time from perf_counter
|
||||
_start_precise: Optional[float] = None
|
||||
_start: Optional[datetime] = None
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
@ -53,9 +55,17 @@ class SystemTask(TenantTask):
|
||||
self._messages = [exception_to_string(exception)]
|
||||
|
||||
def before_start(self, task_id, args, kwargs):
|
||||
self._start = default_timer()
|
||||
self._start_precise = perf_counter()
|
||||
self._start = now()
|
||||
return super().before_start(task_id, args, kwargs)
|
||||
|
||||
def db(self) -> Optional[DBSystemTask]:
|
||||
"""Get DB object for latest task"""
|
||||
return DBSystemTask.objects.filter(
|
||||
name=self.__name__,
|
||||
uid=self._uid,
|
||||
).first()
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
def after_return(self, status, retval, task_id, args: list[Any], kwargs: dict[str, Any], einfo):
|
||||
super().after_return(status, retval, task_id, args, kwargs, einfo=einfo)
|
||||
@ -72,8 +82,9 @@ class SystemTask(TenantTask):
|
||||
uid=self._uid,
|
||||
defaults={
|
||||
"description": self.__doc__,
|
||||
"start_timestamp": self._start or default_timer(),
|
||||
"finish_timestamp": default_timer(),
|
||||
"start_timestamp": self._start or now(),
|
||||
"finish_timestamp": now(),
|
||||
"duration": max(perf_counter() - self._start_precise, 0),
|
||||
"task_call_module": self.__module__,
|
||||
"task_call_func": self.__name__,
|
||||
"task_call_args": args,
|
||||
@ -96,8 +107,9 @@ class SystemTask(TenantTask):
|
||||
uid=self._uid,
|
||||
defaults={
|
||||
"description": self.__doc__,
|
||||
"start_timestamp": self._start or default_timer(),
|
||||
"finish_timestamp": default_timer(),
|
||||
"start_timestamp": self._start or now(),
|
||||
"finish_timestamp": now(),
|
||||
"duration": max(perf_counter() - self._start_precise, 0),
|
||||
"task_call_module": self.__module__,
|
||||
"task_call_func": self.__name__,
|
||||
"task_call_args": args,
|
||||
@ -123,11 +135,14 @@ def prefill_task(func):
|
||||
DBSystemTask(
|
||||
name=func.__name__,
|
||||
description=func.__doc__,
|
||||
start_timestamp=now(),
|
||||
finish_timestamp=now(),
|
||||
status=TaskStatus.UNKNOWN,
|
||||
messages=sanitize_item([_("Task has not been run yet.")]),
|
||||
task_call_module=func.__module__,
|
||||
task_call_func=func.__name__,
|
||||
expiring=False,
|
||||
duration=0,
|
||||
)
|
||||
)
|
||||
return func
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
from multiprocessing import Pipe, current_process
|
||||
from multiprocessing.connection import Connection
|
||||
from timeit import default_timer
|
||||
from time import perf_counter
|
||||
from typing import Iterator, Optional
|
||||
|
||||
from django.core.cache import cache
|
||||
@ -84,10 +84,10 @@ class PolicyEngine:
|
||||
def _check_cache(self, binding: PolicyBinding):
|
||||
if not self.use_cache:
|
||||
return False
|
||||
before = default_timer()
|
||||
before = perf_counter()
|
||||
key = cache_key(binding, self.request)
|
||||
cached_policy = cache.get(key, None)
|
||||
duration = max(default_timer() - before, 0)
|
||||
duration = max(perf_counter() - before, 0)
|
||||
if not cached_policy:
|
||||
return False
|
||||
self.logger.debug(
|
||||
|
@ -2,6 +2,8 @@
|
||||
|
||||
from authentik.blueprints.apps import ManagedAppConfig
|
||||
|
||||
CACHE_KEY_PREFIX = "goauthentik.io/policies/reputation/scores/"
|
||||
|
||||
|
||||
class AuthentikPolicyReputationConfig(ManagedAppConfig):
|
||||
"""Authentik reputation app config"""
|
||||
|
@ -19,7 +19,6 @@ from authentik.policies.types import PolicyRequest, PolicyResult
|
||||
from authentik.root.middleware import ClientIPMiddleware
|
||||
|
||||
LOGGER = get_logger()
|
||||
CACHE_KEY_PREFIX = "goauthentik.io/policies/reputation/scores/"
|
||||
|
||||
|
||||
def reputation_expiry():
|
||||
|
@ -8,7 +8,7 @@ from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.core.signals import login_failed
|
||||
from authentik.lib.config import CONFIG
|
||||
from authentik.policies.reputation.models import CACHE_KEY_PREFIX
|
||||
from authentik.policies.reputation.apps import CACHE_KEY_PREFIX
|
||||
from authentik.policies.reputation.tasks import save_reputation
|
||||
from authentik.root.middleware import ClientIPMiddleware
|
||||
from authentik.stages.identification.signals import identification_failed
|
||||
|
@ -7,8 +7,8 @@ from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR
|
||||
from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR
|
||||
from authentik.events.models import TaskStatus
|
||||
from authentik.events.system_tasks import SystemTask, prefill_task
|
||||
from authentik.policies.reputation.apps import CACHE_KEY_PREFIX
|
||||
from authentik.policies.reputation.models import Reputation
|
||||
from authentik.policies.reputation.signals import CACHE_KEY_PREFIX
|
||||
from authentik.root.celery import CELERY_APP
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
@ -6,7 +6,8 @@ from django.test import RequestFactory, TestCase
|
||||
from authentik.core.models import User
|
||||
from authentik.lib.generators import generate_id
|
||||
from authentik.policies.reputation.api import ReputationPolicySerializer
|
||||
from authentik.policies.reputation.models import CACHE_KEY_PREFIX, Reputation, ReputationPolicy
|
||||
from authentik.policies.reputation.apps import CACHE_KEY_PREFIX
|
||||
from authentik.policies.reputation.models import Reputation, ReputationPolicy
|
||||
from authentik.policies.reputation.tasks import save_reputation
|
||||
from authentik.policies.types import PolicyRequest
|
||||
from authentik.stages.password import BACKEND_INBUILT
|
||||
|
@ -91,13 +91,10 @@ def _get_startup_tasks_default_tenant() -> list[Callable]:
|
||||
def _get_startup_tasks_all_tenants() -> list[Callable]:
|
||||
"""Get all tasks to be run on startup for all tenants"""
|
||||
from authentik.admin.tasks import clear_update_notifications
|
||||
from authentik.outposts.tasks import outpost_connection_discovery, outpost_controller_all
|
||||
from authentik.providers.proxy.tasks import proxy_set_defaults
|
||||
|
||||
return [
|
||||
clear_update_notifications,
|
||||
outpost_connection_discovery,
|
||||
outpost_controller_all,
|
||||
proxy_set_defaults,
|
||||
]
|
||||
|
||||
|
@ -1,8 +1,7 @@
|
||||
"""Dynamically set SameSite depending if the upstream connection is TLS or not"""
|
||||
|
||||
from hashlib import sha512
|
||||
from time import time
|
||||
from timeit import default_timer
|
||||
from time import perf_counter, time
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
from django.conf import settings
|
||||
@ -294,14 +293,14 @@ class LoggingMiddleware:
|
||||
self.get_response = get_response
|
||||
|
||||
def __call__(self, request: HttpRequest) -> HttpResponse:
|
||||
start = default_timer()
|
||||
start = perf_counter()
|
||||
response = self.get_response(request)
|
||||
status_code = response.status_code
|
||||
kwargs = {
|
||||
"request_id": getattr(request, "request_id", None),
|
||||
}
|
||||
kwargs.update(getattr(response, "ak_context", {}))
|
||||
self.log(request, status_code, int((default_timer() - start) * 1000), **kwargs)
|
||||
self.log(request, status_code, int((perf_counter() - start) * 1000), **kwargs)
|
||||
return response
|
||||
|
||||
def log(self, request: HttpRequest, status_code: int, runtime: int, **kwargs):
|
||||
|
@ -2935,8 +2935,6 @@ paths:
|
||||
schema:
|
||||
$ref: '#/components/schemas/PolicyTestResult'
|
||||
description: ''
|
||||
'404':
|
||||
description: for_user user not found
|
||||
'400':
|
||||
content:
|
||||
application/json:
|
||||
@ -43573,17 +43571,14 @@ components:
|
||||
start_timestamp:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Timestamp when the task started
|
||||
readOnly: true
|
||||
finish_timestamp:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Timestamp when the task finished
|
||||
readOnly: true
|
||||
duration:
|
||||
type: number
|
||||
format: double
|
||||
description: Get the duration a task took to run
|
||||
readOnly: true
|
||||
status:
|
||||
$ref: '#/components/schemas/SystemTaskStatusEnum'
|
||||
@ -43963,6 +43958,7 @@ components:
|
||||
maxLength: 254
|
||||
avatar:
|
||||
type: string
|
||||
description: User's avatar, either a http/https URL or a data URI
|
||||
readOnly: true
|
||||
attributes:
|
||||
type: object
|
||||
@ -44634,6 +44630,7 @@ components:
|
||||
maxLength: 254
|
||||
avatar:
|
||||
type: string
|
||||
description: User's avatar, either a http/https URL or a data URI
|
||||
readOnly: true
|
||||
uid:
|
||||
type: string
|
||||
|
@ -110,7 +110,7 @@ export class SystemTaskListPage extends TablePage<SystemTask> {
|
||||
|
||||
row(item: SystemTask): TemplateResult[] {
|
||||
return [
|
||||
html`${item.name}${item.uid ? `:${item.uid}` : ""}`,
|
||||
html`<pre>${item.name}${item.uid ? `:${item.uid}` : ""}</pre>`,
|
||||
html`${item.description}`,
|
||||
html`<div>${getRelativeTime(item.finishTimestamp)}</div>
|
||||
<small>${item.finishTimestamp.toLocaleString()}</small>`,
|
||||
|
Reference in New Issue
Block a user