Compare commits

...

1 Commits

Author SHA1 Message Date
617d913ca2 events/batch: add event batching mechanism [AUTH-134] 2024-02-05 14:17:50 +01:00
6 changed files with 198 additions and 3 deletions

View File

@ -38,6 +38,7 @@ class EventSerializer(ModelSerializer):
"created",
"expires",
"brand",
"batch_id",
]

View File

@ -53,6 +53,9 @@ class NotificationTransportSerializer(ModelSerializer):
"webhook_url",
"webhook_mapping",
"send_once",
"enable_batching",
"batch_timeout",
"max_batch_size",
]

View File

@ -172,6 +172,63 @@ class EventManager(Manager):
return self.get_queryset().get_events_per(time_since, extract, data_points)
class EventBatch(ExpiringModel):
"""Model to store information about batches of events."""
batch_id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
event_type = models.CharField(max_length=255)
event_app = models.CharField(max_length=255)
event_user = models.CharField(max_length=255)
start_time = models.DateTimeField(auto_now_add=True)
end_time = models.DateTimeField(null=True, blank=True)
event_count = models.IntegerField(default=0)
last_updated = models.DateTimeField(auto_now=True)
max_batch_size = models.IntegerField(default=10)
batch_timeout = models.IntegerField(default=60) # Timeout in seconds
sent = models.BooleanField(default=False)
def add_event_to_batch(self, event):
"""Add an event to the batch and check if it's ready to send."""
self.add_event(event)
if self.check_batch_limits():
self.process_batch()
@staticmethod
def get_or_create_batch(action, app, user):
"""Get or create a batch for a given action."""
return EventBatch.objects.filter(
event_type=action, event_app=app, event_user=user, end_time__isnull=True
).first() or EventBatch.objects.create(event_type=action, event_app=app, event_user=user)
def check_batch_limits(self):
"""Check if the batch has reached its size or timeout limits."""
time_elapsed = now() - self.start_time
return self.event_count >= self.max_batch_size or time_elapsed >= timedelta(
seconds=self.batch_timeout
)
def add_event(self, event):
"""Add an event to the batch."""
self.event_count += 1
self.save()
def create_batch_summary(self):
"""Create a summary message for the batch."""
return f"Batched Event Summary: {self.event_type} action \
on {self.event_app} app by {self.event_user} user \
occurred {self.event_count} times between {self.start_time} and {now()}"
def process_batch(self):
"""Process the batch and check if it's ready to send."""
summary_message = self.create_batch_summary()
return summary_message
def send_notification(self):
"""Send notification for this batch."""
# Implement the logic to send notification
pass
class Event(SerializerModel, ExpiringModel):
"""An individual Audit/Metrics/Notification/Error Event"""
@ -187,6 +244,8 @@ class Event(SerializerModel, ExpiringModel):
# Shadow the expires attribute from ExpiringModel to override the default duration
expires = models.DateTimeField(default=default_event_duration)
batch_id = models.UUIDField(null=True, blank=True)
objects = EventManager()
@staticmethod
@ -214,6 +273,7 @@ class Event(SerializerModel, ExpiringModel):
# Also ensure that closest django app has the correct prefix
if len(django_apps) > 0 and django_apps[0].startswith(app):
app = django_apps[0]
cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
event = Event(action=action, app=app, context=cleaned_kwargs)
return event
@ -275,6 +335,9 @@ class Event(SerializerModel, ExpiringModel):
return self
def save(self, *args, **kwargs):
# Creating a batch for this event in the save method
batch = EventBatch.get_or_create_batch(self.action, self.user, self.app)
self.batch_id = batch.batch_id
if self._state.adding:
LOGGER.info(
"Created Event",
@ -334,7 +397,17 @@ class NotificationTransport(SerializerModel):
),
)
enable_batching = models.BooleanField(default=False)
batch_timeout = models.IntegerField(default=60) # Timeout in seconds
max_batch_size = models.IntegerField(default=10)
def send(self, notification: "Notification") -> list[str]:
"""Send a batched notification or a single notification"""
if self.enable_batching:
return self.process_batch(notification)
return self.send_notification(notification)
def send_notification(self, notification: "Notification") -> list[str]:
"""Send notification to user, called from async task"""
if self.mode == TransportMode.LOCAL:
return self.send_local(notification)

View File

