Compare commits
1 Commits
main
...
website/do
Author | SHA1 | Date | |
---|---|---|---|
cddc1c0478 |
@ -0,0 +1,172 @@
|
||||
---
|
||||
title: Notification Rule Expression Policies
|
||||
---
|
||||
|
||||
## Introduction
|
||||
|
||||
Notification rules with bound expression policies are very powerful. The following are examples of what can be achieved.
|
||||
|
||||
### Change user attributes upon account deactivation
|
||||
|
||||
This example code is triggered when a user account with the `sshPublicKey` attribute set is deactivated. It saves the `sshPublicKey` attribute to a new `inactivesshPublicKey` attribute, and subsequently nullifies the `sshPublicKey` attribute.
|
||||
|
||||
```python
|
||||
from authentik.core.models import User
|
||||
|
||||
# Check if an event has occurred
|
||||
event = request.context.get("event", None)
|
||||
if not event:
|
||||
ak_logger.info("no event")
|
||||
return False
|
||||
|
||||
# Check if the event action includes updating a model
|
||||
if event.action != "model_updated":
|
||||
ak_logger.info("event action does not match")
|
||||
return False
|
||||
|
||||
model_app = event.context["model"]["app"]
|
||||
model_name = event.context["model"]["model_name"]
|
||||
|
||||
# Check if the model that was updated is the user model
|
||||
if model_app != "authentik_core" or model_name != "user":
|
||||
ak_logger.info("model does not match")
|
||||
|
||||
user_pk = event.context["model"]["pk"]
|
||||
user = User.objects.filter(pk=user_pk).first()
|
||||
|
||||
# Check if an user object was found
|
||||
if not user:
|
||||
ak_logger.info("user not found")
|
||||
return False
|
||||
|
||||
# Check if user is active
|
||||
if user.is_active:
|
||||
ak_logger.info("user is active, not changing")
|
||||
return False
|
||||
|
||||
# Check if user has the `sshPublicKey` attribute set
|
||||
if not user.attributes.get("sshPublicKey"):
|
||||
ak_logger.info("no public keys to remove")
|
||||
return False
|
||||
|
||||
# Save the `sshPublicKey` attribute to a new `inactiveSSHPublicKey` attribute
|
||||
user.attributes["inactiveSSHPublicKey"] = user.attributes["sshPublicKey"]
|
||||
|
||||
# Nullify the `sshPublicKey` attribute
|
||||
user.attributes["sshPublicKey"] = []
|
||||
|
||||
# Save the changes made to the user
|
||||
user.save()
|
||||
|
||||
return False
|
||||
```
|
||||
|
||||
### Alert when application is created without binding
|
||||
|
||||
This code is triggered when a new application is created without any user, group, or policy bound to it. The notification rule can then be configured to alert an administrator. This feature is useful for ensuring limited access to applications, as by default, an application without any users, groups, or policies bound to it can be accessed by all users.
|
||||
|
||||
```python
|
||||
from authentik.core.models import Application
|
||||
from authentik.policies.models import PolicyBinding
|
||||
|
||||
# Check if an event has occurred
|
||||
event = request.context.get("event", None)
|
||||
if not event:
|
||||
ak_logger.info("no event")
|
||||
return False
|
||||
|
||||
# Check if the event action includes creating a model
|
||||
if event.action != "model_created":
|
||||
ak_logger.info("event action does not match")
|
||||
return False
|
||||
|
||||
model_app = event.context["model"]["app"]
|
||||
model_name = event.context["model"]["model_name"]
|
||||
|
||||
# Check if the model that was created is the application model
|
||||
if model_app != "authentik_core" or model_name != "application":
|
||||
ak_logger.info("model does not match")
|
||||
|
||||
application_pk = event.context["model"]["pk"]
|
||||
application = Application.objects.filter(pk=application_pk).first()
|
||||
|
||||
# Check if an application object was found
|
||||
if not application:
|
||||
ak_logger.info("application not found")
|
||||
return False
|
||||
|
||||
# Check if application has binding
|
||||
if PolicyBinding.objects.filter(target=application).exists():
|
||||
output = PolicyBinding.objects.filter(target=application)
|
||||
ak_logger.info("application has bindings, returning true")
|
||||
return True
|
||||
|
||||
return False
|
||||
```
|
||||
|
||||
### Append user addition history to group attributes
|
||||
|
||||
This code is triggered when a user is added to a group. It then creates and updates a `UserAddedHistory` attribute to the group with a date/time stamp and the username of the added user. This functionality is already available within the changelog of a group, but this code can be used as a template to trigger alerts or other events.
|
||||
|
||||
:::note
|
||||
This policy interacts with the `diff` event output. This filed is only available with an enterprise license.
|
||||
:::
|
||||
|
||||
```python
|
||||
from authentik.core.models import User
|
||||
from authentik.core.models import Group
|
||||
from datetime import datetime
|
||||
|
||||
# Check if an event has occurred
|
||||
event = request.context.get("event", None)
|
||||
if not event:
|
||||
ak_logger.info("no event")
|
||||
return False
|
||||
|
||||
# Check if the event action includes updating a model
|
||||
if event.action != "model_updated":
|
||||
ak_logger.info("event action does not match")
|
||||
return False
|
||||
|
||||
model_app = event.context["model"]["app"]
|
||||
model_name = event.context["model"]["model_name"]
|
||||
|
||||
# Check if the model that was updated is the group model
|
||||
if model_app != "authentik_core" or model_name != "group":
|
||||
ak_logger.info("model does not match")
|
||||
|
||||
group_pk = event.context["model"]["pk"]
|
||||
group = Group.objects.filter(pk=group_pk).first()
|
||||
|
||||
# If user was added to group, get user object, else return false
|
||||
if "add" in event.context["diff"]["users"]:
|
||||
ak_logger.info("user added to group")
|
||||
|
||||
user_pk = event.context["diff"]["users"]["add"][0]
|
||||
user = User.objects.filter(pk=user_pk).first()
|
||||
else:
|
||||
ak_logger.info("user not added to group")
|
||||
return False
|
||||
|
||||
# Check if a group object was found
|
||||
if not group:
|
||||
ak_logger.info("group not found")
|
||||
return False
|
||||
|
||||
# Check if an user object was found
|
||||
if not user:
|
||||
ak_logger.info("user not found")
|
||||
return False
|
||||
|
||||
if not group.attributes.get("UserAddedHistory"):
|
||||
group.attributes["UserAddedHistory"] = []
|
||||
|
||||
current_date_time = datetime.now().isoformat(timespec='seconds')
|
||||
|
||||
group.attributes["UserAddedHistory"].append(current_date_time + " - Added user: " + user.username)
|
||||
|
||||
# Save the changes made to the group
|
||||
group.save()
|
||||
|
||||
return False
|
||||
```
|
Reference in New Issue
Block a user