events: Notifications (#418)

* events: initial alerting implementation

* policies: move error handling to process, ensure policy UUID is saved

* policies: add tests for error handling in PolicyProcess

* events: improve loop detection

* events: add API for action and trigger

* policies: ensure http_request is not used in context

* events: adjust unittests for user handling

* policies/event_matcher: add policy type

* events: add API tests

* events: add middleware tests

* core: make application's provider not required

* outposts: allow blank kubeconfig

* outposts: validate kubeconfig before saving

* api: fix formatting

* stages/invitation: remove invitation_created signal as model_created functions the same

* stages/invitation: ensure created_by is set when creating from API

* events: rebase migrations on master

* events: fix missing Alerts from API

* policies: fix unittests

* events: add tests for alerts

* events: rename from alerting to notifications

* events: add ability to specify severity of notification created

* policies/event_matcher: Add app field to match on event app

* policies/event_matcher: fix EventMatcher not being included in API

* core: use objects.none() when get_queryset is used

* events: use m2m for multiple transports, create notification object in task

* events: add default triggers

* events: fix migrations return value

* events: fix notification_transport not being in the correct queue

* stages/email: allow sending of email without backend

* events: implement sending via webhook + slack/discord + email
This commit is contained in:
Jens L
2021-01-11 18:43:59 +01:00
committed by GitHub
parent f8a426f0e8
commit 1ccf6dcf6f
40 changed files with 2074 additions and 84 deletions

View File

View File

@ -0,0 +1,33 @@
"""Notification API Views"""
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet
from authentik.events.models import Notification
class NotificationSerializer(ModelSerializer):
"""Notification Serializer"""
class Meta:
model = Notification
fields = [
"pk",
"severity",
"body",
"created",
"event",
"seen",
]
class NotificationViewSet(ModelViewSet):
"""Notification Viewset"""
queryset = Notification.objects.all()
serializer_class = NotificationSerializer
def get_queryset(self):
if not self.request:
return super().get_queryset()
return Notification.objects.filter(user=self.request.user)

View File

@ -0,0 +1,53 @@
"""NotificationTransport API Views"""
from django.http.response import Http404
from guardian.shortcuts import get_objects_for_user
from rest_framework.decorators import action
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet
from authentik.events.models import (
Notification,
NotificationSeverity,
NotificationTransport,
)
class NotificationTransportSerializer(ModelSerializer):
"""NotificationTransport Serializer"""
class Meta:
model = NotificationTransport
fields = [
"pk",
"name",
"mode",
"webhook_url",
]
class NotificationTransportViewSet(ModelViewSet):
"""NotificationTransport Viewset"""
queryset = NotificationTransport.objects.all()
serializer_class = NotificationTransportSerializer
@action(detail=True, methods=["post"])
# pylint: disable=invalid-name
def test(self, request: Request, pk=None) -> Response:
"""Send example notification using selected transport. Requires
Modify permissions."""
transports = get_objects_for_user(
request.user, "authentik_events.change_notificationtransport"
).filter(pk=pk)
if not transports.exists():
raise Http404
transport = transports.first()
notification = Notification(
severity=NotificationSeverity.NOTICE,
body=f"Test Notification from transport {transport.name}",
user=request.user,
)
return Response(transport.send(notification))

View File

@ -0,0 +1,26 @@
"""NotificationTrigger API Views"""
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet
from authentik.events.models import NotificationTrigger
class NotificationTriggerSerializer(ModelSerializer):
"""NotificationTrigger Serializer"""
class Meta:
model = NotificationTrigger
fields = [
"pk",
"name",
"transports",
"severity",
]
class NotificationTriggerViewSet(ModelViewSet):
"""NotificationTrigger Viewset"""
queryset = NotificationTrigger.objects.all()
serializer_class = NotificationTriggerSerializer

View File

@ -0,0 +1,148 @@
# Generated by Django 3.1.4 on 2021-01-11 16:36
import uuid
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("authentik_policies", "0004_policy_execution_logging"),
("authentik_core", "0016_auto_20201202_2234"),
("authentik_events", "0009_auto_20201227_1210"),
]
operations = [
migrations.CreateModel(
name="NotificationTransport",
fields=[
(
"uuid",
models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
),
),
("name", models.TextField(unique=True)),
(
"mode",
models.TextField(
choices=[
("webhook", "Generic Webhook"),
("webhook_slack", "Slack Webhook (Slack/Discord)"),
("email", "Email"),
]
),
),
("webhook_url", models.TextField(blank=True)),
],
options={
"verbose_name": "Notification Transport",
"verbose_name_plural": "Notification Transports",
},
),
migrations.CreateModel(
name="NotificationTrigger",
fields=[
(
"policybindingmodel_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="authentik_policies.policybindingmodel",
),
),
("name", models.TextField(unique=True)),
(
"severity",
models.TextField(
choices=[
("notice", "Notice"),
("warning", "Warning"),
("alert", "Alert"),
],
default="notice",
help_text="Controls which severity level the created notifications will have.",
),
),
(
"group",
models.ForeignKey(
blank=True,
help_text="Define which group of users this notification should be sent and shown to. If left empty, Notification won't ben sent.",
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="authentik_core.group",
),
),
(
"transports",
models.ManyToManyField(
help_text="Select which transports should be used to notify the user. If none are selected, the notification will only be shown in the authentik UI.",
to="authentik_events.NotificationTransport",
),
),
],
options={
"verbose_name": "Notification Trigger",
"verbose_name_plural": "Notification Triggers",
},
bases=("authentik_policies.policybindingmodel",),
),
migrations.CreateModel(
name="Notification",
fields=[
(
"uuid",
models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
),
),
(
"severity",
models.TextField(
choices=[
("notice", "Notice"),
("warning", "Warning"),
("alert", "Alert"),
]
),
),
("body", models.TextField()),
("created", models.DateTimeField(auto_now_add=True)),
("seen", models.BooleanField(default=False)),
(
"event",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="authentik_events.event",
),
),
(
"user",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"verbose_name": "Notification",
"verbose_name_plural": "Notifications",
},
),
]

