api: cleanup owner permissions (#12598)

* api: cleanup owner superuser permissions

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* remove remaining owner filters

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* re-organise

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix order of filtering

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* re-add legacy behaviour for tokens

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix notifications

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L.
2025-01-08 18:01:10 +01:00
committed by GitHub
parent 1b4fee2bac
commit 3ee3adc509
19 changed files with 59 additions and 166 deletions

View File

@ -1,67 +0,0 @@
"""API Authorization"""
from django.conf import settings
from django.db.models import Model
from django.db.models.query import QuerySet
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.authentication import get_authorization_header
from rest_framework.filters import BaseFilterBackend
from rest_framework.permissions import BasePermission
from rest_framework.request import Request
from authentik.api.authentication import validate_auth
from authentik.rbac.filters import ObjectFilter
class OwnerFilter(BaseFilterBackend):
"""Filter objects by their owner"""
owner_key = "user"
def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
if request.user.is_superuser:
return queryset
return queryset.filter(**{self.owner_key: request.user})
class SecretKeyFilter(DjangoFilterBackend):
"""Allow access to all objects when authenticated with secret key as token.
Replaces both DjangoFilterBackend and ObjectFilter"""
def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
auth_header = get_authorization_header(request)
token = validate_auth(auth_header)
if token and token == settings.SECRET_KEY:
return queryset
queryset = ObjectFilter().filter_queryset(request, queryset, view)
return super().filter_queryset(request, queryset, view)
class OwnerPermissions(BasePermission):
"""Authorize requests by an object's owner matching the requesting user"""
owner_key = "user"
def has_permission(self, request: Request, view) -> bool:
"""If the user is authenticated, we allow all requests here. For listing, the
object-level permissions are done by the filter backend"""
return request.user.is_authenticated
def has_object_permission(self, request: Request, view, obj: Model) -> bool:
"""Check if the object's owner matches the currently logged in user"""
if not hasattr(obj, self.owner_key):
return False
owner = getattr(obj, self.owner_key)
if owner != request.user:
return False
return True
class OwnerSuperuserPermissions(OwnerPermissions):
"""Similar to OwnerPermissions, except always allow access for superusers"""
def has_object_permission(self, request: Request, view, obj: Model) -> bool:
if request.user.is_superuser:
return True
return super().has_object_permission(request, view, obj)

View File

@ -14,10 +14,10 @@ from rest_framework.response import Response
from rest_framework.validators import UniqueValidator
from rest_framework.viewsets import ModelViewSet
from authentik.api.authorization import SecretKeyFilter
from authentik.brands.models import Brand
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.rbac.filters import SecretKeyFilter
from authentik.tenants.utils import get_current_tenant

View File

@ -2,16 +2,12 @@
from typing import TypedDict
from django_filters.rest_framework import DjangoFilterBackend
from guardian.utils import get_anonymous_user
from rest_framework import mixins
from rest_framework.fields import SerializerMethodField
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.request import Request
from rest_framework.viewsets import GenericViewSet
from ua_parser import user_agent_parser
from authentik.api.authorization import OwnerSuperuserPermissions
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer
from authentik.core.models import AuthenticatedSession
@ -110,11 +106,4 @@ class AuthenticatedSessionViewSet(
search_fields = ["user__username", "last_ip", "last_user_agent"]
filterset_fields = ["user__username", "last_ip", "last_user_agent"]
ordering = ["user__username"]
permission_classes = [OwnerSuperuserPermissions]
filter_backends = [DjangoFilterBackend, OrderingFilter, SearchFilter]
def get_queryset(self):
user = self.request.user if self.request else get_anonymous_user()
if user.is_superuser:
return super().get_queryset()
return super().get_queryset().filter(user=user.pk)
owner_field = "user"

View File

@ -2,19 +2,16 @@
from collections.abc import Iterable
from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework import mixins
from rest_framework.decorators import action
from rest_framework.fields import CharField, ReadOnlyField, SerializerMethodField
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.parsers import MultiPartParser
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from structlog.stdlib import get_logger
from authentik.api.authorization import OwnerFilter, OwnerSuperuserPermissions
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
from authentik.core.api.object_types import TypesMixin
from authentik.core.api.used_by import UsedByMixin
@ -189,11 +186,10 @@ class UserSourceConnectionViewSet(
queryset = UserSourceConnection.objects.all()
serializer_class = UserSourceConnectionSerializer
permission_classes = [OwnerSuperuserPermissions]
filterset_fields = ["user", "source__slug"]
search_fields = ["source__slug"]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
ordering = ["source__slug", "pk"]
owner_field = "user"
class GroupSourceConnectionSerializer(SourceSerializer):
@ -228,8 +224,7 @@ class GroupSourceConnectionViewSet(
queryset = GroupSourceConnection.objects.all()
serializer_class = GroupSourceConnectionSerializer
permission_classes = [OwnerSuperuserPermissions]
filterset_fields = ["group", "source__slug"]
search_fields = ["source__slug"]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
ordering = ["source__slug", "pk"]
owner_field = "user"

View File

@ -3,18 +3,15 @@
from typing import Any
from django.utils.timezone import now
from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import OpenApiResponse, extend_schema, inline_serializer
from guardian.shortcuts import assign_perm, get_anonymous_user
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.fields import CharField
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
from authentik.api.authorization import OwnerSuperuserPermissions
from authentik.blueprints.api import ManagedSerializer
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
from authentik.core.api.used_by import UsedByMixin
@ -138,8 +135,8 @@ class TokenViewSet(UsedByMixin, ModelViewSet):
"managed",
]
ordering = ["identifier", "expires"]
permission_classes = [OwnerSuperuserPermissions]
filter_backends = [DjangoFilterBackend, OrderingFilter, SearchFilter]
owner_field = "user"
rbac_allow_create_without_perm = True
def get_queryset(self):
user = self.request.user if self.request else get_anonymous_user()

View File

@ -28,7 +28,6 @@ from rest_framework.validators import UniqueValidator
from rest_framework.viewsets import ModelViewSet
from structlog.stdlib import get_logger
from authentik.api.authorization import SecretKeyFilter
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.crypto.apps import MANAGED_KEY
@ -36,7 +35,7 @@ from authentik.crypto.builder import CertificateBuilder, PrivateKeyAlg
from authentik.crypto.models import CertificateKeyPair
from authentik.events.models import Event, EventAction
from authentik.rbac.decorators import permission_required
from authentik.rbac.filters import ObjectFilter
from authentik.rbac.filters import ObjectFilter, SecretKeyFilter
LOGGER = get_logger()

View File

@ -1,11 +1,8 @@
"""RAC Provider API Views"""
from django_filters.rest_framework.backends import DjangoFilterBackend
from rest_framework import mixins
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.viewsets import GenericViewSet
from authentik.api.authorization import OwnerFilter, OwnerSuperuserPermissions
from authentik.core.api.groups import GroupMemberSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer
@ -34,12 +31,6 @@ class ConnectionTokenSerializer(EnterpriseRequiredMixin, ModelSerializer):
]
class ConnectionTokenOwnerFilter(OwnerFilter):
"""Owner filter for connection tokens (checks session's user)"""
owner_key = "session__user"
class ConnectionTokenViewSet(
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
@ -55,10 +46,4 @@ class ConnectionTokenViewSet(
filterset_fields = ["endpoint", "session__user", "provider"]
search_fields = ["endpoint__name", "provider__name"]
ordering = ["endpoint__name", "provider__name"]
permission_classes = [OwnerSuperuserPermissions]
filter_backends = [
ConnectionTokenOwnerFilter,
DjangoFilterBackend,
OrderingFilter,
SearchFilter,
]
owner_field = "session__user"

View File

@ -1,14 +1,11 @@
"""AuthenticatorEndpointGDTCStage API Views"""
from django_filters.rest_framework.backends import DjangoFilterBackend
from rest_framework import mixins
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import IsAdminUser
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from structlog.stdlib import get_logger
from authentik.api.authorization import OwnerFilter, OwnerPermissions
from authentik.core.api.used_by import UsedByMixin
from authentik.enterprise.api import EnterpriseRequiredMixin
from authentik.enterprise.stages.authenticator_endpoint_gdtc.models import (
@ -67,8 +64,7 @@ class EndpointDeviceViewSet(
search_fields = ["name"]
filterset_fields = ["name"]
ordering = ["name"]
permission_classes = [OwnerPermissions]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
owner_field = "user"
class EndpointAdminDeviceViewSet(ModelViewSet):

View File

@ -1,17 +1,15 @@
"""Notification API Views"""
from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework import mixins
from rest_framework.decorators import action
from rest_framework.fields import ReadOnlyField
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from authentik.api.authorization import OwnerFilter, OwnerPermissions
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer
from authentik.events.api.events import EventSerializer
@ -57,8 +55,7 @@ class NotificationViewSet(
"seen",
"user",
]
permission_classes = [OwnerPermissions]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
owner_field = "user"
@extend_schema(
request=OpenApiTypes.NONE,
@ -66,7 +63,7 @@ class NotificationViewSet(
204: OpenApiResponse(description="Marked tasks as read successfully."),
},
)
@action(detail=False, methods=["post"])
@action(detail=False, methods=["post"], permission_classes=[IsAuthenticated])
def mark_all_seen(self, request: Request) -> Response:
"""Mark all the user's notifications as seen"""
Notification.objects.filter(user=request.user, seen=False).update(seen=True)

View File

@ -1,10 +1,15 @@
"""RBAC API Filter"""
from django.conf import settings
from django.db.models import QuerySet
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.authentication import get_authorization_header
from rest_framework.exceptions import PermissionDenied
from rest_framework.request import Request
from rest_framework.views import APIView
from rest_framework_guardian.filters import ObjectPermissionsFilter
from authentik.api.authentication import validate_auth
from authentik.core.models import UserTypes
@ -12,7 +17,7 @@ class ObjectFilter(ObjectPermissionsFilter):
"""Object permission filter that grants global permission higher priority than
per-object permissions"""
def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
def filter_queryset(self, request: Request, queryset: QuerySet, view: APIView) -> QuerySet:
permission = self.perm_format % {
"app_label": queryset.model._meta.app_label,
"model_name": queryset.model._meta.model_name,
@ -21,6 +26,9 @@ class ObjectFilter(ObjectPermissionsFilter):
# per-object permissions
if request.user.has_perm(permission):
return queryset
# User does not have permissions, but we have an owner field defined, so filter by that
if owner_field := getattr(view, "owner_field", None):
return queryset.filter(**{owner_field: request.user})
queryset = super().filter_queryset(request, queryset, view)
# Outposts (which are the only objects using internal service accounts)
# except requests to return an empty list when they have no objects
@ -32,3 +40,17 @@ class ObjectFilter(ObjectPermissionsFilter):
# and also no object permissions assigned (directly or via role)
raise PermissionDenied()
return queryset
class SecretKeyFilter(DjangoFilterBackend):
"""Allow access to all objects when authenticated with secret key as token.
Replaces both DjangoFilterBackend and ObjectFilter"""
def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
auth_header = get_authorization_header(request)
token = validate_auth(auth_header)
if token and token == settings.SECRET_KEY:
return queryset
queryset = ObjectFilter().filter_queryset(request, queryset, view)
return super().filter_queryset(request, queryset, view)

View File

@ -15,6 +15,17 @@ class ObjectPermissions(DjangoObjectPermissions):
lookup = getattr(view, "lookup_url_kwarg", None) or getattr(view, "lookup_field", None)
if lookup and lookup in view.kwargs:
return True
# Legacy behaviour:
# Allow creation of objects even without explicit permission
queryset = self._queryset(view)
required_perms = self.get_required_permissions(request.method, queryset.model)
if (
len(required_perms) == 1
and f"{queryset.model._meta.app_label}.add_{queryset.model._meta.model_name}"
in required_perms
and getattr(view, "rbac_allow_create_without_perm", False)
):
return True
return super().has_permission(request, view)
def has_object_permission(self, request: Request, view, obj: Model) -> bool:
@ -24,6 +35,10 @@ class ObjectPermissions(DjangoObjectPermissions):
# Rank global permissions higher than per-object permissions
if request.user.has_perms(perms):
return True
# Allow access for owners if configured
if owner_field := getattr(view, "owner_field", None):
if getattr(obj, owner_field) == request.user:
return True
return super().has_object_permission(request, view, obj)

View File

@ -1,10 +1,7 @@
"""Kerberos Source Serializer"""
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.viewsets import ModelViewSet
from authentik.api.authorization import OwnerFilter, OwnerSuperuserPermissions
from authentik.core.api.sources import (
GroupSourceConnectionSerializer,
GroupSourceConnectionViewSet,
@ -32,9 +29,8 @@ class UserKerberosSourceConnectionViewSet(UsedByMixin, ModelViewSet):
serializer_class = UserKerberosSourceConnectionSerializer
filterset_fields = ["source__slug"]
search_fields = ["source__slug"]
permission_classes = [OwnerSuperuserPermissions]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
ordering = ["source__slug"]
owner_field = "user"
class GroupKerberosSourceConnectionSerializer(GroupSourceConnectionSerializer):

View File

@ -1,20 +1,17 @@
"""AuthenticatorDuoStage API Views"""
from django.http import Http404
from django_filters.rest_framework.backends import DjangoFilterBackend
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiResponse, extend_schema, inline_serializer
from guardian.shortcuts import get_objects_for_user
from rest_framework import mixins
from rest_framework.decorators import action
from rest_framework.fields import CharField, ChoiceField, IntegerField
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from structlog.stdlib import get_logger
from authentik.api.authorization import OwnerFilter, OwnerPermissions
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer
from authentik.flows.api.stages import StageSerializer
@ -189,8 +186,7 @@ class DuoDeviceViewSet(
search_fields = ["name"]
filterset_fields = ["name"]
ordering = ["name"]
permission_classes = [OwnerPermissions]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
owner_field = "user"
class DuoAdminDeviceViewSet(ModelViewSet):

View File

@ -1,11 +1,8 @@
"""AuthenticatorSMSStage API Views"""
from django_filters.rest_framework.backends import DjangoFilterBackend
from rest_framework import mixins
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from authentik.api.authorization import OwnerFilter, OwnerPermissions
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer
from authentik.flows.api.stages import StageSerializer
@ -65,11 +62,10 @@ class SMSDeviceViewSet(
queryset = SMSDevice.objects.all()
serializer_class = SMSDeviceSerializer
permission_classes = [OwnerPermissions]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
search_fields = ["name"]
filterset_fields = ["name"]
ordering = ["name"]
owner_field = "user"
class SMSAdminDeviceViewSet(ModelViewSet):

View File

@ -1,11 +1,8 @@
"""AuthenticatorStaticStage API Views"""
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import mixins
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from authentik.api.authorization import OwnerFilter, OwnerPermissions
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer
from authentik.flows.api.stages import StageSerializer
@ -69,11 +66,10 @@ class StaticDeviceViewSet(
queryset = StaticDevice.objects.filter(confirmed=True)
serializer_class = StaticDeviceSerializer
permission_classes = [OwnerPermissions]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
search_fields = ["name"]
filterset_fields = ["name"]
ordering = ["name"]
owner_field = "user"
class StaticAdminDeviceViewSet(ModelViewSet):

View File

@ -1,12 +1,9 @@
"""AuthenticatorTOTPStage API Views"""
from django_filters.rest_framework.backends import DjangoFilterBackend
from rest_framework import mixins
from rest_framework.fields import ChoiceField
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from authentik.api.authorization import OwnerFilter, OwnerPermissions
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer
from authentik.flows.api.stages import StageSerializer
@ -61,11 +58,10 @@ class TOTPDeviceViewSet(
queryset = TOTPDevice.objects.filter(confirmed=True)
serializer_class = TOTPDeviceSerializer
permission_classes = [OwnerPermissions]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
search_fields = ["name"]
filterset_fields = ["name"]
ordering = ["name"]
owner_field = "user"
class TOTPAdminDeviceViewSet(ModelViewSet):

View File

@ -1,11 +1,8 @@
"""AuthenticatorWebAuthnStage API Views"""
from django_filters.rest_framework.backends import DjangoFilterBackend
from rest_framework import mixins
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from authentik.api.authorization import OwnerFilter, OwnerPermissions
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer
from authentik.stages.authenticator_webauthn.api.device_types import WebAuthnDeviceTypeSerializer
@ -40,8 +37,7 @@ class WebAuthnDeviceViewSet(
search_fields = ["name"]
filterset_fields = ["name"]
ordering = ["name"]
permission_classes = [OwnerPermissions]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
owner_field = "user"
class WebAuthnAdminDeviceViewSet(ModelViewSet):

View File

@ -1,12 +1,8 @@
"""ConsentStage API Views"""
from django_filters.rest_framework import DjangoFilterBackend
from guardian.utils import get_anonymous_user
from rest_framework import mixins
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from authentik.api.authorization import OwnerFilter, OwnerSuperuserPermissions
from authentik.core.api.applications import ApplicationSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.users import UserSerializer
@ -57,11 +53,4 @@ class UserConsentViewSet(
filterset_fields = ["user", "application"]
ordering = ["application", "expires"]
search_fields = ["user__username"]
permission_classes = [OwnerSuperuserPermissions]
filter_backends = [OwnerFilter, DjangoFilterBackend, OrderingFilter, SearchFilter]
def get_queryset(self):
user = self.request.user if self.request else get_anonymous_user()
if user.is_superuser:
return super().get_queryset()
return super().get_queryset().filter(user=user.pk)
owner_field = "user"

View File

@ -8,11 +8,11 @@ from django.http import HttpResponseNotFound
from django.http.request import urljoin
from django.utils.timezone import now
from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework import permissions
from rest_framework.authentication import get_authorization_header
from rest_framework.decorators import action
from rest_framework.fields import CharField, IntegerField
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import BasePermission
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import DateTimeField, ModelSerializer
@ -27,7 +27,7 @@ from authentik.recovery.lib import create_admin_group, create_recovery_token
from authentik.tenants.models import Tenant
class TenantApiKeyPermission(permissions.BasePermission):
class TenantApiKeyPermission(BasePermission):
"""Authentication based on tenants.api_key"""
def has_permission(self, request: Request, view: View) -> bool: