Compare commits
	
		
			1 Commits
		
	
	
		
			imports-fo
			...
			smusali/ev
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 617d913ca2 | 
@ -38,6 +38,7 @@ class EventSerializer(ModelSerializer):
 | 
			
		||||
            "created",
 | 
			
		||||
            "expires",
 | 
			
		||||
            "brand",
 | 
			
		||||
            "batch_id",
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -53,6 +53,9 @@ class NotificationTransportSerializer(ModelSerializer):
 | 
			
		||||
            "webhook_url",
 | 
			
		||||
            "webhook_mapping",
 | 
			
		||||
            "send_once",
 | 
			
		||||
            "enable_batching",
 | 
			
		||||
            "batch_timeout",
 | 
			
		||||
            "max_batch_size",
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -172,6 +172,63 @@ class EventManager(Manager):
 | 
			
		||||
        return self.get_queryset().get_events_per(time_since, extract, data_points)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class EventBatch(ExpiringModel):
 | 
			
		||||
    """Model to store information about batches of events."""
 | 
			
		||||
 | 
			
		||||
    batch_id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
 | 
			
		||||
    event_type = models.CharField(max_length=255)
 | 
			
		||||
    event_app = models.CharField(max_length=255)
 | 
			
		||||
    event_user = models.CharField(max_length=255)
 | 
			
		||||
    start_time = models.DateTimeField(auto_now_add=True)
 | 
			
		||||
    end_time = models.DateTimeField(null=True, blank=True)
 | 
			
		||||
    event_count = models.IntegerField(default=0)
 | 
			
		||||
    last_updated = models.DateTimeField(auto_now=True)
 | 
			
		||||
    max_batch_size = models.IntegerField(default=10)
 | 
			
		||||
    batch_timeout = models.IntegerField(default=60)  # Timeout in seconds
 | 
			
		||||
    sent = models.BooleanField(default=False)
 | 
			
		||||
 | 
			
		||||
    def add_event_to_batch(self, event):
 | 
			
		||||
        """Add an event to the batch and check if it's ready to send."""
 | 
			
		||||
        self.add_event(event)
 | 
			
		||||
        if self.check_batch_limits():
 | 
			
		||||
            self.process_batch()
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def get_or_create_batch(action, app, user):
 | 
			
		||||
        """Get or create a batch for a given action."""
 | 
			
		||||
        return EventBatch.objects.filter(
 | 
			
		||||
            event_type=action, event_app=app, event_user=user, end_time__isnull=True
 | 
			
		||||
        ).first() or EventBatch.objects.create(event_type=action, event_app=app, event_user=user)
 | 
			
		||||
 | 
			
		||||
    def check_batch_limits(self):
 | 
			
		||||
        """Check if the batch has reached its size or timeout limits."""
 | 
			
		||||
        time_elapsed = now() - self.start_time
 | 
			
		||||
        return self.event_count >= self.max_batch_size or time_elapsed >= timedelta(
 | 
			
		||||
            seconds=self.batch_timeout
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def add_event(self, event):
 | 
			
		||||
        """Add an event to the batch."""
 | 
			
		||||
        self.event_count += 1
 | 
			
		||||
        self.save()
 | 
			
		||||
 | 
			
		||||
    def create_batch_summary(self):
 | 
			
		||||
        """Create a summary message for the batch."""
 | 
			
		||||
        return f"Batched Event Summary: {self.event_type} action \
 | 
			
		||||
            on {self.event_app} app by {self.event_user} user \
 | 
			
		||||
            occurred {self.event_count} times between {self.start_time} and {now()}"
 | 
			
		||||
 | 
			
		||||
    def process_batch(self):
 | 
			
		||||
        """Process the batch and check if it's ready to send."""
 | 
			
		||||
        summary_message = self.create_batch_summary()
 | 
			
		||||
        return summary_message
 | 
			
		||||
        
 | 
			
		||||
    def send_notification(self):
 | 
			
		||||
        """Send notification for this batch."""
 | 
			
		||||
        # Implement the logic to send notification
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Event(SerializerModel, ExpiringModel):
 | 
			
		||||
    """An individual Audit/Metrics/Notification/Error Event"""
 | 
			
		||||
 | 
			
		||||
@ -187,6 +244,8 @@ class Event(SerializerModel, ExpiringModel):
 | 
			
		||||
    # Shadow the expires attribute from ExpiringModel to override the default duration
 | 
			
		||||
    expires = models.DateTimeField(default=default_event_duration)
 | 
			
		||||
 | 
			
		||||
    batch_id = models.UUIDField(null=True, blank=True)
 | 
			
		||||
 | 
			
		||||
    objects = EventManager()
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
@ -214,6 +273,7 @@ class Event(SerializerModel, ExpiringModel):
 | 
			
		||||
            # Also ensure that closest django app has the correct prefix
 | 
			
		||||
            if len(django_apps) > 0 and django_apps[0].startswith(app):
 | 
			
		||||
                app = django_apps[0]
 | 
			
		||||
 | 
			
		||||
        cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
 | 
			
		||||
        event = Event(action=action, app=app, context=cleaned_kwargs)
 | 
			
		||||
        return event
 | 
			
		||||
@ -275,6 +335,9 @@ class Event(SerializerModel, ExpiringModel):
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    def save(self, *args, **kwargs):
 | 
			
		||||
        # Creating a batch for this event in the save method
 | 
			
		||||
        batch = EventBatch.get_or_create_batch(self.action, self.user, self.app)
 | 
			
		||||
        self.batch_id = batch.batch_id
 | 
			
		||||
        if self._state.adding:
 | 
			
		||||
            LOGGER.info(
 | 
			
		||||
                "Created Event",
 | 
			
		||||
@ -334,7 +397,17 @@ class NotificationTransport(SerializerModel):
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    enable_batching = models.BooleanField(default=False)
 | 
			
		||||
    batch_timeout = models.IntegerField(default=60)  # Timeout in seconds
 | 
			
		||||
    max_batch_size = models.IntegerField(default=10)
 | 
			
		||||
 | 
			
		||||
    def send(self, notification: "Notification") -> list[str]:
 | 
			
		||||
        """Send a batched notification or a single notification"""
 | 
			
		||||
        if self.enable_batching:
 | 
			
		||||
            return self.process_batch(notification)
 | 
			
		||||
        return self.send_notification(notification)
 | 
			
		||||
 | 
			
		||||
    def send_notification(self, notification: "Notification") -> list[str]:
 | 
			
		||||
        """Send notification to user, called from async task"""
 | 
			
		||||
        if self.mode == TransportMode.LOCAL:
 | 
			
		||||
            return self.send_local(notification)
 | 
			
		||||
 | 
			
		||||
@ -3,11 +3,13 @@
 | 
			
		||||
from typing import Optional
 | 
			
		||||
 | 
			
		||||
from django.db.models.query_utils import Q
 | 
			
		||||
from django.utils import timezone
 | 
			
		||||
from guardian.shortcuts import get_anonymous_user
 | 
			
		||||
from structlog.stdlib import get_logger
 | 
			
		||||
 | 
			
		||||
from authentik.core.exceptions import PropertyMappingExpressionException
 | 
			
		||||
from authentik.core.models import User
 | 
			
		||||
from authentik.events.models import EventBatch  # Importing the EventBatch model
 | 
			
		||||
from authentik.events.models import (
 | 
			
		||||
    Event,
 | 
			
		||||
    Notification,
 | 
			
		||||
@ -17,6 +19,13 @@ from authentik.events.models import (
 | 
			
		||||
    TaskStatus,
 | 
			
		||||
)
 | 
			
		||||
from authentik.events.system_tasks import SystemTask, prefill_task
 | 
			
		||||
from authentik.events.monitored_tasks import (
 | 
			
		||||
    MonitoredTask,
 | 
			
		||||
    TaskResult,
 | 
			
		||||
    TaskResultStatus,
 | 
			
		||||
    shared_task,
 | 
			
		||||
)
 | 
			
		||||
>>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134])
 | 
			
		||||
from authentik.policies.engine import PolicyEngine
 | 
			
		||||
from authentik.policies.models import PolicyBinding, PolicyEngineMode
 | 
			
		||||
from authentik.root.celery import CELERY_APP
 | 
			
		||||
@ -107,20 +116,44 @@ def notification_transport(
 | 
			
		||||
        event = Event.objects.filter(pk=event_pk).first()
 | 
			
		||||
        if not event:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        user = User.objects.filter(pk=user_pk).first()
 | 
			
		||||
        if not user:
 | 
			
		||||
            return
 | 
			
		||||
        trigger = NotificationRule.objects.filter(pk=trigger_pk).first()
 | 
			
		||||
        if not trigger:
 | 
			
		||||
            return
 | 
			
		||||
        notification = Notification(
 | 
			
		||||
            severity=trigger.severity, body=event.summary, event=event, user=user
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # Check if batching is enabled and process accordingly
 | 
			
		||||
        transport = NotificationTransport.objects.filter(pk=transport_pk).first()
 | 
			
		||||
<<<<<<< HEAD
 | 
			
		||||
        if not transport:
 | 
			
		||||
            return
 | 
			
		||||
        transport.send(notification)
 | 
			
		||||
        self.set_status(TaskStatus.SUCCESSFUL)
 | 
			
		||||
=======
 | 
			
		||||
        if transport and transport.enable_batching:
 | 
			
		||||
            # Process the event for batching
 | 
			
		||||
            batch = EventBatch.get_or_create_batch(event.action, event.app, event.user)
 | 
			
		||||
            batch.add_event_to_batch(event)
 | 
			
		||||
            # Check if the batch has reached its limits
 | 
			
		||||
            if not batch.check_batch_limits():
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            batch_summary = batch.process_batch()
 | 
			
		||||
            batch.delete()
 | 
			
		||||
            notification = Notification(
 | 
			
		||||
                severity=trigger.severity, body=batch_summary, event=event, user=user
 | 
			
		||||
            )
 | 
			
		||||
        else:
 | 
			
		||||
            notification = Notification(
 | 
			
		||||
                severity=trigger.severity, body=event.summary, event=event, user=user
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        transport.send_notification(notification)
 | 
			
		||||
 | 
			
		||||
        self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL))
 | 
			
		||||
>>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134])
 | 
			
		||||
    except (NotificationTransportError, PropertyMappingExpressionException) as exc:
 | 
			
		||||
        self.set_error(exc)
 | 
			
		||||
        raise exc
 | 
			
		||||