@ -3,11 +3,13 @@
from typing import Optional
from django.db.models.query_utils import Q
from django.utils import timezone
from guardian.shortcuts import get_anonymous_user
from structlog.stdlib import get_logger
from authentik.core.exceptions import PropertyMappingExpressionException
from authentik.core.models import User
from authentik.events.models import EventBatch # Importing the EventBatch model
from authentik.events.models import (
Event,
Notification,
@ -17,6 +19,13 @@ from authentik.events.models import (
TaskStatus,
)
from authentik.events.system_tasks import SystemTask, prefill_task
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
shared_task,
)
>>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134])
from authentik.policies.engine import PolicyEngine
from authentik.policies.models import PolicyBinding, PolicyEngineMode
from authentik.root.celery import CELERY_APP
@ -107,20 +116,44 @@ def notification_transport(
event = Event.objects.filter(pk=event_pk).first()
if not event:
return
user = User.objects.filter(pk=user_pk).first()
if not user:
return
trigger = NotificationRule.objects.filter(pk=trigger_pk).first()
if not trigger:
return
notification = Notification(
severity=trigger.severity, body=event.summary, event=event, user=user
)
# Check if batching is enabled and process accordingly
transport = NotificationTransport.objects.filter(pk=transport_pk).first()
<<<<<<< HEAD
if not transport:
return
transport.send(notification)
self.set_status(TaskStatus.SUCCESSFUL)
=======
if transport and transport.enable_batching:
# Process the event for batching
batch = EventBatch.get_or_create_batch(event.action, event.app, event.user)
batch.add_event_to_batch(event)
# Check if the batch has reached its limits
if not batch.check_batch_limits():
return
batch_summary = batch.process_batch()
batch.delete()
notification = Notification(
severity=trigger.severity, body=batch_summary, event=event, user=user
)
else:
notification = Notification(
severity=trigger.severity, body=event.summary, event=event, user=user
)
transport.send_notification(notification)
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL))
>>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134])
except (NotificationTransportError, PropertyMappingExpressionException) as exc:
self.set_error(exc)
raise exc
@ -143,4 +176,21 @@ def notification_cleanup(self: SystemTask):
for notification in notifications:
notification.delete()
LOGGER.debug("Expired notifications", amount=amount)
<<<<<<< HEAD
self.set_status(TaskStatus.SUCCESSFUL, f"Expired {amount} Notifications")
=======
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, [f"Expired {amount} Notifications"]))
# Scheduled task to check and send pending batches
@CELERY_APP.task(base=MonitoredTask)
@shared_task
def check_and_send_pending_batches():
"""Check for pending batches that haven't been sent and have been idle for a specified time."""
idle_time = timezone.now() - timedelta(minutes=10) # Example idle time
pending_batches = EventBatch.objects.filter(sent=False, last_updated__lt=idle_time)
for batch in pending_batches:
batch.send_notification()
batch.sent = True
batch.save()
>>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134])

View File

@ -2931,6 +2931,11 @@
"type": "object",
"additionalProperties": true,
"title": "Brand"
},
"batch_id": {
"type": "string",
"format": "uuid",
"title": "Batch id"
}
},
"required": []
@ -2965,6 +2970,22 @@
"type": "boolean",
"title": "Send once",
"description": "Only send notification once, for example when sending a webhook into a chat channel."
},
"enable_batching": {
"type": "boolean",
"title": "Enable batching"
},
"batch_timeout": {
"type": "integer",
"minimum": -2147483648,
"maximum": 2147483647,
"title": "Batch timeout"
},
"max_batch_size": {
"type": "integer",
"minimum": -2147483648,
"maximum": 2147483647,
"title": "Max batch size"
}
},
"required": []
@ -3040,6 +3061,11 @@
"type": "object",
"additionalProperties": true,
"title": "Brand"
},
"batch_id": {
"type": "string",
"format": "uuid",
"title": "Batch id"
}
},
"required": [

View File

@ -32266,6 +32266,10 @@ components:
type: string
format: date-time
brand: {}
batch_id:
type: string
format: uuid
nullable: true
required:
- action
- app
@ -32763,6 +32767,10 @@ components:
type: string
format: date-time
brand: {}
batch_id:
type: string
format: uuid
nullable: true
required:
- action
- app
@ -35308,6 +35316,16 @@ components:
type: boolean
description: Only send notification once, for example when sending a webhook
into a chat channel.
enable_batching:
type: boolean
batch_timeout:
type: integer
maximum: 2147483647
minimum: -2147483648
max_batch_size:
type: integer
maximum: 2147483647
minimum: -2147483648
required:
- mode_verbose
- name
@ -35344,6 +35362,16 @@ components:
type: boolean
description: Only send notification once, for example when sending a webhook
into a chat channel.
enable_batching:
type: boolean
batch_timeout:
type: integer
maximum: 2147483647
minimum: -2147483648
max_batch_size:
type: integer
maximum: 2147483647
minimum: -2147483648
required:
- name
NotificationTransportTest:
@ -38388,6 +38416,10 @@ components:
type: string
format: date-time
brand: {}
batch_id:
type: string
format: uuid
nullable: true
PatchedExpressionPolicyRequest:
type: object
description: Group Membership Policy Serializer
@ -38905,6 +38937,16 @@ components:
type: boolean
description: Only send notification once, for example when sending a webhook
into a chat channel.
enable_batching:
type: boolean
batch_timeout:
type: integer
maximum: 2147483647
minimum: -2147483648
max_batch_size:
type: integer
maximum: 2147483647
minimum: -2147483648
PatchedNotificationWebhookMappingRequest:
type: object
description: NotificationWebhookMapping Serializer