86 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			86 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Audit middleware"""
 | 
						|
from functools import partial
 | 
						|
from typing import Callable
 | 
						|
 | 
						|
from django.contrib.auth.models import User
 | 
						|
from django.db.models import Model
 | 
						|
from django.db.models.signals import post_save, pre_delete
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
 | 
						|
from passbook.audit.models import Event, EventAction, model_to_dict
 | 
						|
from passbook.audit.signals import EventNewThread
 | 
						|
from passbook.core.middleware import LOCAL
 | 
						|
 | 
						|
 | 
						|
class AuditMiddleware:
 | 
						|
    """Register handlers for duration of request-response that log creation/update/deletion
 | 
						|
    of models"""
 | 
						|
 | 
						|
    get_response: Callable[[HttpRequest], HttpResponse]
 | 
						|
 | 
						|
    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
 | 
						|
        self.get_response = get_response
 | 
						|
 | 
						|
    def __call__(self, request: HttpRequest) -> HttpResponse:
 | 
						|
        # Connect signal for automatic logging
 | 
						|
        if hasattr(request, "user") and getattr(
 | 
						|
            request.user, "is_authenticated", False
 | 
						|
        ):
 | 
						|
            post_save_handler = partial(
 | 
						|
                self.post_save_handler, user=request.user, request=request
 | 
						|
            )
 | 
						|
            pre_delete_handler = partial(
 | 
						|
                self.pre_delete_handler, user=request.user, request=request
 | 
						|
            )
 | 
						|
            post_save.connect(
 | 
						|
                post_save_handler,
 | 
						|
                dispatch_uid=LOCAL.passbook["request_id"],
 | 
						|
                weak=False,
 | 
						|
            )
 | 
						|
            pre_delete.connect(
 | 
						|
                pre_delete_handler,
 | 
						|
                dispatch_uid=LOCAL.passbook["request_id"],
 | 
						|
                weak=False,
 | 
						|
            )
 | 
						|
 | 
						|
        response = self.get_response(request)
 | 
						|
 | 
						|
        post_save.disconnect(dispatch_uid=LOCAL.passbook["request_id"])
 | 
						|
        pre_delete.disconnect(dispatch_uid=LOCAL.passbook["request_id"])
 | 
						|
 | 
						|
        return response
 | 
						|
 | 
						|
    # pylint: disable=unused-argument
 | 
						|
    def process_exception(self, request: HttpRequest, exception: Exception):
 | 
						|
        """Unregister handlers in case of exception"""
 | 
						|
        post_save.disconnect(dispatch_uid=LOCAL.passbook["request_id"])
 | 
						|
        pre_delete.disconnect(dispatch_uid=LOCAL.passbook["request_id"])
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    # pylint: disable=unused-argument
 | 
						|
    def post_save_handler(
 | 
						|
        user: User, request: HttpRequest, sender, instance: Model, created: bool, **_
 | 
						|
    ):
 | 
						|
        """Signal handler for all object's post_save"""
 | 
						|
        if isinstance(instance, Event):
 | 
						|
            return
 | 
						|
 | 
						|
        action = EventAction.MODEL_CREATED if created else EventAction.MODEL_UPDATED
 | 
						|
        EventNewThread(action, request, user=user, model=model_to_dict(instance)).run()
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    # pylint: disable=unused-argument
 | 
						|
    def pre_delete_handler(
 | 
						|
        user: User, request: HttpRequest, sender, instance: Model, **_
 | 
						|
    ):
 | 
						|
        """Signal handler for all object's pre_delete"""
 | 
						|
        if isinstance(instance, Event):
 | 
						|
            return
 | 
						|
 | 
						|
        EventNewThread(
 | 
						|
            EventAction.MODEL_DELETED,
 | 
						|
            request,
 | 
						|
            user=user,
 | 
						|
            model=model_to_dict(instance),
 | 
						|
        ).run()
 |