"""Application API Views""" from collections.abc import Iterator from copy import copy from django.core.cache import cache from django.db.models import QuerySet from django.shortcuts import get_object_or_404 from drf_spectacular.types import OpenApiTypes from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema from guardian.shortcuts import get_objects_for_user from rest_framework.decorators import action from rest_framework.exceptions import ValidationError from rest_framework.fields import CharField, ReadOnlyField, SerializerMethodField from rest_framework.parsers import MultiPartParser from rest_framework.request import Request from rest_framework.response import Response from rest_framework.viewsets import ModelViewSet from structlog.stdlib import get_logger from authentik.api.pagination import Pagination from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT from authentik.core.api.providers import ProviderSerializer from authentik.core.api.used_by import UsedByMixin from authentik.core.api.utils import ModelSerializer from authentik.core.models import Application, User from authentik.events.logs import LogEventSerializer, capture_logs from authentik.lib.utils.file import ( FilePathSerializer, FileUploadSerializer, set_file, set_file_url, ) from authentik.policies.api.exec import PolicyTestResultSerializer from authentik.policies.engine import PolicyEngine from authentik.policies.types import CACHE_PREFIX, PolicyResult from authentik.rbac.decorators import permission_required from authentik.rbac.filters import ObjectFilter LOGGER = get_logger() def user_app_cache_key(user_pk: str, page_number: int | None = None) -> str: """Cache key where application list for user is saved""" key = f"{CACHE_PREFIX}app_access/{user_pk}" if page_number: key += f"/{page_number}" return key class ApplicationSerializer(ModelSerializer): """Application Serializer""" launch_url = SerializerMethodField() provider_obj = ProviderSerializer(source="get_provider", required=False, read_only=True) backchannel_providers_obj = ProviderSerializer( source="backchannel_providers", required=False, read_only=True, many=True ) meta_icon = ReadOnlyField(source="get_meta_icon") def get_launch_url(self, app: Application) -> str | None: """Allow formatting of launch URL""" user = None if "request" in self.context: user = self.context["request"].user return app.get_launch_url(user) def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) if SERIALIZER_CONTEXT_BLUEPRINT in self.context: self.fields["icon"] = CharField(source="meta_icon", required=False) class Meta: model = Application fields = [ "pk", "name", "slug", "provider", "provider_obj", "backchannel_providers", "backchannel_providers_obj", "launch_url", "open_in_new_tab", "meta_launch_url", "meta_icon", "meta_description", "meta_publisher", "policy_engine_mode", "group", ] extra_kwargs = { "meta_icon": {"read_only": True}, "backchannel_providers": {"required": False}, } class ApplicationViewSet(UsedByMixin, ModelViewSet): """Application Viewset""" queryset = ( Application.objects.all() .with_provider() .prefetch_related("policies") .prefetch_related("backchannel_providers") ) serializer_class = ApplicationSerializer search_fields = [ "name", "slug", "meta_launch_url", "meta_description", "meta_publisher", "group", ] filterset_fields = [ "name", "slug", "meta_launch_url", "meta_description", "meta_publisher", "group", ] lookup_field = "slug" ordering = ["name"] def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet: """Custom filter_queryset method which ignores guardian, but still supports sorting""" for backend in list(self.filter_backends): if backend == ObjectFilter: continue queryset = backend().filter_queryset(self.request, queryset, self) return queryset def _get_allowed_applications( self, pagined_apps: Iterator[Application], user: User | None = None ) -> list[Application]: applications = [] request = self.request._request if user: request = copy(request) request.user = user for application in pagined_apps: engine = PolicyEngine(application, request.user, request) engine.build() if engine.passing: applications.append(application) return applications def _filter_applications_with_launch_url( self, pagined_apps: Iterator[Application] ) -> list[Application]: applications = [] for app in pagined_apps: if app.get_launch_url(): applications.append(app) return applications @extend_schema( parameters=[ OpenApiParameter( name="for_user", location=OpenApiParameter.QUERY, type=OpenApiTypes.INT, ) ], responses={ 200: PolicyTestResultSerializer(), }, ) @action(detail=True, methods=["GET"]) def check_access(self, request: Request, slug: str) -> Response: """Check access to a single application by slug""" # Don't use self.get_object as that checks for view_application permission # which the user might not have, even if they have access application = get_object_or_404(Application, slug=slug) # If the current user is superuser, they can set `for_user` for_user = request.user if request.user.is_superuser and "for_user" in request.query_params: try: for_user = User.objects.filter(pk=request.query_params.get("for_user")).first() except ValueError: raise ValidationError({"for_user": "for_user must be numerical"}) from None if not for_user: raise ValidationError({"for_user": "User not found"}) engine = PolicyEngine(application, for_user, request) engine.use_cache = False with capture_logs() as logs: engine.build() result = engine.result response = PolicyTestResultSerializer(PolicyResult(False)) if result.passing: response = PolicyTestResultSerializer(PolicyResult(True)) if request.user.is_superuser: log_messages = [] for log in logs: if log.attributes.get("process", "") == "PolicyProcess": continue log_messages.append(LogEventSerializer(log).data) result.log_messages = log_messages response = PolicyTestResultSerializer(result) return Response(response.data) @extend_schema( parameters=[ OpenApiParameter( name="superuser_full_list", location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL, ), OpenApiParameter( name="for_user", location=OpenApiParameter.QUERY, type=OpenApiTypes.INT, ), OpenApiParameter( name="only_with_launch_url", location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL, ), ] ) def list(self, request: Request) -> Response: """Custom list method that checks Policy based access instead of guardian""" should_cache = request.query_params.get("search", "") == "" superuser_full_list = ( str(request.query_params.get("superuser_full_list", "false")).lower() == "true" ) if superuser_full_list and request.user.is_superuser: return super().list(request) only_with_launch_url = str( request.query_params.get("only_with_launch_url", "false") ).lower() queryset = self._filter_queryset_for_list(self.get_queryset()) paginator: Pagination = self.paginator paginated_apps = paginator.paginate_queryset(queryset, request) if "for_user" in request.query_params: try: for_user: int = int(request.query_params.get("for_user", 0)) for_user = ( get_objects_for_user(request.user, "authentik_core.view_user_applications") .filter(pk=for_user) .first() ) if not for_user: raise ValidationError({"for_user": "User not found"}) except ValueError as exc: raise ValidationError from exc allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user) serializer = self.get_serializer(allowed_applications, many=True) return self.get_paginated_response(serializer.data) allowed_applications = [] if not should_cache: allowed_applications = self._get_allowed_applications(paginated_apps) if should_cache: allowed_applications = cache.get( user_app_cache_key(self.request.user.pk, paginator.page.number) ) if not allowed_applications: LOGGER.debug("Caching allowed application list", page=paginator.page.number) allowed_applications = self._get_allowed_applications(paginated_apps) cache.set( user_app_cache_key(self.request.user.pk, paginator.page.number), allowed_applications, timeout=86400, ) if only_with_launch_url == "true": allowed_applications = self._filter_applications_with_launch_url(allowed_applications) serializer = self.get_serializer(allowed_applications, many=True) return self.get_paginated_response(serializer.data) @permission_required("authentik_core.change_application") @extend_schema( request={ "multipart/form-data": FileUploadSerializer, }, responses={ 200: OpenApiResponse(description="Success"), 400: OpenApiResponse(description="Bad request"), }, ) @action( detail=True, pagination_class=None, filter_backends=[], methods=["POST"], parser_classes=(MultiPartParser,), ) def set_icon(self, request: Request, slug: str): """Set application icon""" app: Application = self.get_object() return set_file(request, app, "meta_icon") @permission_required("authentik_core.change_application") @extend_schema( request=FilePathSerializer, responses={ 200: OpenApiResponse(description="Success"), 400: OpenApiResponse(description="Bad request"), }, ) @action( detail=True, pagination_class=None, filter_backends=[], methods=["POST"], ) def set_icon_url(self, request: Request, slug: str): """Set application icon (as URL)""" app: Application = self.get_object() return set_file_url(request, app, "meta_icon")