diff --git a/authentik/flows/tests/test_executor.py b/authentik/flows/tests/test_executor.py index c8e67492de..894a3ae7b2 100644 --- a/authentik/flows/tests/test_executor.py +++ b/authentik/flows/tests/test_executor.py @@ -1,6 +1,7 @@ """flow views tests""" from unittest.mock import MagicMock, PropertyMock, patch +from urllib.parse import urlencode from django.http import HttpRequest, HttpResponse from django.test.client import RequestFactory @@ -18,7 +19,12 @@ from authentik.flows.models import ( from authentik.flows.planner import FlowPlan, FlowPlanner from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView from authentik.flows.tests import FlowTestCase -from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_PLAN, FlowExecutorView +from authentik.flows.views.executor import ( + NEXT_ARG_NAME, + QS_QUERY, + SESSION_KEY_PLAN, + FlowExecutorView, +) from authentik.lib.generators import generate_id from authentik.policies.dummy.models import DummyPolicy from authentik.policies.models import PolicyBinding @@ -121,16 +127,73 @@ class TestFlowExecutor(FlowTestCase): TO_STAGE_RESPONSE_MOCK, ) def test_invalid_flow_redirect(self): - """Tests that an invalid flow still redirects""" + """Test invalid flow with valid redirect destination""" flow = create_test_flow( FlowDesignation.AUTHENTICATION, ) dest = "/unique-string" url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) - response = self.client.get(url + f"?{NEXT_ARG_NAME}={dest}") + response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") self.assertEqual(response.status_code, 302) - self.assertEqual(response.url, reverse("authentik_core:root-redirect")) + self.assertEqual(response.url, "/unique-string") + + @patch( + "authentik.flows.views.executor.to_stage_response", + TO_STAGE_RESPONSE_MOCK, + ) + def test_invalid_flow_invalid_redirect(self): + """Test invalid flow redirect with an invalid URL""" + flow = create_test_flow( + FlowDesignation.AUTHENTICATION, + ) + + dest = "http://something.example.com/unique-string" + url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) + + response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") + self.assertEqual(response.status_code, 200) + self.assertStageResponse( + response, + flow, + component="ak-stage-access-denied", + error_message="Invalid next URL", + ) + + @patch( + "authentik.flows.views.executor.to_stage_response", + TO_STAGE_RESPONSE_MOCK, + ) + def test_valid_flow_redirect(self): + """Test valid flow with valid redirect destination""" + flow = create_test_flow() + + dest = "/unique-string" + url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) + + response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") + self.assertEqual(response.status_code, 302) + self.assertEqual(response.url, "/unique-string") + + @patch( + "authentik.flows.views.executor.to_stage_response", + TO_STAGE_RESPONSE_MOCK, + ) + def test_valid_flow_invalid_redirect(self): + """Test valid flow redirect with an invalid URL""" + flow = create_test_flow() + + dest = "http://something.example.com/unique-string" + url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) + + response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") + self.assertEqual(response.status_code, 200) + self.assertStageResponse( + response, + flow, + component="ak-stage-access-denied", + error_message="Invalid next URL", + ) @patch( "authentik.flows.views.executor.to_stage_response", diff --git a/authentik/flows/views/executor.py b/authentik/flows/views/executor.py index c68c8e4d43..29aba77e46 100644 --- a/authentik/flows/views/executor.py +++ b/authentik/flows/views/executor.py @@ -11,6 +11,7 @@ from django.shortcuts import get_object_or_404, redirect from django.template.response import TemplateResponse from django.urls import reverse from django.utils.decorators import method_decorator +from django.utils.translation import gettext as _ from django.views.decorators.clickjacking import xframe_options_sameorigin from django.views.generic import View from drf_spectacular.types import OpenApiTypes @@ -176,6 +177,8 @@ class FlowExecutorView(APIView): self.cancel() self._logger.debug("f(exec): Continuing existing plan") + # Initial flow request, check if we have an upstream query string passed in + request.session[SESSION_KEY_GET] = get_params # Don't check session again as we've either already loaded the plan or we need to plan if not self.plan: request.session[SESSION_KEY_HISTORY] = [] @@ -190,8 +193,6 @@ class FlowExecutorView(APIView): # To match behaviour with loading an empty flow plan from cache, # we don't show an error message here, but rather call _flow_done() return self._flow_done() - # Initial flow request, check if we have an upstream query string passed in - request.session[SESSION_KEY_GET] = get_params # We don't save the Plan after getting the next stage # as it hasn't been successfully passed yet try: @@ -390,7 +391,11 @@ class FlowExecutorView(APIView): NEXT_ARG_NAME, "authentik_core:root-redirect" ) self.cancel() - return to_stage_response(self.request, redirect_with_qs(next_param)) + if next_param and not is_url_absolute(next_param): + return to_stage_response(self.request, redirect_with_qs(next_param)) + return to_stage_response( + self.request, self.stage_invalid(error_message=_("Invalid next URL")) + ) def stage_ok(self) -> HttpResponse: """Callback called by stages upon successful completion.