flows: fix mismatched redirect behaviour for invalid and valid flows (#8794)

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L
2024-03-04 18:46:57 +01:00
committed by GitHub
parent f0c33ef1bf
commit 1be50bcdb2
2 changed files with 75 additions and 7 deletions

View File

@ -1,6 +1,7 @@
"""flow views tests""" """flow views tests"""
from unittest.mock import MagicMock, PropertyMock, patch from unittest.mock import MagicMock, PropertyMock, patch
from urllib.parse import urlencode
from django.http import HttpRequest, HttpResponse from django.http import HttpRequest, HttpResponse
from django.test.client import RequestFactory from django.test.client import RequestFactory
@ -18,7 +19,12 @@ from authentik.flows.models import (
from authentik.flows.planner import FlowPlan, FlowPlanner from authentik.flows.planner import FlowPlan, FlowPlanner
from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView
from authentik.flows.tests import FlowTestCase from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_PLAN, FlowExecutorView from authentik.flows.views.executor import (
NEXT_ARG_NAME,
QS_QUERY,
SESSION_KEY_PLAN,
FlowExecutorView,
)
from authentik.lib.generators import generate_id from authentik.lib.generators import generate_id
from authentik.policies.dummy.models import DummyPolicy from authentik.policies.dummy.models import DummyPolicy
from authentik.policies.models import PolicyBinding from authentik.policies.models import PolicyBinding
@ -121,16 +127,73 @@ class TestFlowExecutor(FlowTestCase):
TO_STAGE_RESPONSE_MOCK, TO_STAGE_RESPONSE_MOCK,
) )
def test_invalid_flow_redirect(self): def test_invalid_flow_redirect(self):
"""Tests that an invalid flow still redirects""" """Test invalid flow with valid redirect destination"""
flow = create_test_flow( flow = create_test_flow(
FlowDesignation.AUTHENTICATION, FlowDesignation.AUTHENTICATION,
) )
dest = "/unique-string" dest = "/unique-string"
url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
response = self.client.get(url + f"?{NEXT_ARG_NAME}={dest}") response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse("authentik_core:root-redirect")) self.assertEqual(response.url, "/unique-string")
@patch(
"authentik.flows.views.executor.to_stage_response",
TO_STAGE_RESPONSE_MOCK,
)
def test_invalid_flow_invalid_redirect(self):
"""Test invalid flow redirect with an invalid URL"""
flow = create_test_flow(
FlowDesignation.AUTHENTICATION,
)
dest = "http://something.example.com/unique-string"
url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
self.assertEqual(response.status_code, 200)
self.assertStageResponse(
response,
flow,
component="ak-stage-access-denied",
error_message="Invalid next URL",
)
@patch(
"authentik.flows.views.executor.to_stage_response",
TO_STAGE_RESPONSE_MOCK,
)
def test_valid_flow_redirect(self):
"""Test valid flow with valid redirect destination"""
flow = create_test_flow()
dest = "/unique-string"
url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, "/unique-string")
@patch(
"authentik.flows.views.executor.to_stage_response",
TO_STAGE_RESPONSE_MOCK,
)
def test_valid_flow_invalid_redirect(self):
"""Test valid flow redirect with an invalid URL"""
flow = create_test_flow()
dest = "http://something.example.com/unique-string"
url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
self.assertEqual(response.status_code, 200)
self.assertStageResponse(
response,
flow,
component="ak-stage-access-denied",
error_message="Invalid next URL",
)
@patch( @patch(
"authentik.flows.views.executor.to_stage_response", "authentik.flows.views.executor.to_stage_response",

View File

@ -11,6 +11,7 @@ from django.shortcuts import get_object_or_404, redirect
from django.template.response import TemplateResponse from django.template.response import TemplateResponse
from django.urls import reverse from django.urls import reverse
from django.utils.decorators import method_decorator from django.utils.decorators import method_decorator
from django.utils.translation import gettext as _
from django.views.decorators.clickjacking import xframe_options_sameorigin from django.views.decorators.clickjacking import xframe_options_sameorigin
from django.views.generic import View from django.views.generic import View
from drf_spectacular.types import OpenApiTypes from drf_spectacular.types import OpenApiTypes
@ -176,6 +177,8 @@ class FlowExecutorView(APIView):
self.cancel() self.cancel()
self._logger.debug("f(exec): Continuing existing plan") self._logger.debug("f(exec): Continuing existing plan")
# Initial flow request, check if we have an upstream query string passed in
request.session[SESSION_KEY_GET] = get_params
# Don't check session again as we've either already loaded the plan or we need to plan # Don't check session again as we've either already loaded the plan or we need to plan
if not self.plan: if not self.plan:
request.session[SESSION_KEY_HISTORY] = [] request.session[SESSION_KEY_HISTORY] = []
@ -190,8 +193,6 @@ class FlowExecutorView(APIView):
# To match behaviour with loading an empty flow plan from cache, # To match behaviour with loading an empty flow plan from cache,
# we don't show an error message here, but rather call _flow_done() # we don't show an error message here, but rather call _flow_done()
return self._flow_done() return self._flow_done()
# Initial flow request, check if we have an upstream query string passed in
request.session[SESSION_KEY_GET] = get_params
# We don't save the Plan after getting the next stage # We don't save the Plan after getting the next stage
# as it hasn't been successfully passed yet # as it hasn't been successfully passed yet
try: try:
@ -390,7 +391,11 @@ class FlowExecutorView(APIView):
NEXT_ARG_NAME, "authentik_core:root-redirect" NEXT_ARG_NAME, "authentik_core:root-redirect"
) )
self.cancel() self.cancel()
return to_stage_response(self.request, redirect_with_qs(next_param)) if next_param and not is_url_absolute(next_param):
return to_stage_response(self.request, redirect_with_qs(next_param))
return to_stage_response(
self.request, self.stage_invalid(error_message=_("Invalid next URL"))
)
def stage_ok(self) -> HttpResponse: def stage_ok(self) -> HttpResponse:
"""Callback called by stages upon successful completion. """Callback called by stages upon successful completion.