core: add API to directly send recovery link to user
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
@ -1,13 +1,16 @@
|
||||
"""User API Views"""
|
||||
from json import loads
|
||||
from typing import Optional
|
||||
|
||||
from django.db.models.query import QuerySet
|
||||
from django.urls import reverse_lazy
|
||||
from django.utils.http import urlencode
|
||||
from django.utils.translation import gettext as _
|
||||
from django_filters.filters import BooleanFilter, CharFilter
|
||||
from django_filters.filterset import FilterSet
|
||||
from drf_spectacular.utils import extend_schema, extend_schema_field
|
||||
from guardian.utils import get_anonymous_user
|
||||
from drf_spectacular.types import OpenApiTypes
|
||||
from drf_spectacular.utils import OpenApiParameter, extend_schema, extend_schema_field
|
||||
from guardian.shortcuts import get_anonymous_user, get_objects_for_user
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.fields import CharField, JSONField, SerializerMethodField
|
||||
from rest_framework.permissions import IsAuthenticated
|
||||
@ -17,10 +20,12 @@ from rest_framework.serializers import (
|
||||
BooleanField,
|
||||
ListSerializer,
|
||||
ModelSerializer,
|
||||
Serializer,
|
||||
ValidationError,
|
||||
)
|
||||
from rest_framework.viewsets import ModelViewSet
|
||||
from rest_framework_guardian.filters import ObjectPermissionsFilter
|
||||
from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.admin.api.metrics import CoordinateSerializer, get_events_per_1h
|
||||
from authentik.api.decorators import permission_required
|
||||
@ -30,8 +35,13 @@ from authentik.core.api.utils import LinkSerializer, PassiveSerializer, is_dict
|
||||
from authentik.core.middleware import SESSION_IMPERSONATE_ORIGINAL_USER, SESSION_IMPERSONATE_USER
|
||||
from authentik.core.models import Token, TokenIntents, User
|
||||
from authentik.events.models import EventAction
|
||||
from authentik.stages.email.models import EmailStage
|
||||
from authentik.stages.email.tasks import send_mails
|
||||
from authentik.stages.email.utils import TemplateEmailMessage
|
||||
from authentik.tenants.models import Tenant
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
class UserSerializer(ModelSerializer):
|
||||
"""User Serializer"""
|
||||
@ -171,6 +181,28 @@ class UserViewSet(UsedByMixin, ModelViewSet):
|
||||
def get_queryset(self): # pragma: no cover
|
||||
return User.objects.all().exclude(pk=get_anonymous_user().pk)
|
||||
|
||||
def _create_recovery_link(self) -> tuple[Optional[str], Optional[Token]]:
|
||||
"""Create a recovery link (when the current tenant has a recovery flow set),
|
||||
that can either be shown to an admin or sent to the user directly"""
|
||||
tenant: Tenant = self.request._request.tenant
|
||||
# Check that there is a recovery flow, if not return an error
|
||||
flow = tenant.flow_recovery
|
||||
if not flow:
|
||||
LOGGER.debug("No recovery flow set")
|
||||
return None, None
|
||||
user: User = self.get_object()
|
||||
token, __ = Token.objects.get_or_create(
|
||||
identifier=f"{user.uid}-password-reset",
|
||||
user=user,
|
||||
intent=TokenIntents.INTENT_RECOVERY,
|
||||
)
|
||||
querystring = urlencode({"token": token.key})
|
||||
link = self.request.build_absolute_uri(
|
||||
reverse_lazy("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
|
||||
+ f"?{querystring}"
|
||||
)
|
||||
return link, token
|
||||
|
||||
@extend_schema(responses={200: SessionUserSerializer(many=False)})
|
||||
@action(detail=False, pagination_class=None, filter_backends=[])
|
||||
# pylint: disable=invalid-name
|
||||
@ -226,24 +258,60 @@ class UserViewSet(UsedByMixin, ModelViewSet):
|
||||
# pylint: disable=invalid-name, unused-argument
|
||||
def recovery(self, request: Request, pk: int) -> Response:
|
||||
"""Create a temporary link that a user can use to recover their accounts"""
|
||||
tenant: Tenant = request._request.tenant
|
||||
# Check that there is a recovery flow, if not return an error
|
||||
flow = tenant.flow_recovery
|
||||
if not flow:
|
||||
link, _ = self._create_recovery_link()
|
||||
if not link:
|
||||
LOGGER.debug("Couldn't create token")
|
||||
return Response({"link": ""}, status=404)
|
||||
user: User = self.get_object()
|
||||
token, __ = Token.objects.get_or_create(
|
||||
identifier=f"{user.uid}-password-reset",
|
||||
user=user,
|
||||
intent=TokenIntents.INTENT_RECOVERY,
|
||||
)
|
||||
querystring = urlencode({"token": token.key})
|
||||
link = request.build_absolute_uri(
|
||||
reverse_lazy("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
|
||||
+ f"?{querystring}"
|
||||
)
|
||||
return Response({"link": link})
|
||||
|
||||
@permission_required("authentik_core.reset_user_password")
|
||||
@extend_schema(
|
||||
parameters=[
|
||||
OpenApiParameter(
|
||||
name="email_stage",
|
||||
location=OpenApiParameter.QUERY,
|
||||
type=OpenApiTypes.STR,
|
||||
required=True,
|
||||
)
|
||||
],
|
||||
responses={
|
||||
"204": Serializer(),
|
||||
"404": Serializer(),
|
||||
},
|
||||
)
|
||||
@action(detail=True, pagination_class=None, filter_backends=[])
|
||||
# pylint: disable=invalid-name, unused-argument
|
||||
def recovery_email(self, request: Request, pk: int) -> Response:
|
||||
"""Create a temporary link that a user can use to recover their accounts"""
|
||||
for_user = self.get_object()
|
||||
if for_user.email == "":
|
||||
LOGGER.debug("User doesn't have an email address")
|
||||
return Response(status=404)
|
||||
link, token = self._create_recovery_link()
|
||||
if not link:
|
||||
LOGGER.debug("Couldn't create token")
|
||||
return Response(status=404)
|
||||
# Lookup the email stage to assure the current user can access it
|
||||
stages = get_objects_for_user(
|
||||
request.user, "authentik_stages_email.view_emailstage"
|
||||
).filter(pk=request.query_params.get("email_stage"))
|
||||
if not stages.exists():
|
||||
LOGGER.debug("Email stage does not exist/user has no permissions")
|
||||
return Response(status=404)
|
||||
email_stage: EmailStage = stages.first()
|
||||
message = TemplateEmailMessage(
|
||||
subject=_(email_stage.subject),
|
||||
template_name=email_stage.template,
|
||||
to=[for_user.email],
|
||||
template_context={
|
||||
"url": link,
|
||||
"user": for_user,
|
||||
"expires": token.expires,
|
||||
},
|
||||
)
|
||||
send_mails(email_stage, message)
|
||||
return Response(status=204)
|
||||
|
||||
def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet:
|
||||
"""Custom filter_queryset method which ignores guardian, but still supports sorting"""
|
||||
for backend in list(self.filter_backends):
|
||||
|
||||
Reference in New Issue
Block a user