@ -143,4 +176,21 @@ def notification_cleanup(self: SystemTask):
 | 
			
		||||
    for notification in notifications:
 | 
			
		||||
        notification.delete()
 | 
			
		||||
    LOGGER.debug("Expired notifications", amount=amount)
 | 
			
		||||
<<<<<<< HEAD
 | 
			
		||||
    self.set_status(TaskStatus.SUCCESSFUL, f"Expired {amount} Notifications")
 | 
			
		||||
=======
 | 
			
		||||
    self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, [f"Expired {amount} Notifications"]))
 | 
			
		||||
 | 
			
		||||
# Scheduled task to check and send pending batches
 | 
			
		||||
@CELERY_APP.task(base=MonitoredTask)
 | 
			
		||||
@shared_task
 | 
			
		||||
def check_and_send_pending_batches():
 | 
			
		||||
    """Check for pending batches that haven't been sent and have been idle for a specified time."""
 | 
			
		||||
    idle_time = timezone.now() - timedelta(minutes=10)  # Example idle time
 | 
			
		||||
    pending_batches = EventBatch.objects.filter(sent=False, last_updated__lt=idle_time)
 | 
			
		||||
    for batch in pending_batches:
 | 
			
		||||
        batch.send_notification()
 | 
			
		||||
        batch.sent = True
 | 
			
		||||
        batch.save()
 | 
			
		||||
 | 
			
		||||
