Compare commits
	
		
			1 Commits
		
	
	
		
			website/do
			...
			smusali/ev
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 617d913ca2 | 
| @ -38,6 +38,7 @@ class EventSerializer(ModelSerializer): | ||||
|             "created", | ||||
|             "expires", | ||||
|             "brand", | ||||
|             "batch_id", | ||||
|         ] | ||||
|  | ||||
|  | ||||
|  | ||||
| @ -53,6 +53,9 @@ class NotificationTransportSerializer(ModelSerializer): | ||||
|             "webhook_url", | ||||
|             "webhook_mapping", | ||||
|             "send_once", | ||||
|             "enable_batching", | ||||
|             "batch_timeout", | ||||
|             "max_batch_size", | ||||
|         ] | ||||
|  | ||||
|  | ||||
|  | ||||
| @ -172,6 +172,63 @@ class EventManager(Manager): | ||||
|         return self.get_queryset().get_events_per(time_since, extract, data_points) | ||||
|  | ||||
|  | ||||
| class EventBatch(ExpiringModel): | ||||
|     """Model to store information about batches of events.""" | ||||
|  | ||||
|     batch_id = models.UUIDField(primary_key=True, default=uuid4, editable=False) | ||||
|     event_type = models.CharField(max_length=255) | ||||
|     event_app = models.CharField(max_length=255) | ||||
|     event_user = models.CharField(max_length=255) | ||||
|     start_time = models.DateTimeField(auto_now_add=True) | ||||
|     end_time = models.DateTimeField(null=True, blank=True) | ||||
|     event_count = models.IntegerField(default=0) | ||||
|     last_updated = models.DateTimeField(auto_now=True) | ||||
|     max_batch_size = models.IntegerField(default=10) | ||||
|     batch_timeout = models.IntegerField(default=60)  # Timeout in seconds | ||||
|     sent = models.BooleanField(default=False) | ||||
|  | ||||
|     def add_event_to_batch(self, event): | ||||
|         """Add an event to the batch and check if it's ready to send.""" | ||||
|         self.add_event(event) | ||||
|         if self.check_batch_limits(): | ||||
|             self.process_batch() | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_or_create_batch(action, app, user): | ||||
|         """Get or create a batch for a given action.""" | ||||
|         return EventBatch.objects.filter( | ||||
|             event_type=action, event_app=app, event_user=user, end_time__isnull=True | ||||
|         ).first() or EventBatch.objects.create(event_type=action, event_app=app, event_user=user) | ||||
|  | ||||
|     def check_batch_limits(self): | ||||
|         """Check if the batch has reached its size or timeout limits.""" | ||||
|         time_elapsed = now() - self.start_time | ||||
|         return self.event_count >= self.max_batch_size or time_elapsed >= timedelta( | ||||
|             seconds=self.batch_timeout | ||||
|         ) | ||||
|  | ||||
|     def add_event(self, event): | ||||
|         """Add an event to the batch.""" | ||||
|         self.event_count += 1 | ||||
|         self.save() | ||||
|  | ||||
|     def create_batch_summary(self): | ||||
|         """Create a summary message for the batch.""" | ||||
|         return f"Batched Event Summary: {self.event_type} action \ | ||||
|             on {self.event_app} app by {self.event_user} user \ | ||||
|             occurred {self.event_count} times between {self.start_time} and {now()}" | ||||
|  | ||||
|     def process_batch(self): | ||||
|         """Process the batch and check if it's ready to send.""" | ||||
|         summary_message = self.create_batch_summary() | ||||
|         return summary_message | ||||
|          | ||||
|     def send_notification(self): | ||||
|         """Send notification for this batch.""" | ||||
|         # Implement the logic to send notification | ||||
|         pass | ||||
|  | ||||
|  | ||||
| class Event(SerializerModel, ExpiringModel): | ||||
|     """An individual Audit/Metrics/Notification/Error Event""" | ||||
|  | ||||
| @ -187,6 +244,8 @@ class Event(SerializerModel, ExpiringModel): | ||||
|     # Shadow the expires attribute from ExpiringModel to override the default duration | ||||
|     expires = models.DateTimeField(default=default_event_duration) | ||||
|  | ||||
|     batch_id = models.UUIDField(null=True, blank=True) | ||||
|  | ||||
|     objects = EventManager() | ||||
|  | ||||
|     @staticmethod | ||||
| @ -214,6 +273,7 @@ class Event(SerializerModel, ExpiringModel): | ||||
|             # Also ensure that closest django app has the correct prefix | ||||
|             if len(django_apps) > 0 and django_apps[0].startswith(app): | ||||
|                 app = django_apps[0] | ||||
|  | ||||
|         cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs)) | ||||
|         event = Event(action=action, app=app, context=cleaned_kwargs) | ||||
|         return event | ||||
| @ -275,6 +335,9 @@ class Event(SerializerModel, ExpiringModel): | ||||
|         return self | ||||
|  | ||||
|     def save(self, *args, **kwargs): | ||||
|         # Creating a batch for this event in the save method | ||||
|         batch = EventBatch.get_or_create_batch(self.action, self.user, self.app) | ||||
|         self.batch_id = batch.batch_id | ||||
|         if self._state.adding: | ||||
|             LOGGER.info( | ||||
|                 "Created Event", | ||||
| @ -334,7 +397,17 @@ class NotificationTransport(SerializerModel): | ||||
|         ), | ||||
|     ) | ||||
|  | ||||
|     enable_batching = models.BooleanField(default=False) | ||||
|     batch_timeout = models.IntegerField(default=60)  # Timeout in seconds | ||||
|     max_batch_size = models.IntegerField(default=10) | ||||
|  | ||||
|     def send(self, notification: "Notification") -> list[str]: | ||||
|         """Send a batched notification or a single notification""" | ||||
|         if self.enable_batching: | ||||
|             return self.process_batch(notification) | ||||
|         return self.send_notification(notification) | ||||
|  | ||||
|     def send_notification(self, notification: "Notification") -> list[str]: | ||||
|         """Send notification to user, called from async task""" | ||||
|         if self.mode == TransportMode.LOCAL: | ||||
|             return self.send_local(notification) | ||||
|  | ||||
| @ -3,11 +3,13 @@ | ||||
| from typing import Optional | ||||
|  | ||||
| from django.db.models.query_utils import Q | ||||
| from django.utils import timezone | ||||
| from guardian.shortcuts import get_anonymous_user | ||||
| from structlog.stdlib import get_logger | ||||
|  | ||||
| from authentik.core.exceptions import PropertyMappingExpressionException | ||||
| from authentik.core.models import User | ||||
| from authentik.events.models import EventBatch  # Importing the EventBatch model | ||||
| from authentik.events.models import ( | ||||
|     Event, | ||||
|     Notification, | ||||
| @ -17,6 +19,13 @@ from authentik.events.models import ( | ||||
|     TaskStatus, | ||||
| ) | ||||
| from authentik.events.system_tasks import SystemTask, prefill_task | ||||
| from authentik.events.monitored_tasks import ( | ||||
|     MonitoredTask, | ||||
|     TaskResult, | ||||
|     TaskResultStatus, | ||||
|     shared_task, | ||||
| ) | ||||
| >>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134]) | ||||
| from authentik.policies.engine import PolicyEngine | ||||
| from authentik.policies.models import PolicyBinding, PolicyEngineMode | ||||
| from authentik.root.celery import CELERY_APP | ||||
| @ -107,20 +116,44 @@ def notification_transport( | ||||
|         event = Event.objects.filter(pk=event_pk).first() | ||||
|         if not event: | ||||
|             return | ||||
|  | ||||
|         user = User.objects.filter(pk=user_pk).first() | ||||
|         if not user: | ||||
|             return | ||||
|         trigger = NotificationRule.objects.filter(pk=trigger_pk).first() | ||||
|         if not trigger: | ||||
|             return | ||||
|         notification = Notification( | ||||
|             severity=trigger.severity, body=event.summary, event=event, user=user | ||||
|         ) | ||||
|  | ||||
|         # Check if batching is enabled and process accordingly | ||||
|         transport = NotificationTransport.objects.filter(pk=transport_pk).first() | ||||
| <<<<<<< HEAD | ||||
|         if not transport: | ||||
|             return | ||||
|         transport.send(notification) | ||||
|         self.set_status(TaskStatus.SUCCESSFUL) | ||||
| ======= | ||||
|         if transport and transport.enable_batching: | ||||
|             # Process the event for batching | ||||
|             batch = EventBatch.get_or_create_batch(event.action, event.app, event.user) | ||||
|             batch.add_event_to_batch(event) | ||||
|             # Check if the batch has reached its limits | ||||
|             if not batch.check_batch_limits(): | ||||
|                 return | ||||
|  | ||||
|             batch_summary = batch.process_batch() | ||||
|             batch.delete() | ||||
|             notification = Notification( | ||||
|                 severity=trigger.severity, body=batch_summary, event=event, user=user | ||||
|             ) | ||||
|         else: | ||||
|             notification = Notification( | ||||
|                 severity=trigger.severity, body=event.summary, event=event, user=user | ||||
|             ) | ||||
|  | ||||
|         transport.send_notification(notification) | ||||
|  | ||||
|         self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL)) | ||||
| >>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134]) | ||||
|     except (NotificationTransportError, PropertyMappingExpressionException) as exc: | ||||
|         self.set_error(exc) | ||||
|         raise exc | ||||
| @ -143,4 +176,21 @@ def notification_cleanup(self: SystemTask): | ||||
|     for notification in notifications: | ||||
|         notification.delete() | ||||
|     LOGGER.debug("Expired notifications", amount=amount) | ||||
| <<<<<<< HEAD | ||||
|     self.set_status(TaskStatus.SUCCESSFUL, f"Expired {amount} Notifications") | ||||
| ======= | ||||
|     self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, [f"Expired {amount} Notifications"])) | ||||
|  | ||||
| # Scheduled task to check and send pending batches | ||||
| @CELERY_APP.task(base=MonitoredTask) | ||||
| @shared_task | ||||
| def check_and_send_pending_batches(): | ||||
|     """Check for pending batches that haven't been sent and have been idle for a specified time.""" | ||||
|     idle_time = timezone.now() - timedelta(minutes=10)  # Example idle time | ||||
|     pending_batches = EventBatch.objects.filter(sent=False, last_updated__lt=idle_time) | ||||
|     for batch in pending_batches: | ||||
|         batch.send_notification() | ||||
|         batch.sent = True | ||||
|         batch.save() | ||||
|  | ||||
| >>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134]) | ||||
|  | ||||
| @ -2931,6 +2931,11 @@ | ||||
|                     "type": "object", | ||||
|                     "additionalProperties": true, | ||||
|                     "title": "Brand" | ||||
|                 }, | ||||
|                 "batch_id": { | ||||
|                     "type": "string", | ||||
|                     "format": "uuid", | ||||
|                     "title": "Batch id" | ||||
|                 } | ||||
|             }, | ||||
|             "required": [] | ||||
| @ -2965,6 +2970,22 @@ | ||||
|                     "type": "boolean", | ||||
|                     "title": "Send once", | ||||
|                     "description": "Only send notification once, for example when sending a webhook into a chat channel." | ||||
|                 }, | ||||
|                 "enable_batching": { | ||||
|                     "type": "boolean", | ||||
|                     "title": "Enable batching" | ||||
|                 }, | ||||
|                 "batch_timeout": { | ||||
|                     "type": "integer", | ||||
|                     "minimum": -2147483648, | ||||
|                     "maximum": 2147483647, | ||||
|                     "title": "Batch timeout" | ||||
|                 }, | ||||
|                 "max_batch_size": { | ||||
|                     "type": "integer", | ||||
|                     "minimum": -2147483648, | ||||
|                     "maximum": 2147483647, | ||||
|                     "title": "Max batch size" | ||||
|                 } | ||||
|             }, | ||||
|             "required": [] | ||||
| @ -3040,6 +3061,11 @@ | ||||
|                             "type": "object", | ||||
|                             "additionalProperties": true, | ||||
|                             "title": "Brand" | ||||
|                         }, | ||||
|                         "batch_id": { | ||||
|                             "type": "string", | ||||
|                             "format": "uuid", | ||||
|                             "title": "Batch id" | ||||
|                         } | ||||
|                     }, | ||||
|                     "required": [ | ||||
|  | ||||
							
								
								
									
										42
									
								
								schema.yml
									
									
									
									
									
								
							
							
						
						
									
										42
									
								
								schema.yml
									
									
									
									
									
								
							| @ -32266,6 +32266,10 @@ components: | ||||
|           type: string | ||||
|           format: date-time | ||||
|         brand: {} | ||||
|         batch_id: | ||||
|           type: string | ||||
|           format: uuid | ||||
|           nullable: true | ||||
|       required: | ||||
|       - action | ||||
|       - app | ||||
| @ -32763,6 +32767,10 @@ components: | ||||
|           type: string | ||||
|           format: date-time | ||||
|         brand: {} | ||||
|         batch_id: | ||||
|           type: string | ||||
|           format: uuid | ||||
|           nullable: true | ||||
|       required: | ||||
|       - action | ||||
|       - app | ||||
| @ -35308,6 +35316,16 @@ components: | ||||
|           type: boolean | ||||
|           description: Only send notification once, for example when sending a webhook | ||||
|             into a chat channel. | ||||
|         enable_batching: | ||||
|           type: boolean | ||||
|         batch_timeout: | ||||
|           type: integer | ||||
|           maximum: 2147483647 | ||||
|           minimum: -2147483648 | ||||
|         max_batch_size: | ||||
|           type: integer | ||||
|           maximum: 2147483647 | ||||
|           minimum: -2147483648 | ||||
|       required: | ||||
|       - mode_verbose | ||||
|       - name | ||||
| @ -35344,6 +35362,16 @@ components: | ||||
|           type: boolean | ||||
|           description: Only send notification once, for example when sending a webhook | ||||
|             into a chat channel. | ||||
|         enable_batching: | ||||
|           type: boolean | ||||
|         batch_timeout: | ||||
|           type: integer | ||||
|           maximum: 2147483647 | ||||
|           minimum: -2147483648 | ||||
|         max_batch_size: | ||||
|           type: integer | ||||
|           maximum: 2147483647 | ||||
|           minimum: -2147483648 | ||||
|       required: | ||||
|       - name | ||||
|     NotificationTransportTest: | ||||
| @ -38388,6 +38416,10 @@ components: | ||||
|           type: string | ||||
|           format: date-time | ||||
|         brand: {} | ||||
|         batch_id: | ||||
|           type: string | ||||
|           format: uuid | ||||
|           nullable: true | ||||
|     PatchedExpressionPolicyRequest: | ||||
|       type: object | ||||
|       description: Group Membership Policy Serializer | ||||
| @ -38905,6 +38937,16 @@ components: | ||||
|           type: boolean | ||||
|           description: Only send notification once, for example when sending a webhook | ||||
|             into a chat channel. | ||||
|         enable_batching: | ||||
|           type: boolean | ||||
|         batch_timeout: | ||||
|           type: integer | ||||
|           maximum: 2147483647 | ||||
|           minimum: -2147483648 | ||||
|         max_batch_size: | ||||
|           type: integer | ||||
|           maximum: 2147483647 | ||||
|           minimum: -2147483648 | ||||
|     PatchedNotificationWebhookMappingRequest: | ||||
|       type: object | ||||
|       description: NotificationWebhookMapping Serializer | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	