flows: denied action (#3194)
This commit is contained in:
@ -5,7 +5,6 @@ from typing import TYPE_CHECKING, Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from django.db import models
|
||||
from django.http import HttpRequest
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from model_utils.managers import InheritanceManager
|
||||
from rest_framework.serializers import BaseSerializer
|
||||
@ -40,6 +39,14 @@ class InvalidResponseAction(models.TextChoices):
|
||||
RESTART_WITH_CONTEXT = "restart_with_context"
|
||||
|
||||
|
||||
class FlowDeniedAction(models.TextChoices):
|
||||
"""Configure what response is given to denied flow executions"""
|
||||
|
||||
MESSAGE_CONTINUE = "message_continue"
|
||||
MESSAGE = "message"
|
||||
CONTINUE = "continue"
|
||||
|
||||
|
||||
class FlowDesignation(models.TextChoices):
|
||||
"""Designation of what a Flow should be used for. At a later point, this
|
||||
should be replaced by a database entry."""
|
||||
@ -139,6 +146,12 @@ class Flow(SerializerModel, PolicyBindingModel):
|
||||
),
|
||||
)
|
||||
|
||||
denied_action = models.TextField(
|
||||
choices=FlowDeniedAction.choices,
|
||||
default=FlowDeniedAction.MESSAGE_CONTINUE,
|
||||
help_text=_("Configure what should happen when a flow denies access to a user."),
|
||||
)
|
||||
|
||||
@property
|
||||
def background_url(self) -> str:
|
||||
"""Get the URL to the background image. If the name is /static or starts with http
|
||||
@ -157,23 +170,6 @@ class Flow(SerializerModel, PolicyBindingModel):
|
||||
|
||||
return FlowSerializer
|
||||
|
||||
@staticmethod
|
||||
def with_policy(request: HttpRequest, **flow_filter) -> Optional["Flow"]:
|
||||
"""Get a Flow by `**flow_filter` and check if the request from `request` can access it."""
|
||||
from authentik.policies.engine import PolicyEngine
|
||||
|
||||
flows = Flow.objects.filter(**flow_filter).order_by("slug")
|
||||
for flow in flows:
|
||||
engine = PolicyEngine(flow, request.user, request)
|
||||
engine.build()
|
||||
result = engine.result
|
||||
if result.passing:
|
||||
LOGGER.debug("with_policy: flow passing", flow=flow)
|
||||
return flow
|
||||
LOGGER.warning("with_policy: flow not passing", flow=flow, messages=result.messages)
|
||||
LOGGER.debug("with_policy: no flow found", filters=flow_filter)
|
||||
return None
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Flow {self.name} ({self.slug})"
|
||||
|
||||
|
Reference in New Issue
Block a user