199 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			199 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""passbook audit models"""
 | 
						|
from inspect import getmodule, stack
 | 
						|
from typing import Any, Dict, Optional, Union
 | 
						|
from uuid import UUID, uuid4
 | 
						|
 | 
						|
from django.conf import settings
 | 
						|
from django.contrib.auth.models import AnonymousUser
 | 
						|
from django.core.exceptions import ValidationError
 | 
						|
from django.db import models
 | 
						|
from django.db.models.base import Model
 | 
						|
from django.http import HttpRequest
 | 
						|
from django.utils.translation import gettext as _
 | 
						|
from django.views.debug import SafeExceptionReporterFilter
 | 
						|
from guardian.utils import get_anonymous_user
 | 
						|
from structlog import get_logger
 | 
						|
 | 
						|
from passbook.core.middleware import (
 | 
						|
    SESSION_IMPERSONATE_ORIGINAL_USER,
 | 
						|
    SESSION_IMPERSONATE_USER,
 | 
						|
)
 | 
						|
from passbook.core.models import User
 | 
						|
from passbook.lib.utils.http import get_client_ip
 | 
						|
 | 
						|
# Module-level structlog logger, bound to the "passbook.audit" name; Event.save()
# uses it to emit a debug record before an event is written.
LOGGER = get_logger("passbook.audit")
 | 
						|
 | 
						|
 | 
						|
def cleanse_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
    """Return a copy of ``source`` with sensitive-looking values masked.

    A value is replaced by ``SafeExceptionReporterFilter.cleansed_substitute``
    when its key matches ``SafeExceptionReporterFilter.hidden_settings``. Keys
    for which the pattern search raises ``TypeError`` are treated as not
    sensitive. Nested dictionaries are always cleansed recursively, and the
    recursive result takes precedence even when the key itself matched.
    """
    cleansed = {}
    for key, value in source.items():
        # Decide first whether the key looks sensitive; keys the pattern
        # cannot be searched against simply count as "not sensitive".
        try:
            sensitive = bool(SafeExceptionReporterFilter.hidden_settings.search(key))
        except TypeError:
            sensitive = False

        if isinstance(value, dict):
            # Dict values win over the masking decision and are descended into.
            cleansed[key] = cleanse_dict(value)
        elif sensitive:
            cleansed[key] = SafeExceptionReporterFilter.cleansed_substitute
        else:
            cleansed[key] = value
    return cleansed
 | 
						|
 | 
						|
 | 
						|
def model_to_dict(model: Model) -> Dict[str, Any]:
    """Summarise a model instance as a small dictionary.

    The result holds the model's app label, model name and primary key, plus a
    display name: the instance's ``name`` attribute when it has one, otherwise
    ``str(model)``.
    """
    # str(model) is the fallback; it is only used when there is no ``name``.
    display_name = getattr(model, "name", str(model))
    meta = model._meta
    return dict(
        app=meta.app_label,
        model_name=meta.model_name,
        pk=model.pk,
        name=display_name,
    )
 | 
						|
 | 
						|
 | 
						|
def get_user(user: User, original_user: Optional[User] = None) -> Dict[str, Any]:
    """Describe ``user`` as a dictionary with ``username``, ``pk`` and ``email``.

    When ``original_user`` is given, the returned dictionary describes
    ``original_user`` instead and carries the description of ``user`` under
    the ``on_behalf_of`` key.
    """
    # Django's AnonymousUser is swapped for the user object returned by
    # guardian's get_anonymous_user() before its attributes are read.
    effective_user = get_anonymous_user() if isinstance(user, AnonymousUser) else user
    details = dict(
        username=effective_user.username,
        pk=effective_user.pk,
        email=effective_user.email,
    )
    if not original_user:
        return details
    acting_user = get_user(original_user)
    acting_user["on_behalf_of"] = details
    return acting_user
 | 
						|
 | 
						|
 | 
						|
def sanitize_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
    """Return a copy of ``source`` without values that would break the JSONField.

    Values are converted as follows, recursively:

    * nested dictionaries are sanitized themselves,
    * lists and tuples have each element sanitized (container type is kept),
    * ``UUID`` objects are replaced by their ``hex`` string,
    * models are replaced with the dictionary built by ``model_to_dict``::

        {
            app: str,
            model_name: str,
            pk: Any,
            name: str,
        }

    Everything else is copied over unchanged. Keys are never modified.
    """
    return {key: _sanitize_value(value) for key, value in source.items()}


def _sanitize_value(value: Any) -> Any:
    """Sanitize a single value found in a dictionary or sequence."""
    if isinstance(value, dict):
        return sanitize_dict(value)
    if isinstance(value, (list, tuple)):
        # Models/UUIDs hidden inside sequences would otherwise reach the JSON
        # encoder untouched and fail to serialize.
        items = [_sanitize_value(item) for item in value]
        return items if isinstance(value, list) else tuple(items)
    if isinstance(value, UUID):
        return value.hex
    if isinstance(value, models.Model):
        # The model summary may itself contain a UUID primary key.
        return sanitize_dict(model_to_dict(value))
    return value
 | 
						|
 | 
						|
 | 
						|
class EventAction(models.TextChoices):
    """All possible actions to save into the audit log.

    Each member's value is the string stored in ``Event.action``.
    """

    # Login/logout
    LOGIN = "login"
    LOGIN_FAILED = "login_failed"
    LOGOUT = "logout"

    SIGN_UP = "sign_up"
    AUTHORIZE_APPLICATION = "authorize_application"
    SUSPICIOUS_REQUEST = "suspicious_request"
    PASSWORD_SET = "password_set"  # noqa # nosec

    # Invitations
    INVITE_CREATED = "invitation_created"
    INVITE_USED = "invitation_used"

    SOURCE_LINKED = "source_linked"

    # Impersonation
    IMPERSONATION_STARTED = "impersonation_started"
    IMPERSONATION_ENDED = "impersonation_ended"

    # Model changes
    MODEL_CREATED = "model_created"
    MODEL_UPDATED = "model_updated"
    MODEL_DELETED = "model_deleted"

    # Not an action on its own: Event.new() prepends this prefix to any action
    # that is not an EventAction member.
    CUSTOM_PREFIX = "custom_"
 | 
						|
 | 
						|
 | 
						|
class Event(models.Model):
    """An individual audit log event.

    Events are created unsaved via :meth:`new`, optionally enriched and saved
    via :meth:`from_http`, and can never be edited once stored (see
    :meth:`save`).
    """

    event_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
    # Dictionary describing the acting user, as built by get_user()
    user = models.JSONField(default=dict)
    action = models.TextField(choices=EventAction.choices)
    date = models.DateTimeField(auto_now_add=True)
    # Name of the module/app that created the event
    app = models.TextField()
    # Sanitized and cleansed keyword arguments passed to new()
    context = models.JSONField(default=dict, blank=True)
    client_ip = models.GenericIPAddressField(null=True)
    created = models.DateTimeField(auto_now_add=True)

    @staticmethod
    def _get_app_from_request(request: HttpRequest) -> str:
        """Return the app name of the URL the request resolved to.

        Returns an empty string when ``request`` is not an ``HttpRequest`` or
        when no URL has been resolved for it.
        """
        if not isinstance(request, HttpRequest):
            return ""
        # resolver_match stays None until URL resolving has taken place
        # (e.g. in middleware running before the resolver).
        match = getattr(request, "resolver_match", None)
        if match is None:
            return ""
        return match.app_name

    @staticmethod
    def new(
        action: Union[str, EventAction],
        app: Optional[str] = None,
        _inspect_offset: int = 1,
        **kwargs,
    ) -> "Event":
        """Create new Event instance from arguments. Instance is NOT saved.

        Actions that are not ``EventAction`` members are prefixed with
        ``EventAction.CUSTOM_PREFIX``. When ``app`` is not given, the name of
        the module found ``_inspect_offset`` frames up the call stack is used.
        Remaining keyword arguments become the event context after being
        sanitized and cleansed.
        """
        if not isinstance(action, EventAction):
            action = EventAction.CUSTOM_PREFIX + action
        if not app:
            # stack() must be called right here so _inspect_offset keeps
            # pointing at the caller of new().
            caller_module = getmodule(stack()[_inspect_offset][0])
            # getmodule() returns None when the frame's module cannot be
            # determined; leave app empty so from_http() can still fill it in.
            app = caller_module.__name__ if caller_module is not None else ""
        cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
        return Event(action=action, app=app, context=cleaned_kwargs)

    def from_http(
        self, request: HttpRequest, user: Optional[settings.AUTH_USER_MODEL] = None
    ) -> "Event":
        """Add data from a Django-HttpRequest, allowing the creation of
        Events independently from requests.

        `user` arguments optionally overrides user from requests. If the
        session records an impersonation, the original user is stored as the
        acting user with the impersonated one under ``on_behalf_of``.
        The event is saved and returned.
        """
        # Requests without a session must not fail while looking up
        # impersonation data.
        session = getattr(request, "session", None)
        if hasattr(request, "user"):
            original_user = None
            if session is not None:
                original_user = session.get(SESSION_IMPERSONATE_ORIGINAL_USER, None)
            self.user = get_user(request.user, original_user)
        if user:
            self.user = get_user(user)
        # Check if we're currently impersonating, and add that user
        if session is not None and SESSION_IMPERSONATE_ORIGINAL_USER in session:
            self.user = get_user(session[SESSION_IMPERSONATE_ORIGINAL_USER])
            self.user["on_behalf_of"] = get_user(session[SESSION_IMPERSONATE_USER])
        # Use 255.255.255.255 as fallback if IP cannot be determined
        self.client_ip = get_client_ip(request) or "255.255.255.255"
        # If there's no app set, we get it from the requests too
        if not self.app:
            self.app = Event._get_app_from_request(request)
        self.save()
        return self

    def save(self, *args, **kwargs):
        """Save a new event, logging it first.

        Raises:
            ValidationError: if the event already exists; events are immutable.
        """
        if not self._state.adding:
            raise ValidationError(
                f"you may not edit an existing {self._meta.model_name}"
            )
        LOGGER.debug(
            "Created Audit event",
            action=self.action,
            context=self.context,
            client_ip=self.client_ip,
            user=self.user,
        )
        return super().save(*args, **kwargs)

    class Meta:

        verbose_name = _("Audit Event")
        verbose_name_plural = _("Audit Events")
 |