core: make application's check_access API return a PolicyResult and accept for_user as superuser
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
@ -13,7 +13,12 @@ from drf_spectacular.utils import (
|
||||
inline_serializer,
|
||||
)
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.fields import CharField, FileField, SerializerMethodField
|
||||
from rest_framework.fields import (
|
||||
CharField,
|
||||
FileField,
|
||||
IntegerField,
|
||||
SerializerMethodField,
|
||||
)
|
||||
from rest_framework.parsers import MultiPartParser
|
||||
from rest_framework.request import Request
|
||||
from rest_framework.response import Response
|
||||
@ -25,9 +30,11 @@ from structlog.stdlib import get_logger
|
||||
from authentik.admin.api.metrics import CoordinateSerializer, get_events_per_1h
|
||||
from authentik.api.decorators import permission_required
|
||||
from authentik.core.api.providers import ProviderSerializer
|
||||
from authentik.core.models import Application
|
||||
from authentik.core.models import Application, User
|
||||
from authentik.events.models import EventAction
|
||||
from authentik.policies.api.exec import PolicyTestResultSerializer
|
||||
from authentik.policies.engine import PolicyEngine
|
||||
from authentik.policies.types import PolicyResult
|
||||
from authentik.stages.user_login.stage import USER_LOGIN_AUTHENTICATED
|
||||
|
||||
LOGGER = get_logger()
|
||||
@ -112,23 +119,34 @@ class ApplicationViewSet(ModelViewSet):
|
||||
return applications
|
||||
|
||||
@extend_schema(
|
||||
request=inline_serializer(
|
||||
"CheckAccessRequest", fields={"for_user": IntegerField(required=False)}
|
||||
),
|
||||
responses={
|
||||
204: OpenApiResponse(description="Access granted"),
|
||||
403: OpenApiResponse(description="Access denied"),
|
||||
}
|
||||
200: PolicyTestResultSerializer(),
|
||||
404: OpenApiResponse(description="for_user user not found"),
|
||||
},
|
||||
)
|
||||
@action(detail=True, methods=["GET"])
|
||||
@action(detail=True, methods=["POST"])
|
||||
# pylint: disable=unused-argument
|
||||
def check_access(self, request: Request, slug: str) -> Response:
|
||||
"""Check access to a single application by slug"""
|
||||
# Don't use self.get_object as that checks for view_application permission
|
||||
# which the user might not have, even if they have access
|
||||
application = get_object_or_404(Application, slug=slug)
|
||||
engine = PolicyEngine(application, self.request.user, self.request)
|
||||
# If the current user is superuser, they can set `for_user`
|
||||
for_user = self.request.user
|
||||
if self.request.user.is_superuser and "for_user" in request.data:
|
||||
for_user = get_object_or_404(User, pk=request.data.get("for_user"))
|
||||
engine = PolicyEngine(application, for_user, self.request)
|
||||
engine.build()
|
||||
if engine.passing:
|
||||
return Response(status=204)
|
||||
return Response(status=403)
|
||||
result = engine.result
|
||||
response = PolicyTestResultSerializer(PolicyResult(False))
|
||||
if result.passing:
|
||||
response = PolicyTestResultSerializer(PolicyResult(True))
|
||||
if self.request.user.is_superuser:
|
||||
response = PolicyTestResultSerializer(result)
|
||||
return Response(response.data)
|
||||
|
||||
@extend_schema(
|
||||
parameters=[
|
||||
|
||||
Reference in New Issue
Block a user