core: add helper function to create events from expressions, move ak_user_has_authenticator to base evaluator

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer
2022-09-14 21:52:41 +02:00
parent ab28370f20
commit 9f5c019daa
9 changed files with 107 additions and 62 deletions

View File

@ -1,16 +1,20 @@
"""authentik expression policy evaluator"""
import re
from ipaddress import ip_address, ip_network
from textwrap import indent
from typing import Any, Iterable, Optional
from django.core.exceptions import FieldError
from django_otp import devices_for_user
from rest_framework.serializers import ValidationError
from sentry_sdk.hub import Hub
from sentry_sdk.tracing import Span
from structlog.stdlib import get_logger
from authentik.core.models import User
from authentik.events.models import Event
from authentik.lib.utils.http import get_http_session
from authentik.policies.types import PolicyRequest
LOGGER = get_logger()
@ -26,7 +30,8 @@ class BaseEvaluator:
# Filename used for exec
_filename: str
def __init__(self):
def __init__(self, filename: Optional[str] = None):
self._filename = filename if filename else "BaseEvaluator"
# update website/docs/expressions/_objects.md
# update website/docs/expressions/_functions.md
self._globals = {
@ -35,11 +40,14 @@ class BaseEvaluator:
"list_flatten": BaseEvaluator.expr_flatten,
"ak_is_group_member": BaseEvaluator.expr_is_group_member,
"ak_user_by": BaseEvaluator.expr_user_by,
"ak_logger": get_logger(),
"ak_user_has_authenticator": BaseEvaluator.expr_func_user_has_authenticator,
"ak_create_event": self.expr_event_create,
"ak_logger": get_logger(self._filename),
"requests": get_http_session(),
"ip_address": ip_address,
"ip_network": ip_network,
}
self._context = {}
self._filename = "BaseEvalautor"
@staticmethod
def expr_flatten(value: list[Any] | Any) -> Optional[Any]:
@ -60,6 +68,11 @@ class BaseEvaluator:
"""Expression Filter to run re.sub"""
return re.sub(regex, repl, value)
@staticmethod
def expr_is_group_member(user: User, **group_filters) -> bool:
"""Check if `user` is member of group with name `group_name`"""
return user.ak_groups.filter(**group_filters).exists()
@staticmethod
def expr_user_by(**filters) -> Optional[User]:
"""Get user by filters"""
@ -72,15 +85,37 @@ class BaseEvaluator:
return None
@staticmethod
def expr_is_group_member(user: User, **group_filters) -> bool:
"""Check if `user` is member of group with name `group_name`"""
return user.ak_groups.filter(**group_filters).exists()
def expr_func_user_has_authenticator(user: User, device_type: Optional[str] = None) -> bool:
"""Check if a user has any authenticator devices, optionally matching *device_type*"""
user_devices = devices_for_user(user)
if device_type:
for device in user_devices:
device_class = device.__class__.__name__.lower().replace("device", "")
if device_class == device_type:
return True
return False
return len(list(user_devices)) > 0
def expr_event_create(self, action: str, **kwargs):
"""Create event with supplied data and try to extract as much relevant data
from the context"""
kwargs["context"] = self._context
event = Event.new(
action,
app=self._filename,
**kwargs,
)
if "request" in self._context and isinstance(PolicyRequest, self._context["request"]):
policy_request: PolicyRequest = self._context["request"]
if policy_request.http_request:
event.from_http(policy_request)
return
event.save()
def wrap_expression(self, expression: str, params: Iterable[str]) -> str:
"""Wrap expression in a function, call it, and save the result as `result`"""
handler_signature = ",".join(params)
full_expression = ""
full_expression += "from ipaddress import ip_address, ip_network\n"
full_expression += f"def handler({handler_signature}):\n"
full_expression += indent(expression, " ")
full_expression += f"\nresult = handler({handler_signature})"