>>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134])
 | 
			
		||||
 | 
			
		||||
@ -2931,6 +2931,11 @@
 | 
			
		||||
                    "type": "object",
 | 
			
		||||
                    "additionalProperties": true,
 | 
			
		||||
                    "title": "Brand"
 | 
			
		||||
                },
 | 
			
		||||
                "batch_id": {
 | 
			
		||||
                    "type": "string",
 | 
			
		||||
                    "format": "uuid",
 | 
			
		||||
                    "title": "Batch id"
 | 
			
		||||
                }
 | 
			
		||||
            },
 | 
			
		||||
            "required": []
 | 
			
		||||
@ -2965,6 +2970,22 @@
 | 
			
		||||
                    "type": "boolean",
 | 
			
		||||
                    "title": "Send once",
 | 
			
		||||
                    "description": "Only send notification once, for example when sending a webhook into a chat channel."
 | 
			
		||||
                },
 | 
			
		||||
                "enable_batching": {
 | 
			
		||||
                    "type": "boolean",
 | 
			
		||||
                    "title": "Enable batching"
 | 
			
		||||
                },
 | 
			
		||||
                "batch_timeout": {
 | 
			
		||||
                    "type": "integer",
 | 
			
		||||
                    "minimum": -2147483648,
 | 
			
		||||
                    "maximum": 2147483647,
 | 
			
		||||
                    "title": "Batch timeout"
 | 
			
		||||
                },
 | 
			
		||||
                "max_batch_size": {
 | 
			
		||||
                    "type": "integer",
 | 
			
		||||
                    "minimum": -2147483648,
 | 
			
		||||
                    "maximum": 2147483647,
 | 
			
		||||
                    "title": "Max batch size"
 | 
			
		||||
                }
 | 
			
		||||
            },
 | 
			
		||||
            "required": []
 | 
			
		||||