View File

@ -0,0 +1,108 @@
# Generated by Django 3.1.4 on 2021-01-10 18:57
from django.apps.registry import Apps
from django.db import migrations
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from authentik.events.models import EventAction
def notify_configuration_error(apps: Apps, schema_editor: BaseDatabaseSchemaEditor):
db_alias = schema_editor.connection.alias
PolicyBinding = apps.get_model("authentik_policies", "PolicyBinding")
EventMatcherPolicy = apps.get_model(
"authentik_policies_event_matcher", "EventMatcherPolicy"
)
NotificationTrigger = apps.get_model("authentik_events", "NotificationTrigger")
policy, _ = EventMatcherPolicy.objects.using(db_alias).update_or_create(
name="default-match-configuration-error",
defaults={"action": EventAction.CONFIGURATION_ERROR},
)
trigger, _ = NotificationTrigger.objects.using(db_alias).update_or_create(
name="default-notify-configuration-error",
)
PolicyBinding.objects.using(db_alias).update_or_create(
target=trigger,
policy=policy,
defaults={
"order": 0,
},
)
def notify_update(apps: Apps, schema_editor: BaseDatabaseSchemaEditor):
db_alias = schema_editor.connection.alias
PolicyBinding = apps.get_model("authentik_policies", "PolicyBinding")
EventMatcherPolicy = apps.get_model(
"authentik_policies_event_matcher", "EventMatcherPolicy"
)
NotificationTrigger = apps.get_model("authentik_events", "NotificationTrigger")
policy, _ = EventMatcherPolicy.objects.using(db_alias).update_or_create(
name="default-match-update",
defaults={"action": EventAction.UPDATE_AVAILABLE},
)
trigger, _ = NotificationTrigger.objects.using(db_alias).update_or_create(
name="default-notify-update",
)
PolicyBinding.objects.using(db_alias).update_or_create(
target=trigger,
policy=policy,
defaults={
"order": 0,
},
)
def notify_exception(apps: Apps, schema_editor: BaseDatabaseSchemaEditor):
db_alias = schema_editor.connection.alias
PolicyBinding = apps.get_model("authentik_policies", "PolicyBinding")
EventMatcherPolicy = apps.get_model(
"authentik_policies_event_matcher", "EventMatcherPolicy"
)
NotificationTrigger = apps.get_model("authentik_events", "NotificationTrigger")
policy_policy_exc, _ = EventMatcherPolicy.objects.using(db_alias).update_or_create(
name="default-match-policy-exception",
defaults={"action": EventAction.POLICY_EXCEPTION},
)
policy_pm_exc, _ = EventMatcherPolicy.objects.using(db_alias).update_or_create(
name="default-match-property-mapping-exception",
defaults={"action": EventAction.PROPERTY_MAPPING_EXCEPTION},
)
trigger, _ = NotificationTrigger.objects.using(db_alias).update_or_create(
name="default-notify-exception",
)
PolicyBinding.objects.using(db_alias).update_or_create(
target=trigger,
policy=policy_policy_exc,
defaults={
"order": 0,
},
)
PolicyBinding.objects.using(db_alias).update_or_create(
target=trigger,
policy=policy_pm_exc,
defaults={
"order": 1,
},
)
class Migration(migrations.Migration):
dependencies = [
(
"authentik_events",
"0010_notification_notificationtransport_notificationtrigger",
),
("authentik_policies_event_matcher", "0003_auto_20210110_1907"),
("authentik_policies", "0004_policy_execution_logging"),
]
operations = [
migrations.RunPython(notify_configuration_error),
migrations.RunPython(notify_update),
migrations.RunPython(notify_exception),
]

