104 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			104 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Event notification tasks"""
 | |
| from guardian.shortcuts import get_anonymous_user
 | |
| from structlog import get_logger
 | |
| 
 | |
| from authentik.core.models import User
 | |
| from authentik.events.models import (
 | |
|     Event,
 | |
|     Notification,
 | |
|     NotificationRule,
 | |
|     NotificationTransport,
 | |
|     NotificationTransportError,
 | |
| )
 | |
| from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
 | |
| from authentik.policies.engine import PolicyEngine, PolicyEngineMode
 | |
| from authentik.policies.models import PolicyBinding
 | |
| from authentik.root.celery import CELERY_APP
 | |
| 
 | |
| LOGGER = get_logger()
 | |
| 
 | |
| 
 | |
| @CELERY_APP.task()
 | |
| def event_notification_handler(event_uuid: str):
 | |
|     """Start task for each trigger definition"""
 | |
|     for trigger in NotificationRule.objects.all():
 | |
|         event_trigger_handler.apply_async(
 | |
|             args=[event_uuid, trigger.name], queue="authentik_events"
 | |
|         )
 | |
| 
 | |
| 
 | |
| @CELERY_APP.task()
 | |
| def event_trigger_handler(event_uuid: str, trigger_name: str):
 | |
|     """Check if policies attached to NotificationRule match event"""
 | |
|     event: Event = Event.objects.get(event_uuid=event_uuid)
 | |
|     trigger: NotificationRule = NotificationRule.objects.get(name=trigger_name)
 | |
| 
 | |
|     if "policy_uuid" in event.context:
 | |
|         policy_uuid = event.context["policy_uuid"]
 | |
|         if PolicyBinding.objects.filter(
 | |
|             target__in=NotificationRule.objects.all().values_list(
 | |
|                 "pbm_uuid", flat=True
 | |
|             ),
 | |
|             policy=policy_uuid,
 | |
|         ).exists():
 | |
|             # If policy that caused this event to be created is attached
 | |
|             # to *any* NotificationRule, we return early.
 | |
|             # This is the most effective way to prevent infinite loops.
 | |
|             LOGGER.debug(
 | |
|                 "e(trigger): attempting to prevent infinite loop", trigger=trigger
 | |
|             )
 | |
|             return
 | |
| 
 | |
|     if not trigger.group:
 | |
|         LOGGER.debug("e(trigger): trigger has no group", trigger=trigger)
 | |
|         return
 | |
| 
 | |
|     LOGGER.debug("e(trigger): checking if trigger applies", trigger=trigger)
 | |
|     user = User.objects.filter(pk=event.user.get("pk")) or get_anonymous_user()
 | |
|     policy_engine = PolicyEngine(trigger, user)
 | |
|     policy_engine.mode = PolicyEngineMode.MODE_OR
 | |
|     policy_engine.empty_result = False
 | |
|     policy_engine.use_cache = False
 | |
|     policy_engine.request.context["event"] = event
 | |
|     policy_engine.build()
 | |
|     result = policy_engine.result
 | |
|     if not result.passing:
 | |
|         return
 | |
| 
 | |
|     LOGGER.debug("e(trigger): event trigger matched", trigger=trigger)
 | |
|     # Create the notification objects
 | |
|     for transport in trigger.transports.all():
 | |
|         for user in trigger.group.users.all():
 | |
|             LOGGER.debug("created notification")
 | |
|             notification = Notification.objects.create(
 | |
|                 severity=trigger.severity, body=event.summary, event=event, user=user
 | |
|             )
 | |
|             notification_transport.apply_async(
 | |
|                 args=[notification.pk, transport.pk], queue="authentik_events"
 | |
|             )
 | |
|             if transport.send_once:
 | |
|                 break
 | |
| 
 | |
| 
 | |
| @CELERY_APP.task(
 | |
|     bind=True,
 | |
|     autoretry_for=(NotificationTransportError,),
 | |
|     retry_backoff=True,
 | |
|     base=MonitoredTask,
 | |
| )
 | |
| def notification_transport(
 | |
|     self: MonitoredTask, notification_pk: int, transport_pk: int
 | |
| ):
 | |
|     """Send notification over specified transport"""
 | |
|     self.save_on_success = False
 | |
|     try:
 | |
|         notification: Notification = Notification.objects.get(pk=notification_pk)
 | |
|         transport: NotificationTransport = NotificationTransport.objects.get(
 | |
|             pk=transport_pk
 | |
|         )
 | |
|         transport.send(notification)
 | |
|         self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL))
 | |
|     except NotificationTransportError as exc:
 | |
|         self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
 | |
|         raise exc
 | 
