Files
authentik/authentik/events/middleware.py
Jens Langhammer c7dcf92a2e api: add tests for swagger generation
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-03-30 15:23:37 +02:00

90 lines
3.2 KiB
Python

"""Events middleware"""
from functools import partial
from typing import Callable
from django.db.models import Model
from django.db.models.signals import post_save, pre_delete
from django.http import HttpRequest, HttpResponse
from guardian.models import UserObjectPermission
from authentik.core.middleware import LOCAL
from authentik.core.models import User
from authentik.events.models import Event, EventAction, Notification
from authentik.events.signals import EventNewThread
from authentik.events.utils import model_to_dict
class AuditMiddleware:
"""Register handlers for duration of request-response that log creation/update/deletion
of models"""
get_response: Callable[[HttpRequest], HttpResponse]
def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
self.get_response = get_response
def __call__(self, request: HttpRequest) -> HttpResponse:
# Connect signal for automatic logging
if hasattr(request, "user") and getattr(
request.user, "is_authenticated", False
):
post_save_handler = partial(
self.post_save_handler, user=request.user, request=request
)
pre_delete_handler = partial(
self.pre_delete_handler, user=request.user, request=request
)
post_save.connect(
post_save_handler,
dispatch_uid=LOCAL.authentik["request_id"],
weak=False,
)
pre_delete.connect(
pre_delete_handler,
dispatch_uid=LOCAL.authentik["request_id"],
weak=False,
)
response = self.get_response(request)
post_save.disconnect(dispatch_uid=LOCAL.authentik["request_id"])
pre_delete.disconnect(dispatch_uid=LOCAL.authentik["request_id"])
return response
# pylint: disable=unused-argument
def process_exception(self, request: HttpRequest, exception: Exception):
"""Unregister handlers in case of exception"""
post_save.disconnect(dispatch_uid=LOCAL.authentik["request_id"])
pre_delete.disconnect(dispatch_uid=LOCAL.authentik["request_id"])
@staticmethod
# pylint: disable=unused-argument
def post_save_handler(
user: User, request: HttpRequest, sender, instance: Model, created: bool, **_
):
"""Signal handler for all object's post_save"""
if isinstance(instance, (Event, Notification, UserObjectPermission)):
return
action = EventAction.MODEL_CREATED if created else EventAction.MODEL_UPDATED
EventNewThread(action, request, user=user, model=model_to_dict(instance)).run()
@staticmethod
# pylint: disable=unused-argument
def pre_delete_handler(
user: User, request: HttpRequest, sender, instance: Model, **_
):
"""Signal handler for all object's pre_delete"""
if isinstance(
instance, (Event, Notification, UserObjectPermission)
): # pragma: no cover
return
EventNewThread(
EventAction.MODEL_DELETED,
request,
user=user,
model=model_to_dict(instance),
).run()