View File

@ -9,15 +9,20 @@ from django.core.exceptions import ValidationError
from django.db import models
from django.http import HttpRequest
from django.utils.translation import gettext as _
from requests import post
from structlog.stdlib import get_logger
from authentik import __version__
from authentik.core.middleware import (
SESSION_IMPERSONATE_ORIGINAL_USER,
SESSION_IMPERSONATE_USER,
)
from authentik.core.models import User
from authentik.core.models import Group, User
from authentik.events.utils import cleanse_dict, get_user, sanitize_dict
from authentik.lib.utils.http import get_client_ip
from authentik.policies.models import PolicyBindingModel
from authentik.stages.email.tasks import send_mail
from authentik.stages.email.utils import TemplateEmailMessage
LOGGER = get_logger("authentik.events")
@ -104,10 +109,12 @@ class Event(models.Model):
Events independently from requests.
`user` arguments optionally overrides user from requests."""
if hasattr(request, "user"):
self.user = get_user(
request.user,
request.session.get(SESSION_IMPERSONATE_ORIGINAL_USER, None),
)
original_user = None
if hasattr(request, "session"):
original_user = request.session.get(
SESSION_IMPERSONATE_ORIGINAL_USER, None
)
self.user = get_user(request.user, original_user)
if user:
self.user = get_user(user)
# Check if we're currently impersonating, and add that user
@ -139,7 +146,189 @@ class Event(models.Model):
)
return super().save(*args, **kwargs)
@property
def summary(self) -> str:
"""Return a summary of this event."""
if "message" in self.context:
return self.context["message"]
return f"{self.action}: {self.context}"
def __str__(self) -> str:
return f"<Event action={self.action} user={self.user} context={self.context}>"
class Meta:
verbose_name = _("Event")
verbose_name_plural = _("Events")
class TransportMode(models.TextChoices):
"""Modes that a notification transport can send a notification"""
WEBHOOK = "webhook", _("Generic Webhook")
WEBHOOK_SLACK = "webhook_slack", _("Slack Webhook (Slack/Discord)")
EMAIL = "email", _("Email")
class NotificationTransport(models.Model):
"""Action which is executed when a Trigger matches"""
uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
name = models.TextField(unique=True)
mode = models.TextField(choices=TransportMode.choices)
webhook_url = models.TextField(blank=True)
def send(self, notification: "Notification") -> list[str]:
"""Send notification to user, called from async task"""
if self.mode == TransportMode.WEBHOOK:
return self.send_webhook(notification)
if self.mode == TransportMode.WEBHOOK_SLACK:
return self.send_webhook_slack(notification)
if self.mode == TransportMode.EMAIL:
return self.send_email(notification)
raise ValueError(f"Invalid mode {self.mode} set")
def send_webhook(self, notification: "Notification") -> list[str]:
"""Send notification to generic webhook"""
response = post(
self.webhook_url,
json={
"body": notification.body,
"severity": notification.severity,
},
)
return [
response.status_code,
response.text,
]
def send_webhook_slack(self, notification: "Notification") -> list[str]:
"""Send notification to slack or slack-compatible endpoints"""
body = {
"username": "authentik",
"icon_url": "https://goauthentik.io/img/icon.png",
"attachments": [
{
"author_name": "authentik",
"author_link": "https://goauthentik.io",
"author_icon": "https://goauthentik.io/img/icon.png",
"title": notification.body,
"color": "#fd4b2d",
"fields": [
{
"title": _("Severity"),
"value": notification.severity,
"short": True,
},
{
"title": _("Dispatched for user"),
"value": str(notification.user),
"short": True,
},
],
"footer": f"authentik v{__version__}",
}
],
}
if notification.event:
body["attachments"][0]["title"] = notification.event.action
body["attachments"][0]["text"] = notification.event.action
response = post(self.webhook_url, json=body)
return [
response.status_code,
response.text,
]
def send_email(self, notification: "Notification") -> list[str]:
"""Send notification via global email configuration"""
body_trunc = (
(notification.body[:75] + "..")
if len(notification.body) > 75
else notification.body
)
mail = TemplateEmailMessage(
subject=f"authentik Notification: {body_trunc}",
template_name="email/setup.html",
to=[notification.user.email],
template_context={
"body": notification.body,
},
)
# Email is sent directly here, as the call to send() should have been from a task.
# pyright: reportGeneralTypeIssues=false
return send_mail(mail.__dict__) # pylint: disable=no-value-for-parameter
class Meta:
verbose_name = _("Notification Transport")
verbose_name_plural = _("Notification Transports")
class NotificationSeverity(models.TextChoices):
"""Severity images that a notification can have"""
NOTICE = "notice", _("Notice")
WARNING = "warning", _("Warning")
ALERT = "alert", _("Alert")
class Notification(models.Model):
"""Event Notification"""
uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
severity = models.TextField(choices=NotificationSeverity.choices)
body = models.TextField()
created = models.DateTimeField(auto_now_add=True)
event = models.ForeignKey(Event, on_delete=models.SET_NULL, null=True, blank=True)
seen = models.BooleanField(default=False)
user = models.ForeignKey(User, on_delete=models.CASCADE)
def __str__(self) -> str:
body_trunc = (self.body[:75] + "..") if len(self.body) > 75 else self.body
return f"Notification for user {self.user}: {body_trunc}"
class Meta:
verbose_name = _("Notification")
verbose_name_plural = _("Notifications")
class NotificationTrigger(PolicyBindingModel):
"""Decide when to create a Notification based on policies attached to this object."""
name = models.TextField(unique=True)
transports = models.ManyToManyField(
NotificationTransport,
help_text=_(
(
"Select which transports should be used to notify the user. If none are "
"selected, the notification will only be shown in the authentik UI."
)
),
)
severity = models.TextField(
choices=NotificationSeverity.choices,
default=NotificationSeverity.NOTICE,
help_text=_(
"Controls which severity level the created notifications will have."
),
)
group = models.ForeignKey(
Group,
help_text=_(
(
"Define which group of users this notification should be sent and shown to. "
"If left empty, Notification won't ben sent."
)
),
null=True,
blank=True,
on_delete=models.SET_NULL,
)
class Meta:
verbose_name = _("Notification Trigger")
verbose_name_plural = _("Notification Triggers")

View File

@ -7,12 +7,14 @@ from django.contrib.auth.signals import (
user_logged_out,
user_login_failed,
)
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.http import HttpRequest
from authentik.core.models import User
from authentik.core.signals import password_changed
from authentik.events.models import Event, EventAction
from authentik.events.tasks import event_notification_handler
from authentik.stages.invitation.models import Invitation
from authentik.stages.invitation.signals import invitation_used
from authentik.stages.user_write.signals import user_write
@ -95,3 +97,10 @@ def on_password_changed(sender, user: User, password: str, **_):
"""Log password change"""
thread = EventNewThread(EventAction.PASSWORD_SET, None, user=user)
thread.run()
@receiver(post_save, sender=Event)
# pylint: disable=unused-argument
def event_post_save_notification(sender, instance: Event, **_):
"""Start task to check if any policies trigger an notification on this event"""
event_notification_handler.delay(instance.event_uuid.hex)

80
authentik/events/tasks.py Normal file
View File

@ -0,0 +1,80 @@
"""Event notification tasks"""
from guardian.shortcuts import get_anonymous_user
from structlog import get_logger
from authentik.events.models import (
Event,
Notification,
NotificationTransport,
NotificationTrigger,
)
from authentik.lib.tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.policies.engine import PolicyEngine
from authentik.root.celery import CELERY_APP
LOGGER = get_logger()
@CELERY_APP.task()
def event_notification_handler(event_uuid: str):
"""Start task for each trigger definition"""
for trigger in NotificationTrigger.objects.all():
event_trigger_handler.apply_async(
args=[event_uuid, trigger.name], queue="authentik_events"
)
@CELERY_APP.task()
def event_trigger_handler(event_uuid: str, trigger_name: str):
"""Check if policies attached to NotificationTrigger match event"""
event: Event = Event.objects.get(event_uuid=event_uuid)
trigger: NotificationTrigger = NotificationTrigger.objects.get(name=trigger_name)
if "policy_uuid" in event.context:
policy_uuid = event.context["policy_uuid"]
if trigger.policies.filter(policy_uuid=policy_uuid).exists():
# Event has been created by a policy that is attached
# to this trigger. To prevent infinite loops, we stop here
LOGGER.debug("e(trigger): attempting to prevent infinite loop")
return
if not trigger.group:
LOGGER.debug("e(trigger): trigger has no group")
return
policy_engine = PolicyEngine(trigger, get_anonymous_user())
policy_engine.request.context["event"] = event
policy_engine.build()
result = policy_engine.result
if not result.passing:
return
LOGGER.debug("e(trigger): event trigger matched")
# Create the notification objects
for user in trigger.group.users.all():
notification = Notification.objects.create(
severity=trigger.severity, body=event.summary, event=event, user=user
)
for transport in trigger.transports.all():
notification_transport.apply_async(
args=[notification.pk, transport.pk], queue="authentik_events"
)
@CELERY_APP.task(bind=True, base=MonitoredTask)
def notification_transport(
self: MonitoredTask, notification_pk: int, transport_pk: int
):
"""Send notification over specified transport"""
self.save_on_success = False
try:
notification: Notification = Notification.objects.get(pk=notification_pk)
transport: NotificationTransport = NotificationTransport.objects.get(
pk=transport_pk
)
transport.send(notification)
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL))
except Exception as exc:
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
raise exc

View File

@ -0,0 +1,24 @@
"""Event API tests"""
from django.shortcuts import reverse
from rest_framework.test import APITestCase
from authentik.core.models import User
from authentik.events.models import Event, EventAction
class TestEventsAPI(APITestCase):
"""Test Event API"""
def test_top_n(self):
"""Test top_per_user"""
user = User.objects.get(username="akadmin")
self.client.force_login(user)
event = Event.new(EventAction.AUTHORIZE_APPLICATION)
event.save() # We save to ensure nothing is un-saveable
response = self.client.get(
reverse("authentik_api:event-top-per-user"),
data={"filter_action": EventAction.AUTHORIZE_APPLICATION},
)
self.assertEqual(response.status_code, 200)

View File

@ -1,9 +1,10 @@
"""events event tests"""
"""event tests"""
from django.contrib.contenttypes.models import ContentType
from django.test import TestCase
from guardian.shortcuts import get_anonymous_user
from authentik.core.models import Group
from authentik.events.models import Event
from authentik.policies.dummy.models import DummyPolicy
@ -13,14 +14,24 @@ class TestEvents(TestCase):
def test_new_with_model(self):
"""Create a new Event passing a model as kwarg"""
event = Event.new("unittest", test={"model": get_anonymous_user()})
test_model = Group.objects.create(name="test")
event = Event.new("unittest", test={"model": test_model})
event.save() # We save to ensure nothing is un-saveable
model_content_type = ContentType.objects.get_for_model(get_anonymous_user())
model_content_type = ContentType.objects.get_for_model(test_model)
self.assertEqual(
event.context.get("test").get("model").get("app"),
model_content_type.app_label,
)
def test_new_with_user(self):
"""Create a new Event passing a user as kwarg"""
event = Event.new("unittest", test={"model": get_anonymous_user()})
event.save() # We save to ensure nothing is un-saveable
self.assertEqual(
event.context.get("test").get("model").get("username"),
get_anonymous_user().username,
)
def test_new_with_uuid_model(self):
"""Create a new Event passing a model (with UUID PK) as kwarg"""
temp_model = DummyPolicy.objects.create(name="test", result=True)

View File

@ -0,0 +1,48 @@
"""Event Middleware tests"""
from django.shortcuts import reverse
from rest_framework.test import APITestCase
from authentik.core.models import Application, User
from authentik.events.models import Event, EventAction
class TestEventsMiddleware(APITestCase):
"""Test Event Middleware"""
def setUp(self) -> None:
super().setUp()
self.user = User.objects.get(username="akadmin")
self.client.force_login(self.user)
def test_create(self):
"""Test model creation event"""
self.client.post(
reverse("authentik_api:application-list"),
data={"name": "test-create", "slug": "test-create"},
)
self.assertTrue(Application.objects.filter(name="test-create").exists())
self.assertTrue(
Event.objects.filter(
action=EventAction.MODEL_CREATED,
context__model__model_name="application",
context__model__app="authentik_core",
context__model__name="test-create",
).exists()
)
def test_delete(self):
"""Test model creation event"""
Application.objects.create(name="test-delete", slug="test-delete")
self.client.delete(
reverse("authentik_api:application-detail", kwargs={"slug": "test-delete"})
)
self.assertFalse(Application.objects.filter(name="test").exists())
self.assertTrue(
Event.objects.filter(
action=EventAction.MODEL_DELETED,
context__model__model_name="application",
context__model__app="authentik_core",
context__model__name="test-delete",
).exists()
)

View File

@ -0,0 +1,77 @@
"""Notification tests"""
from unittest.mock import MagicMock, patch
from django.test import TestCase
from authentik.core.models import Group, User
from authentik.events.models import (
Event,
EventAction,
NotificationTransport,
NotificationTrigger,
)
from authentik.policies.event_matcher.models import EventMatcherPolicy
from authentik.policies.exceptions import PolicyException
from authentik.policies.models import PolicyBinding
class TestEventsNotifications(TestCase):
"""Test Event Notifications"""
def setUp(self) -> None:
self.group = Group.objects.create(name="test-group")
self.user = User.objects.create(name="test-user")
self.group.users.add(self.user)
self.group.save()
def test_trigger_single(self):
"""Test simple transport triggering"""
transport = NotificationTransport.objects.create(name="transport")
trigger = NotificationTrigger.objects.create(name="trigger", group=self.group)
trigger.transports.add(transport)
trigger.save()
matcher = EventMatcherPolicy.objects.create(
name="matcher", action=EventAction.CUSTOM_PREFIX
)
PolicyBinding.objects.create(target=trigger, policy=matcher, order=0)
execute_mock = MagicMock()
with patch("authentik.events.models.NotificationTransport.send", execute_mock):
Event.new(EventAction.CUSTOM_PREFIX).save()
self.assertEqual(execute_mock.call_count, 1)
def test_trigger_no_group(self):
"""Test trigger without group"""
trigger = NotificationTrigger.objects.create(name="trigger")
matcher = EventMatcherPolicy.objects.create(
name="matcher", action=EventAction.CUSTOM_PREFIX
)
PolicyBinding.objects.create(target=trigger, policy=matcher, order=0)
execute_mock = MagicMock()
with patch("authentik.events.models.NotificationTransport.send", execute_mock):
Event.new(EventAction.CUSTOM_PREFIX).save()
self.assertEqual(execute_mock.call_count, 0)
def test_policy_error_recursive(self):
"""Test Policy error which would cause recursion"""
transport = NotificationTransport.objects.create(name="transport")
trigger = NotificationTrigger.objects.create(name="trigger", group=self.group)
trigger.transports.add(transport)
trigger.save()
matcher = EventMatcherPolicy.objects.create(
name="matcher", action=EventAction.CUSTOM_PREFIX
)
PolicyBinding.objects.create(target=trigger, policy=matcher, order=0)
execute_mock = MagicMock()
passes = MagicMock(side_effect=PolicyException)
with patch(
"authentik.policies.event_matcher.models.EventMatcherPolicy.passes", passes
):
with patch(
"authentik.events.models.NotificationTransport.send", execute_mock
):
Event.new(EventAction.CUSTOM_PREFIX).save()
self.assertEqual(passes.call_count, 0)

View File

@ -5,8 +5,10 @@ from typing import Any, Dict, Optional
from uuid import UUID
from django.contrib.auth.models import AnonymousUser
from django.core.handlers.wsgi import WSGIRequest
from django.db import models
from django.db.models.base import Model
from django.http.request import HttpRequest
from django.views.debug import SafeExceptionReporterFilter
from guardian.utils import get_anonymous_user
@ -83,10 +85,14 @@ def sanitize_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
value = asdict(value)
if isinstance(value, dict):
final_dict[key] = sanitize_dict(value)
elif isinstance(value, User):
final_dict[key] = sanitize_dict(get_user(value))
elif isinstance(value, models.Model):
final_dict[key] = sanitize_dict(model_to_dict(value))
elif isinstance(value, UUID):
final_dict[key] = value.hex
elif isinstance(value, (HttpRequest, WSGIRequest)):
continue
else:
final_dict[key] = value
return final_dict