@ -3040,6 +3061,11 @@
 | 
			
		||||
                            "type": "object",
 | 
			
		||||
                            "additionalProperties": true,
 | 
			
		||||
                            "title": "Brand"
 | 
			
		||||
                        },
 | 
			
		||||
                        "batch_id": {
 | 
			
		||||
                            "type": "string",
 | 
			
		||||
                            "format": "uuid",
 | 
			
		||||
                            "title": "Batch id"
 | 
			
		||||
                        }
 | 
			
		||||
                    },
 | 
			
		||||
                    "required": [
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										42
									
								
								schema.yml
									
									
									
									
									
								
							
							
						
						
									
										42
									
								
								schema.yml
									
									
									
									
									
								
							@ -32266,6 +32266,10 @@ components:
 | 
			
		||||
          type: string
 | 
			
		||||
          format: date-time
 | 
			
		||||
        brand: {}
 | 
			
		||||
        batch_id:
 | 
			
		||||
          type: string
 | 
			
		||||
          format: uuid
 | 
			
		||||
          nullable: true
 | 
			
		||||
      required:
 | 
			
		||||
      - action
 | 
			
		||||
      - app
 | 
			
		||||
@ -32763,6 +32767,10 @@ components:
 | 
			
		||||
          type: string
 | 
			
		||||
          format: date-time
 | 
			
		||||
        brand: {}
 | 
			
		||||
        batch_id:
 | 
			
		||||
          type: string
 | 
			
		||||
          format: uuid
 | 
			
		||||
          nullable: true
 | 
			
		||||
      required:
 | 
			
		||||
      - action
 | 
			
		||||
      - app
 | 
			
		||||
@ -35308,6 +35316,16 @@ components:
 | 
			
		||||
          type: boolean
 | 
			
		||||
          description: Only send notification once, for example when sending a webhook
 | 
			
		||||
            into a chat channel.
 | 
			
		||||
        enable_batching:
 | 
			
		||||
          type: boolean
 | 
			
		||||
        batch_timeout:
 | 
			
		||||
          type: integer
 | 
			
		||||
          maximum: 2147483647
 | 
			
		||||
          minimum: -2147483648
 | 
			
		||||
        max_batch_size:
 | 
			
		||||
          type: integer
 | 
			
		||||
          maximum: 2147483647
 | 
			
		||||
          minimum: -2147483648
 | 
			
		||||
      required:
 | 
			
		||||
      - mode_verbose
 | 
			
		||||
      - name
 | 
			
		||||
@ -35344,6 +35362,16 @@ components:
 | 
			
		||||
          type: boolean
 | 
			
		||||
          description: Only send notification once, for example when sending a webhook
 | 
			
		||||
            into a chat channel.
 | 
			
		||||
        enable_batching:
 | 
			
		||||
          type: boolean
 | 
			
		||||
        batch_timeout:
 | 
			
		||||
          type: integer
 | 
			
		||||
          maximum: 2147483647
 | 
			
		||||
          minimum: -2147483648
 | 
			
		||||
        max_batch_size:
 | 
			
		||||
          type: integer
 | 
			
		||||
          maximum: 2147483647
 | 
			
		||||
          minimum: -2147483648
 | 
			
		||||
      required:
 | 
			
		||||
      - name
 | 
			
		||||
    NotificationTransportTest:
 | 
			
		||||
@ -38388,6 +38416,10 @@ components:
 | 
			
		||||
          type: string
 | 
			
		||||
          format: date-time
 | 
			
		||||
        brand: {}
 | 
			
		||||
        batch_id:
 | 
			
		||||
          type: string
 | 
			
		||||
          format: uuid
 | 
			
		||||
          nullable: true
 | 
			
		||||
    PatchedExpressionPolicyRequest:
 | 
			
		||||
      type: object
 | 
			
		||||
      description: Group Membership Policy Serializer
 | 
			
		||||
@ -38905,6 +38937,16 @@ components:
 | 
			
		||||
          type: boolean
 | 
			
		||||
          description: Only send notification once, for example when sending a webhook
 | 
			
		||||
            into a chat channel.
 | 
			
		||||
        enable_batching:
 | 
			
		||||
          type: boolean
 | 
			
		||||
        batch_timeout:
 | 
			
		||||
          type: integer
 | 
			
		||||
          maximum: 2147483647
 | 
			
		||||
          minimum: -2147483648
 | 
			
		||||
        max_batch_size:
 | 
			
		||||
          type: integer
 | 
			
		||||
          maximum: 2147483647
 | 
			
		||||
          minimum: -2147483648
 | 
			
		||||
    PatchedNotificationWebhookMappingRequest:
 | 
			
		||||
      type: object
 | 
			
		||||
      description: NotificationWebhookMapping Serializer
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user