audit: sanitize kwargs when creating audit event
This commit is contained in:
		@ -1,12 +1,13 @@
 | 
			
		||||
"""passbook audit models"""
 | 
			
		||||
from enum import Enum
 | 
			
		||||
from inspect import getmodule, stack
 | 
			
		||||
from typing import Optional
 | 
			
		||||
from typing import Optional, Dict, Any
 | 
			
		||||
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
from django.contrib.auth.models import AnonymousUser
 | 
			
		||||
from django.contrib.postgres.fields import JSONField
 | 
			
		||||
from django.core.exceptions import ValidationError
 | 
			
		||||
from django.contrib.contenttypes.models import ContentType
 | 
			
		||||
from django.db import models
 | 
			
		||||
from django.http import HttpRequest
 | 
			
		||||
from django.utils.translation import gettext as _
 | 
			
		||||
@ -19,6 +20,26 @@ from passbook.lib.utils.http import get_client_ip
 | 
			
		||||
LOGGER = get_logger()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def sanitize_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
 | 
			
		||||
    """clean source of all Models that would interfere with the JSONField.
 | 
			
		||||
    Models are replaced with a dictionary of {
 | 
			
		||||
        app: str,
 | 
			
		||||
        name: str,
 | 
			
		||||
        pk: Any
 | 
			
		||||
    }"""
 | 
			
		||||
    for key, value in source.items():
 | 
			
		||||
        if isinstance(value, dict):
 | 
			
		||||
            source[key] = sanitize_dict(value)
 | 
			
		||||
        elif isinstance(value, models.Model):
 | 
			
		||||
            model_content_type = ContentType.objects.get_for_model(value)
 | 
			
		||||
            source[key] = {
 | 
			
		||||
                "app": model_content_type.app_label,
 | 
			
		||||
                "name": model_content_type.model,
 | 
			
		||||
                "pk": value.pk,
 | 
			
		||||
            }
 | 
			
		||||
    return source
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class EventAction(Enum):
 | 
			
		||||
    """All possible actions to save into the audit log"""
 | 
			
		||||
 | 
			
		||||
@ -72,8 +93,9 @@ class Event(UUIDModel):
 | 
			
		||||
            )
 | 
			
		||||
        if not app:
 | 
			
		||||
            app = getmodule(stack()[_inspect_offset][0]).__name__
 | 
			
		||||
        event = Event(action=action.value, app=app, context=kwargs)
 | 
			
		||||
        LOGGER.debug("Created Audit event", action=action, context=kwargs)
 | 
			
		||||
        cleaned_kwargs = sanitize_dict(kwargs)
 | 
			
		||||
        event = Event(action=action.value, app=app, context=cleaned_kwargs)
 | 
			
		||||
        LOGGER.debug("Created Audit event", action=action, context=cleaned_kwargs)
 | 
			
		||||
        return event
 | 
			
		||||
 | 
			
		||||
    def from_http(
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										0
									
								
								passbook/audit/tests/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								passbook/audit/tests/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										16
									
								
								passbook/audit/tests/test_event.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										16
									
								
								passbook/audit/tests/test_event.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,16 @@
 | 
			
		||||
"""audit event tests"""
 | 
			
		||||
 | 
			
		||||
from django.test import TestCase
 | 
			
		||||
from guardian.shortcuts import get_anonymous_user
 | 
			
		||||
 | 
			
		||||
from passbook.audit.models import Event, EventAction
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestAuditEvent(TestCase):
 | 
			
		||||
    """Test Audit Event"""
 | 
			
		||||
 | 
			
		||||
    def test_new_with_model(self):
 | 
			
		||||
        """Create a new Event passing a model as kwarg"""
 | 
			
		||||
        event = Event.new(EventAction.CUSTOM, model=get_anonymous_user())
 | 
			
		||||
        event.save()
 | 
			
		||||
        self.assertIsNotNone(event.pk)
 | 
			
		||||
		Reference in New Issue
	
	Block a user