Compare commits
	
		
			1 Commits
		
	
	
		
			version-te
			...
			smusali/ev
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 617d913ca2 | 
| @ -38,6 +38,7 @@ class EventSerializer(ModelSerializer): | |||||||
|             "created", |             "created", | ||||||
|             "expires", |             "expires", | ||||||
|             "brand", |             "brand", | ||||||
|  |             "batch_id", | ||||||
|         ] |         ] | ||||||
|  |  | ||||||
|  |  | ||||||
|  | |||||||
| @ -53,6 +53,9 @@ class NotificationTransportSerializer(ModelSerializer): | |||||||
|             "webhook_url", |             "webhook_url", | ||||||
|             "webhook_mapping", |             "webhook_mapping", | ||||||
|             "send_once", |             "send_once", | ||||||
|  |             "enable_batching", | ||||||
|  |             "batch_timeout", | ||||||
|  |             "max_batch_size", | ||||||
|         ] |         ] | ||||||
|  |  | ||||||
|  |  | ||||||
|  | |||||||
| @ -172,6 +172,63 @@ class EventManager(Manager): | |||||||
|         return self.get_queryset().get_events_per(time_since, extract, data_points) |         return self.get_queryset().get_events_per(time_since, extract, data_points) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class EventBatch(ExpiringModel): | ||||||
|  |     """Model to store information about batches of events.""" | ||||||
|  |  | ||||||
|  |     batch_id = models.UUIDField(primary_key=True, default=uuid4, editable=False) | ||||||
|  |     event_type = models.CharField(max_length=255) | ||||||
|  |     event_app = models.CharField(max_length=255) | ||||||
|  |     event_user = models.CharField(max_length=255) | ||||||
|  |     start_time = models.DateTimeField(auto_now_add=True) | ||||||
|  |     end_time = models.DateTimeField(null=True, blank=True) | ||||||
|  |     event_count = models.IntegerField(default=0) | ||||||
|  |     last_updated = models.DateTimeField(auto_now=True) | ||||||
|  |     max_batch_size = models.IntegerField(default=10) | ||||||
|  |     batch_timeout = models.IntegerField(default=60)  # Timeout in seconds | ||||||
|  |     sent = models.BooleanField(default=False) | ||||||
|  |  | ||||||
|  |     def add_event_to_batch(self, event): | ||||||
|  |         """Add an event to the batch and check if it's ready to send.""" | ||||||
|  |         self.add_event(event) | ||||||
|  |         if self.check_batch_limits(): | ||||||
|  |             self.process_batch() | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def get_or_create_batch(action, app, user): | ||||||
|  |         """Get or create a batch for a given action.""" | ||||||
|  |         return EventBatch.objects.filter( | ||||||
|  |             event_type=action, event_app=app, event_user=user, end_time__isnull=True | ||||||
|  |         ).first() or EventBatch.objects.create(event_type=action, event_app=app, event_user=user) | ||||||
|  |  | ||||||
|  |     def check_batch_limits(self): | ||||||
|  |         """Check if the batch has reached its size or timeout limits.""" | ||||||
|  |         time_elapsed = now() - self.start_time | ||||||
|  |         return self.event_count >= self.max_batch_size or time_elapsed >= timedelta( | ||||||
|  |             seconds=self.batch_timeout | ||||||
|  |         ) | ||||||
|  |  | ||||||
|  |     def add_event(self, event): | ||||||
|  |         """Add an event to the batch.""" | ||||||
|  |         self.event_count += 1 | ||||||
|  |         self.save() | ||||||
|  |  | ||||||
|  |     def create_batch_summary(self): | ||||||
|  |         """Create a summary message for the batch.""" | ||||||
|  |         return f"Batched Event Summary: {self.event_type} action \ | ||||||
|  |             on {self.event_app} app by {self.event_user} user \ | ||||||
|  |             occurred {self.event_count} times between {self.start_time} and {now()}" | ||||||
|  |  | ||||||
|  |     def process_batch(self): | ||||||
|  |         """Process the batch and check if it's ready to send.""" | ||||||
|  |         summary_message = self.create_batch_summary() | ||||||
|  |         return summary_message | ||||||
|  |          | ||||||
|  |     def send_notification(self): | ||||||
|  |         """Send notification for this batch.""" | ||||||
|  |         # Implement the logic to send notification | ||||||
|  |         pass | ||||||
|  |  | ||||||
|  |  | ||||||
| class Event(SerializerModel, ExpiringModel): | class Event(SerializerModel, ExpiringModel): | ||||||
|     """An individual Audit/Metrics/Notification/Error Event""" |     """An individual Audit/Metrics/Notification/Error Event""" | ||||||
|  |  | ||||||
| @ -187,6 +244,8 @@ class Event(SerializerModel, ExpiringModel): | |||||||
|     # Shadow the expires attribute from ExpiringModel to override the default duration |     # Shadow the expires attribute from ExpiringModel to override the default duration | ||||||
|     expires = models.DateTimeField(default=default_event_duration) |     expires = models.DateTimeField(default=default_event_duration) | ||||||
|  |  | ||||||
|  |     batch_id = models.UUIDField(null=True, blank=True) | ||||||
|  |  | ||||||
|     objects = EventManager() |     objects = EventManager() | ||||||
|  |  | ||||||
|     @staticmethod |     @staticmethod | ||||||
| @ -214,6 +273,7 @@ class Event(SerializerModel, ExpiringModel): | |||||||
|             # Also ensure that closest django app has the correct prefix |             # Also ensure that closest django app has the correct prefix | ||||||
|             if len(django_apps) > 0 and django_apps[0].startswith(app): |             if len(django_apps) > 0 and django_apps[0].startswith(app): | ||||||
|                 app = django_apps[0] |                 app = django_apps[0] | ||||||
|  |  | ||||||
|         cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs)) |         cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs)) | ||||||
|         event = Event(action=action, app=app, context=cleaned_kwargs) |         event = Event(action=action, app=app, context=cleaned_kwargs) | ||||||
|         return event |         return event | ||||||
| @ -275,6 +335,9 @@ class Event(SerializerModel, ExpiringModel): | |||||||
|         return self |         return self | ||||||
|  |  | ||||||
|     def save(self, *args, **kwargs): |     def save(self, *args, **kwargs): | ||||||
|  |         # Creating a batch for this event in the save method | ||||||
|  |         batch = EventBatch.get_or_create_batch(self.action, self.user, self.app) | ||||||
|  |         self.batch_id = batch.batch_id | ||||||
|         if self._state.adding: |         if self._state.adding: | ||||||
|             LOGGER.info( |             LOGGER.info( | ||||||
|                 "Created Event", |                 "Created Event", | ||||||
| @ -334,7 +397,17 @@ class NotificationTransport(SerializerModel): | |||||||
|         ), |         ), | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
|  |     enable_batching = models.BooleanField(default=False) | ||||||
|  |     batch_timeout = models.IntegerField(default=60)  # Timeout in seconds | ||||||
|  |     max_batch_size = models.IntegerField(default=10) | ||||||
|  |  | ||||||
|     def send(self, notification: "Notification") -> list[str]: |     def send(self, notification: "Notification") -> list[str]: | ||||||
|  |         """Send a batched notification or a single notification""" | ||||||
|  |         if self.enable_batching: | ||||||
|  |             return self.process_batch(notification) | ||||||
|  |         return self.send_notification(notification) | ||||||
|  |  | ||||||
|  |     def send_notification(self, notification: "Notification") -> list[str]: | ||||||
|         """Send notification to user, called from async task""" |         """Send notification to user, called from async task""" | ||||||
|         if self.mode == TransportMode.LOCAL: |         if self.mode == TransportMode.LOCAL: | ||||||
|             return self.send_local(notification) |             return self.send_local(notification) | ||||||
|  | |||||||
| @ -3,11 +3,13 @@ | |||||||
| from typing import Optional | from typing import Optional | ||||||
|  |  | ||||||
| from django.db.models.query_utils import Q | from django.db.models.query_utils import Q | ||||||
|  | from django.utils import timezone | ||||||
| from guardian.shortcuts import get_anonymous_user | from guardian.shortcuts import get_anonymous_user | ||||||
| from structlog.stdlib import get_logger | from structlog.stdlib import get_logger | ||||||
|  |  | ||||||
| from authentik.core.exceptions import PropertyMappingExpressionException | from authentik.core.exceptions import PropertyMappingExpressionException | ||||||
| from authentik.core.models import User | from authentik.core.models import User | ||||||
|  | from authentik.events.models import EventBatch  # Importing the EventBatch model | ||||||
| from authentik.events.models import ( | from authentik.events.models import ( | ||||||
|     Event, |     Event, | ||||||
|     Notification, |     Notification, | ||||||
| @ -17,6 +19,13 @@ from authentik.events.models import ( | |||||||
|     TaskStatus, |     TaskStatus, | ||||||
| ) | ) | ||||||
| from authentik.events.system_tasks import SystemTask, prefill_task | from authentik.events.system_tasks import SystemTask, prefill_task | ||||||
|  | from authentik.events.monitored_tasks import ( | ||||||
|  |     MonitoredTask, | ||||||
|  |     TaskResult, | ||||||
|  |     TaskResultStatus, | ||||||
|  |     shared_task, | ||||||
|  | ) | ||||||
|  | >>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134]) | ||||||
| from authentik.policies.engine import PolicyEngine | from authentik.policies.engine import PolicyEngine | ||||||
| from authentik.policies.models import PolicyBinding, PolicyEngineMode | from authentik.policies.models import PolicyBinding, PolicyEngineMode | ||||||
| from authentik.root.celery import CELERY_APP | from authentik.root.celery import CELERY_APP | ||||||
| @ -107,20 +116,44 @@ def notification_transport( | |||||||
|         event = Event.objects.filter(pk=event_pk).first() |         event = Event.objects.filter(pk=event_pk).first() | ||||||
|         if not event: |         if not event: | ||||||
|             return |             return | ||||||
|  |  | ||||||
|         user = User.objects.filter(pk=user_pk).first() |         user = User.objects.filter(pk=user_pk).first() | ||||||
|         if not user: |         if not user: | ||||||
|             return |             return | ||||||
|         trigger = NotificationRule.objects.filter(pk=trigger_pk).first() |         trigger = NotificationRule.objects.filter(pk=trigger_pk).first() | ||||||
|         if not trigger: |         if not trigger: | ||||||
|             return |             return | ||||||
|         notification = Notification( |  | ||||||
|             severity=trigger.severity, body=event.summary, event=event, user=user |         # Check if batching is enabled and process accordingly | ||||||
|         ) |  | ||||||
|         transport = NotificationTransport.objects.filter(pk=transport_pk).first() |         transport = NotificationTransport.objects.filter(pk=transport_pk).first() | ||||||
|  | <<<<<<< HEAD | ||||||
|         if not transport: |         if not transport: | ||||||
|             return |             return | ||||||
|         transport.send(notification) |         transport.send(notification) | ||||||
|         self.set_status(TaskStatus.SUCCESSFUL) |         self.set_status(TaskStatus.SUCCESSFUL) | ||||||
|  | ======= | ||||||
|  |         if transport and transport.enable_batching: | ||||||
|  |             # Process the event for batching | ||||||
|  |             batch = EventBatch.get_or_create_batch(event.action, event.app, event.user) | ||||||
|  |             batch.add_event_to_batch(event) | ||||||
|  |             # Check if the batch has reached its limits | ||||||
|  |             if not batch.check_batch_limits(): | ||||||
|  |                 return | ||||||
|  |  | ||||||
|  |             batch_summary = batch.process_batch() | ||||||
|  |             batch.delete() | ||||||
|  |             notification = Notification( | ||||||
|  |                 severity=trigger.severity, body=batch_summary, event=event, user=user | ||||||
|  |             ) | ||||||
|  |         else: | ||||||
|  |             notification = Notification( | ||||||
|  |                 severity=trigger.severity, body=event.summary, event=event, user=user | ||||||
|  |             ) | ||||||
|  |  | ||||||
|  |         transport.send_notification(notification) | ||||||
|  |  | ||||||
|  |         self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL)) | ||||||
|  | >>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134]) | ||||||
|     except (NotificationTransportError, PropertyMappingExpressionException) as exc: |     except (NotificationTransportError, PropertyMappingExpressionException) as exc: | ||||||
|         self.set_error(exc) |         self.set_error(exc) | ||||||
|         raise exc |         raise exc | ||||||
| @ -143,4 +176,21 @@ def notification_cleanup(self: SystemTask): | |||||||
|     for notification in notifications: |     for notification in notifications: | ||||||
|         notification.delete() |         notification.delete() | ||||||
|     LOGGER.debug("Expired notifications", amount=amount) |     LOGGER.debug("Expired notifications", amount=amount) | ||||||
|  | <<<<<<< HEAD | ||||||
|     self.set_status(TaskStatus.SUCCESSFUL, f"Expired {amount} Notifications") |     self.set_status(TaskStatus.SUCCESSFUL, f"Expired {amount} Notifications") | ||||||
|  | ======= | ||||||
|  |     self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, [f"Expired {amount} Notifications"])) | ||||||
|  |  | ||||||
|  | # Scheduled task to check and send pending batches | ||||||
|  | @CELERY_APP.task(base=MonitoredTask) | ||||||
|  | @shared_task | ||||||
|  | def check_and_send_pending_batches(): | ||||||
|  |     """Check for pending batches that haven't been sent and have been idle for a specified time.""" | ||||||
|  |     idle_time = timezone.now() - timedelta(minutes=10)  # Example idle time | ||||||
|  |     pending_batches = EventBatch.objects.filter(sent=False, last_updated__lt=idle_time) | ||||||
|  |     for batch in pending_batches: | ||||||
|  |         batch.send_notification() | ||||||
|  |         batch.sent = True | ||||||
|  |         batch.save() | ||||||
|  |  | ||||||
|  | >>>>>>> 5255207c5 (events/batch: add event batching mechanism [AUTH-134]) | ||||||
|  | |||||||
| @ -2931,6 +2931,11 @@ | |||||||
|                     "type": "object", |                     "type": "object", | ||||||
|                     "additionalProperties": true, |                     "additionalProperties": true, | ||||||
|                     "title": "Brand" |                     "title": "Brand" | ||||||
|  |                 }, | ||||||
|  |                 "batch_id": { | ||||||
|  |                     "type": "string", | ||||||
|  |                     "format": "uuid", | ||||||
|  |                     "title": "Batch id" | ||||||
|                 } |                 } | ||||||
|             }, |             }, | ||||||
|             "required": [] |             "required": [] | ||||||
| @ -2965,6 +2970,22 @@ | |||||||
|                     "type": "boolean", |                     "type": "boolean", | ||||||
|                     "title": "Send once", |                     "title": "Send once", | ||||||
|                     "description": "Only send notification once, for example when sending a webhook into a chat channel." |                     "description": "Only send notification once, for example when sending a webhook into a chat channel." | ||||||
|  |                 }, | ||||||
|  |                 "enable_batching": { | ||||||
|  |                     "type": "boolean", | ||||||
|  |                     "title": "Enable batching" | ||||||
|  |                 }, | ||||||
|  |                 "batch_timeout": { | ||||||
|  |                     "type": "integer", | ||||||
|  |                     "minimum": -2147483648, | ||||||
|  |                     "maximum": 2147483647, | ||||||
|  |                     "title": "Batch timeout" | ||||||
|  |                 }, | ||||||
|  |                 "max_batch_size": { | ||||||
|  |                     "type": "integer", | ||||||
|  |                     "minimum": -2147483648, | ||||||
|  |                     "maximum": 2147483647, | ||||||
|  |                     "title": "Max batch size" | ||||||
|                 } |                 } | ||||||
|             }, |             }, | ||||||
|             "required": [] |             "required": [] | ||||||
| @ -3040,6 +3061,11 @@ | |||||||
|                             "type": "object", |                             "type": "object", | ||||||
|                             "additionalProperties": true, |                             "additionalProperties": true, | ||||||
|                             "title": "Brand" |                             "title": "Brand" | ||||||
|  |                         }, | ||||||
|  |                         "batch_id": { | ||||||
|  |                             "type": "string", | ||||||
|  |                             "format": "uuid", | ||||||
|  |                             "title": "Batch id" | ||||||
|                         } |                         } | ||||||
|                     }, |                     }, | ||||||
|                     "required": [ |                     "required": [ | ||||||
|  | |||||||
							
								
								
									
										42
									
								
								schema.yml
									
									
									
									
									
								
							
							
						
						
									
										42
									
								
								schema.yml
									
									
									
									
									
								
							| @ -32266,6 +32266,10 @@ components: | |||||||
|           type: string |           type: string | ||||||
|           format: date-time |           format: date-time | ||||||
|         brand: {} |         brand: {} | ||||||
|  |         batch_id: | ||||||
|  |           type: string | ||||||
|  |           format: uuid | ||||||
|  |           nullable: true | ||||||
|       required: |       required: | ||||||
|       - action |       - action | ||||||
|       - app |       - app | ||||||
| @ -32763,6 +32767,10 @@ components: | |||||||
|           type: string |           type: string | ||||||
|           format: date-time |           format: date-time | ||||||
|         brand: {} |         brand: {} | ||||||
|  |         batch_id: | ||||||
|  |           type: string | ||||||
|  |           format: uuid | ||||||
|  |           nullable: true | ||||||
|       required: |       required: | ||||||
|       - action |       - action | ||||||
|       - app |       - app | ||||||
| @ -35308,6 +35316,16 @@ components: | |||||||
|           type: boolean |           type: boolean | ||||||
|           description: Only send notification once, for example when sending a webhook |           description: Only send notification once, for example when sending a webhook | ||||||
|             into a chat channel. |             into a chat channel. | ||||||
|  |         enable_batching: | ||||||
|  |           type: boolean | ||||||
|  |         batch_timeout: | ||||||
|  |           type: integer | ||||||
|  |           maximum: 2147483647 | ||||||
|  |           minimum: -2147483648 | ||||||
|  |         max_batch_size: | ||||||
|  |           type: integer | ||||||
|  |           maximum: 2147483647 | ||||||
|  |           minimum: -2147483648 | ||||||
|       required: |       required: | ||||||
|       - mode_verbose |       - mode_verbose | ||||||
|       - name |       - name | ||||||
| @ -35344,6 +35362,16 @@ components: | |||||||
|           type: boolean |           type: boolean | ||||||
|           description: Only send notification once, for example when sending a webhook |           description: Only send notification once, for example when sending a webhook | ||||||
|             into a chat channel. |             into a chat channel. | ||||||
|  |         enable_batching: | ||||||
|  |           type: boolean | ||||||
|  |         batch_timeout: | ||||||
|  |           type: integer | ||||||
|  |           maximum: 2147483647 | ||||||
|  |           minimum: -2147483648 | ||||||
|  |         max_batch_size: | ||||||
|  |           type: integer | ||||||
|  |           maximum: 2147483647 | ||||||
|  |           minimum: -2147483648 | ||||||
|       required: |       required: | ||||||
|       - name |       - name | ||||||
|     NotificationTransportTest: |     NotificationTransportTest: | ||||||
| @ -38388,6 +38416,10 @@ components: | |||||||
|           type: string |           type: string | ||||||
|           format: date-time |           format: date-time | ||||||
|         brand: {} |         brand: {} | ||||||
|  |         batch_id: | ||||||
|  |           type: string | ||||||
|  |           format: uuid | ||||||
|  |           nullable: true | ||||||
|     PatchedExpressionPolicyRequest: |     PatchedExpressionPolicyRequest: | ||||||
|       type: object |       type: object | ||||||
|       description: Group Membership Policy Serializer |       description: Group Membership Policy Serializer | ||||||
| @ -38905,6 +38937,16 @@ components: | |||||||
|           type: boolean |           type: boolean | ||||||
|           description: Only send notification once, for example when sending a webhook |           description: Only send notification once, for example when sending a webhook | ||||||
|             into a chat channel. |             into a chat channel. | ||||||
|  |         enable_batching: | ||||||
|  |           type: boolean | ||||||
|  |         batch_timeout: | ||||||
|  |           type: integer | ||||||
|  |           maximum: 2147483647 | ||||||
|  |           minimum: -2147483648 | ||||||
|  |         max_batch_size: | ||||||
|  |           type: integer | ||||||
|  |           maximum: 2147483647 | ||||||
|  |           minimum: -2147483648 | ||||||
|     PatchedNotificationWebhookMappingRequest: |     PatchedNotificationWebhookMappingRequest: | ||||||
|       type: object |       type: object | ||||||
|       description: NotificationWebhookMapping Serializer |       description: NotificationWebhookMapping Serializer | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	