
* init Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix some other stuff Signed-off-by: Jens Langhammer <jens@goauthentik.io> * more progress Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix missing format Signed-off-by: Jens Langhammer <jens@goauthentik.io> * make it work, send verification event Signed-off-by: Jens Langhammer <jens@goauthentik.io> * progress Signed-off-by: Jens Langhammer <jens@goauthentik.io> * more progress Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix Signed-off-by: Jens Langhammer <jens@goauthentik.io> * save iss Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add signals for MFA devices Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> * refactor more Signed-off-by: Jens Langhammer <jens@goauthentik.io> * re-work auth Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add API to list ssf streams Signed-off-by: Jens Langhammer <jens@goauthentik.io> * start rbac Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add ssf icon Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix web Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix bugs Signed-off-by: Jens Langhammer <jens@goauthentik.io> * make events expire, rewrite sending logic Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add oidc token test Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add stream list Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add jwks tests and fixes Signed-off-by: Jens Langhammer <jens@goauthentik.io> * update web ui Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix configuration endpoint Signed-off-by: Jens Langhammer <jens@goauthentik.io> * replace port number correctly Signed-off-by: Jens Langhammer <jens@goauthentik.io> * better log what went wrong Signed-off-by: Jens Langhammer <jens@goauthentik.io> * linter has opinions Signed-off-by: Jens 
Langhammer <jens@goauthentik.io> * fix messages Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix set status Signed-off-by: Jens Langhammer <jens@goauthentik.io> * more debug logging Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix issuer here too Signed-off-by: Jens Langhammer <jens@goauthentik.io> * remove port :443...removal apparently apple's HTTP logic is wrong and includes the port in the Host header even if the default port is used (80 or 443), which then fails as the URL doesn't exactly match what the admin configured...so instead of trying to add magic about this we'll add it in the docs Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix error when no request in context Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add signal for admin session revoke Signed-off-by: Jens Langhammer <jens@goauthentik.io> * set txn based on request id Signed-off-by: Jens Langhammer <jens@goauthentik.io> * validate method and endpoint url Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix request ID detection Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add timestamp Signed-off-by: Jens Langhammer <jens@goauthentik.io> * temp migration Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix signal Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add signal tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> * the final commit Signed-off-by: Jens Langhammer <jens@goauthentik.io> * ok actually the last commit Signed-off-by: Jens Langhammer <jens@goauthentik.io> --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io>
157 lines
5.6 KiB
Python
157 lines
5.6 KiB
Python
"""Monitored tasks"""
|
|
|
|
from datetime import datetime, timedelta
|
|
from time import perf_counter
|
|
from typing import Any
|
|
|
|
from django.utils.timezone import now
|
|
from django.utils.translation import gettext_lazy as _
|
|
from structlog.stdlib import BoundLogger, get_logger
|
|
from tenant_schemas_celery.task import TenantTask
|
|
|
|
from authentik.events.logs import LogEvent
|
|
from authentik.events.models import Event, EventAction, TaskStatus
|
|
from authentik.events.models import SystemTask as DBSystemTask
|
|
from authentik.events.utils import sanitize_item
|
|
from authentik.lib.utils.errors import exception_to_string
|
|
|
|
|
|
class SystemTask(TenantTask):
    """Task which can save its state to the database.

    The outcome of a run (status, messages, timing and call arguments) is written
    to a `DBSystemTask` row identified by the task's name and the UID set through
    `set_uid`. Task implementations report their result with `set_status` or
    `set_error`; the row is written from the `after_return` and `on_failure`
    handlers.
    """

    logger: BoundLogger

    # For tasks that should only be listed if they failed, set this to False
    save_on_success: bool

    # Result of the current run; stays None until set_status/set_error is called
    _status: TaskStatus | None
    _messages: list[LogEvent]

    _uid: str | None
    # Precise start time from perf_counter
    _start_precise: float | None = None
    _start: datetime | None = None

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.save_on_success = True
        self._uid = None
        # No status until the task reports one; after_return skips saving in that case
        self._status = None
        self._messages = []
        self.result_timeout_hours = 6

    def set_uid(self, uid: str):
        """Set UID, so in the case of an unexpected error its saved correctly"""
        self._uid = uid

    def set_status(self, status: TaskStatus, *messages: LogEvent):
        """Set result for current run, will overwrite previous result.

        Any message that is not already a `LogEvent` is wrapped in one with
        log level "info" and this task's name as logger.
        """
        self._status = status
        self._messages = [
            (
                msg
                if isinstance(msg, LogEvent)
                else LogEvent(msg, logger=self.__name__, log_level="info")
            )
            for msg in messages
        ]

    def set_error(self, exception: Exception, *messages: LogEvent):
        """Set result to error and save exception

        The given messages are kept as-is and followed by one "error" level
        `LogEvent` containing the stringified exception.
        """
        self._status = TaskStatus.ERROR
        self._messages = list(messages)
        self._messages.append(
            LogEvent(exception_to_string(exception), logger=self.__name__, log_level="error")
        )

    def before_start(self, task_id, args, kwargs):
        """Record start times and bind a logger carrying the task ID"""
        self._start_precise = perf_counter()
        self._start = now()
        self.logger = get_logger().bind(task_id=task_id)
        return super().before_start(task_id, args, kwargs)

    def db(self) -> DBSystemTask | None:
        """Get DB object for latest task"""
        return DBSystemTask.objects.filter(
            name=self.__name__,
            uid=self._uid,
        ).first()

    def _persist_run(self, args, kwargs, expires_in: timedelta) -> None:
        """Create or update the DB row for this task name/UID with the current result"""
        finished = now()
        # before_start may not have run (e.g. handlers invoked directly), in which
        # case there is no start marker; fall back to a zero duration instead of
        # raising a TypeError on `float - None`.
        duration = 0
        if self._start_precise is not None:
            duration = max(perf_counter() - self._start_precise, 0)
        DBSystemTask.objects.update_or_create(
            name=self.__name__,
            uid=self._uid,
            defaults={
                "description": self.__doc__,
                "start_timestamp": self._start or finished,
                "finish_timestamp": finished,
                "duration": duration,
                "task_call_module": self.__module__,
                "task_call_func": self.__name__,
                "task_call_args": sanitize_item(args),
                "task_call_kwargs": sanitize_item(kwargs),
                "status": self._status,
                "messages": sanitize_item(self._messages),
                "expires": finished + expires_in,
                "expiring": True,
            },
        )

    def after_return(self, status, retval, task_id, args: list[Any], kwargs: dict[str, Any], einfo):
        """Save the result of the run, if the task reported one.

        Nothing is saved when no status was set. A successful run of a task with
        `save_on_success` disabled deletes any existing row instead of saving.
        """
        super().after_return(status, retval, task_id, args, kwargs, einfo=einfo)
        if not self._status:
            return
        if self._status == TaskStatus.SUCCESSFUL and not self.save_on_success:
            DBSystemTask.objects.filter(
                name=self.__name__,
                uid=self._uid,
            ).delete()
            return
        self._persist_run(args, kwargs, timedelta(hours=self.result_timeout_hours))

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Save the failure and create a SYSTEM_TASK_EXCEPTION event.

        If the task did not report a status itself, the exception is recorded
        through `set_error`. The row written here expires three hours later
        than `result_timeout_hours`.
        """
        super().on_failure(exc, task_id, args, kwargs, einfo=einfo)
        if not self._status:
            self.set_error(exc)
        self._persist_run(args, kwargs, timedelta(hours=self.result_timeout_hours + 3))
        Event.new(
            EventAction.SYSTEM_TASK_EXCEPTION,
            message=f"Task {self.__name__} encountered an error: {exception_to_string(exc)}",
        ).save()

    def run(self, *args, **kwargs):
        """Task body; concrete tasks must provide their own implementation"""
        raise NotImplementedError
|
|
|
|
|
|
def prefill_task(func):
    """Queue a "not yet run" placeholder record for ``func`` and hand ``func`` back.

    A `DBSystemTask` instance with status UNKNOWN, zero duration and no expiry is
    built from the function's name, module and docstring and appended to the
    module-level `_prefill_tasks` list. The instance is not saved here. The
    function itself is returned unchanged.
    """
    registry = _prefill_tasks
    task_name = func.__name__
    placeholder_fields = {
        "name": task_name,
        "description": func.__doc__,
        "start_timestamp": now(),
        "finish_timestamp": now(),
        "status": TaskStatus.UNKNOWN,
        "messages": sanitize_item([_("Task has not been run yet.")]),
        "task_call_module": func.__module__,
        "task_call_func": task_name,
        "expiring": False,
        "duration": 0,
    }
    placeholder = DBSystemTask(**placeholder_fields)
    registry.append(placeholder)
    return func
|
|
|
|
|
|
# Placeholder DBSystemTask instances appended by prefill_task(); they are only
# collected here, not saved. NOTE(review): presumably consumed elsewhere to
# create the rows - confirm against the code that reads this list.
_prefill_tasks = []
|