Compare commits

...

1 Commits

Author SHA1 Message Date
b0e6558a4f current version
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
2025-06-13 16:09:00 +02:00
14 changed files with 558 additions and 26 deletions

View File

@ -69,6 +69,7 @@ SESSION_KEY_APPLICATION_PRE = "authentik/flows/application_pre"
SESSION_KEY_GET = "authentik/flows/get"
SESSION_KEY_POST = "authentik/flows/post"
SESSION_KEY_HISTORY = "authentik/flows/history"
SESSION_KEY_AUTH_STARTED = "authentik/flows/auth_started"
QS_KEY_TOKEN = "flow_token" # nosec
QS_QUERY = "query"
@ -454,6 +455,7 @@ class FlowExecutorView(APIView):
SESSION_KEY_APPLICATION_PRE,
SESSION_KEY_PLAN,
SESSION_KEY_GET,
SESSION_KEY_AUTH_STARTED,
# We might need the initial POST payloads for later requests
# SESSION_KEY_POST,
# We don't delete the history on purpose, as a user might

View File

@ -6,7 +6,8 @@ from django.shortcuts import get_object_or_404
from ua_parser.user_agent_parser import Parse
from authentik.core.views.interface import InterfaceView
from authentik.flows.models import Flow
from authentik.flows.models import Flow, FlowDesignation
from authentik.flows.views.executor import SESSION_KEY_AUTH_STARTED
class FlowInterfaceView(InterfaceView):
@ -14,6 +15,12 @@ class FlowInterfaceView(InterfaceView):
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
flow = get_object_or_404(Flow, slug=self.kwargs.get("flow_slug"))
if (
not self.request.user.is_authenticated
and flow.designation == FlowDesignation.AUTHENTICATION
):
self.request.session[SESSION_KEY_AUTH_STARTED] = True
self.request.session.save()
kwargs["flow"] = flow
kwargs["flow_background_url"] = flow.background_url(self.request)
kwargs["inspector"] = "inspector" in self.request.GET

View File

@ -39,3 +39,4 @@ class AuthentikPoliciesConfig(ManagedAppConfig):
label = "authentik_policies"
verbose_name = "authentik Policies"
default = True
mountpoint = "policy/"

View File

@ -0,0 +1,89 @@
{% extends 'login/base_full.html' %}
{% load static %}
{% load i18n %}
{% block head %}
{{ block.super }}
<script>
let redirecting = false;
const checkAuth = async () => {
if (redirecting) return true;
const url = "{{ check_auth_url }}";
console.debug("authentik/policies/buffer: Checking authentication...");
try {
const result = await fetch(url, {
method: "HEAD",
});
if (result.status >= 400) {
return false
}
console.debug("authentik/policies/buffer: Continuing");
redirecting = true;
if ("{{ auth_req_method }}" === "post") {
document.querySelector("form").submit();
} else {
window.location.assign("{{ continue_url|escapejs }}");
}
} catch {
return false;
}
};
let timeout = 100;
let offset = 20;
let attempt = 0;
const main = async () => {
attempt += 1;
await checkAuth();
console.debug(`authentik/policies/buffer: Waiting ${timeout}ms...`);
setTimeout(main, timeout);
timeout += (offset * attempt);
if (timeout >= 2000) {
timeout = 2000;
}
}
document.addEventListener("visibilitychange", async () => {
if (document.hidden) return;
console.debug("authentik/policies/buffer: Checking authentication on tab activate...");
await checkAuth();
});
main();
</script>
{% endblock %}
{% block title %}
{% trans 'Waiting for authentication...' %} - {{ brand.branding_title }}
{% endblock %}
{% block card_title %}
{% trans 'Waiting for authentication...' %}
{% endblock %}
{% block card %}
<form class="pf-c-form" method="{{ auth_req_method }}" action="{{ continue_url }}">
{% if auth_req_method == "post" %}
{% for key, value in auth_req_body.items %}
<input type="hidden" name="{{ key }}" value="{{ value }}" />
{% endfor %}
{% endif %}
<div class="pf-c-empty-state">
<div class="pf-c-empty-state__content">
<div class="pf-c-empty-state__icon">
<span class="pf-c-spinner pf-m-xl" role="progressbar">
<span class="pf-c-spinner__clipper"></span>
<span class="pf-c-spinner__lead-ball"></span>
<span class="pf-c-spinner__tail-ball"></span>
</span>
</div>
<h1 class="pf-c-title pf-m-lg">
{% trans "You're already authenticating in another tab. This page will refresh once authentication is completed." %}
</h1>
</div>
</div>
<div class="pf-c-form__group pf-m-action">
<a href="{{ auth_req_url }}" class="pf-c-button pf-m-primary pf-m-block">
{% trans "Authenticate in this tab" %}
</a>
</div>
</form>
{% endblock %}

View File

@ -0,0 +1,121 @@
from django.contrib.auth.models import AnonymousUser
from django.contrib.sessions.middleware import SessionMiddleware
from django.http import HttpResponse
from django.test import RequestFactory, TestCase
from django.urls import reverse
from authentik.core.models import Application, Provider
from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.flows.models import FlowDesignation
from authentik.flows.planner import FlowPlan
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.lib.tests.utils import dummy_get_response
from authentik.policies.views import (
QS_BUFFER_ID,
SESSION_KEY_BUFFER,
BufferedPolicyAccessView,
BufferView,
PolicyAccessView,
)
class TestPolicyViews(TestCase):
"""Test PolicyAccessView"""
def setUp(self):
super().setUp()
self.factory = RequestFactory()
self.user = create_test_user()
def test_pav(self):
"""Test simple policy access view"""
provider = Provider.objects.create(
name=generate_id(),
)
app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
class TestView(PolicyAccessView):
def resolve_provider_application(self):
self.provider = provider
self.application = app
def get(self, *args, **kwargs):
return HttpResponse("foo")
req = self.factory.get("/")
req.user = self.user
res = TestView.as_view()(req)
self.assertEqual(res.status_code, 200)
self.assertEqual(res.content, b"foo")
def test_pav_buffer(self):
"""Test simple policy access view"""
provider = Provider.objects.create(
name=generate_id(),
)
app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
flow = create_test_flow(FlowDesignation.AUTHENTICATION)
class TestView(BufferedPolicyAccessView):
def resolve_provider_application(self):
self.provider = provider
self.application = app
def get(self, *args, **kwargs):
return HttpResponse("foo")
req = self.factory.get("/")
req.user = AnonymousUser()
middleware = SessionMiddleware(dummy_get_response)
middleware.process_request(req)
req.session[SESSION_KEY_PLAN] = FlowPlan(flow.pk)
req.session.save()
res = TestView.as_view()(req)
self.assertEqual(res.status_code, 302)
self.assertTrue(res.url.startswith(reverse("authentik_policies:buffer")))
def test_pav_buffer_skip(self):
"""Test simple policy access view (skip buffer)"""
provider = Provider.objects.create(
name=generate_id(),
)
app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
flow = create_test_flow(FlowDesignation.AUTHENTICATION)
class TestView(BufferedPolicyAccessView):
def resolve_provider_application(self):
self.provider = provider
self.application = app
def get(self, *args, **kwargs):
return HttpResponse("foo")
req = self.factory.get("/?skip_buffer=true")
req.user = AnonymousUser()
middleware = SessionMiddleware(dummy_get_response)
middleware.process_request(req)
req.session[SESSION_KEY_PLAN] = FlowPlan(flow.pk)
req.session.save()
res = TestView.as_view()(req)
self.assertEqual(res.status_code, 302)
self.assertTrue(res.url.startswith(reverse("authentik_flows:default-authentication")))
def test_buffer(self):
"""Test buffer view"""
uid = generate_id()
req = self.factory.get(f"/?{QS_BUFFER_ID}={uid}")
req.user = AnonymousUser()
middleware = SessionMiddleware(dummy_get_response)
middleware.process_request(req)
ts = generate_id()
req.session[SESSION_KEY_BUFFER % uid] = {
"method": "get",
"body": {},
"url": f"/{ts}",
}
req.session.save()
res = BufferView.as_view()(req)
self.assertEqual(res.status_code, 200)
self.assertIn(ts, res.render().content.decode())

View File

@ -1,7 +1,14 @@
"""API URLs"""
from django.urls import path
from authentik.policies.api.bindings import PolicyBindingViewSet
from authentik.policies.api.policies import PolicyViewSet
from authentik.policies.views import BufferView
urlpatterns = [
path("buffer", BufferView.as_view(), name="buffer"),
]
api_urlpatterns = [
("policies/all", PolicyViewSet),

View File

@ -1,23 +1,37 @@
"""authentik access helper classes"""
from typing import Any
from uuid import uuid4
from django.contrib import messages
from django.contrib.auth.mixins import AccessMixin
from django.contrib.auth.views import redirect_to_login
from django.http import HttpRequest, HttpResponse
from django.http import HttpRequest, HttpResponse, QueryDict
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.http import urlencode
from django.utils.translation import gettext as _
from django.views.generic.base import View
from django.views.generic.base import TemplateView, View
from structlog.stdlib import get_logger
from authentik.core.models import Application, Provider, User
from authentik.flows.views.executor import SESSION_KEY_APPLICATION_PRE, SESSION_KEY_POST
from authentik.flows.models import Flow, FlowDesignation
from authentik.flows.planner import FlowPlan
from authentik.flows.views.executor import (
SESSION_KEY_APPLICATION_PRE,
SESSION_KEY_AUTH_STARTED,
SESSION_KEY_PLAN,
SESSION_KEY_POST,
)
from authentik.lib.sentry import SentryIgnoredException
from authentik.policies.denied import AccessDeniedResponse
from authentik.policies.engine import PolicyEngine
from authentik.policies.types import PolicyRequest, PolicyResult
LOGGER = get_logger()
QS_BUFFER_ID = "af_bf_id"
QS_SKIP_BUFFER = "skip_buffer"
SESSION_KEY_BUFFER = "authentik/policies/pav_buffer/%s"
class RequestValidationError(SentryIgnoredException):
@ -125,3 +139,65 @@ class PolicyAccessView(AccessMixin, View):
for message in result.messages:
messages.error(self.request, _(message))
return result
def url_with_qs(url: str, **kwargs):
"""Update/set querystring of `url` with the parameters in `kwargs`. Original query string
parameters are retained"""
if "?" not in url:
return url + f"?{urlencode(kwargs)}"
url, _, qs = url.partition("?")
qs = QueryDict(qs, mutable=True)
qs.update(kwargs)
return url + f"?{urlencode(qs.items())}"
class BufferView(TemplateView):
"""Buffer view"""
template_name = "policies/buffer.html"
def get_context_data(self, **kwargs):
buf_id = self.request.GET.get(QS_BUFFER_ID)
buffer: dict = self.request.session.get(SESSION_KEY_BUFFER % buf_id)
kwargs["auth_req_method"] = buffer["method"]
kwargs["auth_req_body"] = buffer["body"]
kwargs["auth_req_url"] = url_with_qs(buffer["url"], **{QS_SKIP_BUFFER: True})
kwargs["check_auth_url"] = reverse("authentik_api:user-me")
kwargs["continue_url"] = url_with_qs(buffer["url"], **{QS_BUFFER_ID: buf_id})
return super().get_context_data(**kwargs)
class BufferedPolicyAccessView(PolicyAccessView):
"""PolicyAccessView which buffers access requests in case the user is not logged in"""
def handle_no_permission(self):
plan: FlowPlan | None = self.request.session.get(SESSION_KEY_PLAN)
authenticating = self.request.session.get(SESSION_KEY_AUTH_STARTED)
if plan:
flow = Flow.objects.filter(pk=plan.flow_pk).first()
if not flow or flow.designation != FlowDesignation.AUTHENTICATION:
LOGGER.debug("Not buffering request, no flow or flow not for authentication")
return super().handle_no_permission()
if not plan and authenticating is None:
LOGGER.debug("Not buffering request, no flow plan active")
return super().handle_no_permission()
if self.request.GET.get(QS_SKIP_BUFFER):
LOGGER.debug("Not buffering request, explicit skip")
return super().handle_no_permission()
buffer_id = str(uuid4())
LOGGER.debug("Buffering access request", bf_id=buffer_id)
self.request.session[SESSION_KEY_BUFFER % buffer_id] = {
"body": self.request.POST,
"url": self.request.build_absolute_uri(self.request.get_full_path()),
"method": self.request.method.lower(),
}
return redirect(
url_with_qs(reverse("authentik_policies:buffer"), **{QS_BUFFER_ID: buffer_id})
)
def dispatch(self, request, *args, **kwargs):
response = super().dispatch(request, *args, **kwargs)
if QS_BUFFER_ID in self.request.GET:
self.request.session.pop(SESSION_KEY_BUFFER % self.request.GET[QS_BUFFER_ID], None)
return response

View File

@ -30,7 +30,7 @@ from authentik.flows.stage import StageView
from authentik.lib.utils.time import timedelta_from_string
from authentik.lib.views import bad_request_message
from authentik.policies.types import PolicyRequest
from authentik.policies.views import PolicyAccessView, RequestValidationError
from authentik.policies.views import BufferedPolicyAccessView, RequestValidationError
from authentik.providers.oauth2.constants import (
PKCE_METHOD_PLAIN,
PKCE_METHOD_S256,
@ -326,7 +326,7 @@ class OAuthAuthorizationParams:
return code
class AuthorizationFlowInitView(PolicyAccessView):
class AuthorizationFlowInitView(BufferedPolicyAccessView):
"""OAuth2 Flow initializer, checks access to application and starts flow"""
params: OAuthAuthorizationParams

View File

@ -18,11 +18,11 @@ from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, FlowPlanner
from authentik.flows.stage import RedirectStage
from authentik.lib.utils.time import timedelta_from_string
from authentik.policies.engine import PolicyEngine
from authentik.policies.views import PolicyAccessView
from authentik.policies.views import BufferedPolicyAccessView
from authentik.providers.rac.models import ConnectionToken, Endpoint, RACProvider
class RACStartView(PolicyAccessView):
class RACStartView(BufferedPolicyAccessView):
"""Start a RAC connection by checking access and creating a connection token"""
endpoint: Endpoint

View File

@ -35,8 +35,8 @@ REQUEST_KEY_SAML_SIG_ALG = "SigAlg"
REQUEST_KEY_SAML_RESPONSE = "SAMLResponse"
REQUEST_KEY_RELAY_STATE = "RelayState"
SESSION_KEY_AUTH_N_REQUEST = "authentik/providers/saml/authn_request"
SESSION_KEY_LOGOUT_REQUEST = "authentik/providers/saml/logout_request"
PLAN_CONTEXT_SAML_AUTH_N_REQUEST = "authentik/providers/saml/authn_request"
PLAN_CONTEXT_SAML_LOGOUT_REQUEST = "authentik/providers/saml/logout_request"
# This View doesn't have a URL on purpose, as its called by the FlowExecutor
@ -50,10 +50,11 @@ class SAMLFlowFinalView(ChallengeStageView):
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
application: Application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
provider: SAMLProvider = get_object_or_404(SAMLProvider, pk=application.provider_id)
if SESSION_KEY_AUTH_N_REQUEST not in self.request.session:
if PLAN_CONTEXT_SAML_AUTH_N_REQUEST not in self.executor.plan.context:
self.logger.warning("No AuthNRequest in context")
return self.executor.stage_invalid()
auth_n_request: AuthNRequest = self.request.session.pop(SESSION_KEY_AUTH_N_REQUEST)
auth_n_request: AuthNRequest = self.executor.plan.context[PLAN_CONTEXT_SAML_AUTH_N_REQUEST]
try:
response = AssertionProcessor(provider, request, auth_n_request).build_response()
except SAMLException as exc:
@ -106,6 +107,3 @@ class SAMLFlowFinalView(ChallengeStageView):
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
# We'll never get here since the challenge redirects to the SP
return HttpResponseBadRequest()
def cleanup(self):
self.request.session.pop(SESSION_KEY_AUTH_N_REQUEST, None)

View File

@ -19,9 +19,9 @@ from authentik.providers.saml.exceptions import CannotHandleAssertion
from authentik.providers.saml.models import SAMLProvider
from authentik.providers.saml.processors.logout_request_parser import LogoutRequestParser
from authentik.providers.saml.views.flows import (
PLAN_CONTEXT_SAML_LOGOUT_REQUEST,
REQUEST_KEY_RELAY_STATE,
REQUEST_KEY_SAML_REQUEST,
SESSION_KEY_LOGOUT_REQUEST,
)
LOGGER = get_logger()
@ -33,6 +33,10 @@ class SAMLSLOView(PolicyAccessView):
flow: Flow
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.plan_context = {}
def resolve_provider_application(self):
self.application = get_object_or_404(Application, slug=self.kwargs["application_slug"])
self.provider: SAMLProvider = get_object_or_404(
@ -59,6 +63,7 @@ class SAMLSLOView(PolicyAccessView):
request,
{
PLAN_CONTEXT_APPLICATION: self.application,
**self.plan_context,
},
)
plan.append_stage(in_memory_stage(SessionEndStage))
@ -83,7 +88,7 @@ class SAMLSLOBindingRedirectView(SAMLSLOView):
self.request.GET[REQUEST_KEY_SAML_REQUEST],
relay_state=self.request.GET.get(REQUEST_KEY_RELAY_STATE, None),
)
self.request.session[SESSION_KEY_LOGOUT_REQUEST] = logout_request
self.plan_context[PLAN_CONTEXT_SAML_LOGOUT_REQUEST] = logout_request
except CannotHandleAssertion as exc:
Event.new(
EventAction.CONFIGURATION_ERROR,
@ -111,7 +116,7 @@ class SAMLSLOBindingPOSTView(SAMLSLOView):
payload[REQUEST_KEY_SAML_REQUEST],
relay_state=payload.get(REQUEST_KEY_RELAY_STATE, None),
)
self.request.session[SESSION_KEY_LOGOUT_REQUEST] = logout_request
self.plan_context[PLAN_CONTEXT_SAML_LOGOUT_REQUEST] = logout_request
except CannotHandleAssertion as exc:
LOGGER.info(str(exc))
return bad_request_message(self.request, str(exc))

View File

@ -15,16 +15,16 @@ from authentik.flows.models import in_memory_stage
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner
from authentik.flows.views.executor import SESSION_KEY_POST
from authentik.lib.views import bad_request_message
from authentik.policies.views import PolicyAccessView
from authentik.policies.views import BufferedPolicyAccessView
from authentik.providers.saml.exceptions import CannotHandleAssertion
from authentik.providers.saml.models import SAMLBindings, SAMLProvider
from authentik.providers.saml.processors.authn_request_parser import AuthNRequestParser
from authentik.providers.saml.views.flows import (
PLAN_CONTEXT_SAML_AUTH_N_REQUEST,
REQUEST_KEY_RELAY_STATE,
REQUEST_KEY_SAML_REQUEST,
REQUEST_KEY_SAML_SIG_ALG,
REQUEST_KEY_SAML_SIGNATURE,
SESSION_KEY_AUTH_N_REQUEST,
SAMLFlowFinalView,
)
from authentik.stages.consent.stage import (
@ -35,10 +35,14 @@ from authentik.stages.consent.stage import (
LOGGER = get_logger()
class SAMLSSOView(PolicyAccessView):
class SAMLSSOView(BufferedPolicyAccessView):
"""SAML SSO Base View, which plans a flow and injects our final stage.
Calls get/post handler."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.plan_context = {}
def resolve_provider_application(self):
self.application = get_object_or_404(Application, slug=self.kwargs["application_slug"])
self.provider: SAMLProvider = get_object_or_404(
@ -68,6 +72,7 @@ class SAMLSSOView(PolicyAccessView):
PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
% {"application": self.application.name},
PLAN_CONTEXT_CONSENT_PERMISSIONS: [],
**self.plan_context,
},
)
except FlowNonApplicableException:
@ -83,7 +88,7 @@ class SAMLSSOView(PolicyAccessView):
def post(self, request: HttpRequest, application_slug: str) -> HttpResponse:
"""GET and POST use the same handler, but we can't
override .dispatch easily because PolicyAccessView's dispatch"""
override .dispatch easily because BufferedPolicyAccessView's dispatch"""
return self.get(request, application_slug)
@ -103,7 +108,7 @@ class SAMLSSOBindingRedirectView(SAMLSSOView):
self.request.GET.get(REQUEST_KEY_SAML_SIGNATURE),
self.request.GET.get(REQUEST_KEY_SAML_SIG_ALG),
)
self.request.session[SESSION_KEY_AUTH_N_REQUEST] = auth_n_request
self.plan_context[PLAN_CONTEXT_SAML_AUTH_N_REQUEST] = auth_n_request
except CannotHandleAssertion as exc:
Event.new(
EventAction.CONFIGURATION_ERROR,
@ -137,7 +142,7 @@ class SAMLSSOBindingPOSTView(SAMLSSOView):
payload[REQUEST_KEY_SAML_REQUEST],
payload.get(REQUEST_KEY_RELAY_STATE),
)
self.request.session[SESSION_KEY_AUTH_N_REQUEST] = auth_n_request
self.plan_context[PLAN_CONTEXT_SAML_AUTH_N_REQUEST] = auth_n_request
except CannotHandleAssertion as exc:
LOGGER.info(str(exc))
return bad_request_message(self.request, str(exc))
@ -151,4 +156,4 @@ class SAMLSSOBindingInitView(SAMLSSOView):
"""Create SAML Response from scratch"""
LOGGER.debug("No SAML Request, using IdP-initiated flow.")
auth_n_request = AuthNRequestParser(self.provider).idp_initiated()
self.request.session[SESSION_KEY_AUTH_N_REQUEST] = auth_n_request
self.plan_context[PLAN_CONTEXT_SAML_AUTH_N_REQUEST] = auth_n_request

View File

@ -410,3 +410,77 @@ class TestProviderOAuth2OAuth(SeleniumTestCase):
self.driver.find_element(By.CSS_SELECTOR, "header > h1").text,
"Permission denied",
)
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
@apply_blueprint("default/flow-default-provider-authorization-implicit-consent.yaml")
@apply_blueprint("system/providers-oauth2.yaml")
@reconcile_app("authentik_crypto")
def test_authorization_consent_implied_parallel(self):
"""test OpenID Provider flow (default authorization flow with implied consent)"""
# Bootstrap all needed objects
authorization_flow = Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
)
provider = OAuth2Provider.objects.create(
name=generate_id(),
client_type=ClientTypes.CONFIDENTIAL,
client_id=self.client_id,
client_secret=self.client_secret,
signing_key=create_test_cert(),
redirect_uris=[
RedirectURI(
RedirectURIMatchingMode.STRICT, "http://localhost:3000/login/generic_oauth"
)
],
authorization_flow=authorization_flow,
)
provider.property_mappings.set(
ScopeMapping.objects.filter(
scope_name__in=[
SCOPE_OPENID,
SCOPE_OPENID_EMAIL,
SCOPE_OPENID_PROFILE,
SCOPE_OFFLINE_ACCESS,
]
)
)
Application.objects.create(
name=generate_id(),
slug=self.app_slug,
provider=provider,
)
self.driver.get(self.live_server_url)
login_window = self.driver.current_window_handle
self.driver.switch_to.new_window("tab")
grafana_window = self.driver.current_window_handle
self.driver.get("http://localhost:3000")
self.driver.find_element(By.CLASS_NAME, "btn-service--oauth").click()
self.driver.switch_to.window(login_window)
self.login()
self.driver.switch_to.window(grafana_window)
self.wait_for_url("http://localhost:3000/?orgId=1")
self.driver.get("http://localhost:3000/profile")
self.assertEqual(
self.driver.find_element(By.CLASS_NAME, "page-header__title").text,
self.user.name,
)
self.assertEqual(
self.driver.find_element(By.CSS_SELECTOR, "input[name=name]").get_attribute("value"),
self.user.name,
)
self.assertEqual(
self.driver.find_element(By.CSS_SELECTOR, "input[name=email]").get_attribute("value"),
self.user.email,
)
self.assertEqual(
self.driver.find_element(By.CSS_SELECTOR, "input[name=login]").get_attribute("value"),
self.user.email,
)

View File

@ -20,7 +20,7 @@ from tests.e2e.utils import SeleniumTestCase, retry
class TestProviderSAML(SeleniumTestCase):
"""test SAML Provider flow"""
def setup_client(self, provider: SAMLProvider, force_post: bool = False):
def setup_client(self, provider: SAMLProvider, force_post: bool = False, **kwargs):
"""Setup client saml-sp container which we test SAML against"""
metadata_url = (
self.url(
@ -40,6 +40,7 @@ class TestProviderSAML(SeleniumTestCase):
"SP_ENTITY_ID": provider.issuer,
"SP_SSO_BINDING": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST",
"SP_METADATA_URL": metadata_url,
**kwargs,
},
)
@ -111,6 +112,74 @@ class TestProviderSAML(SeleniumTestCase):
[self.user.email],
)
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
@apply_blueprint(
"default/flow-default-provider-authorization-implicit-consent.yaml",
)
@apply_blueprint(
"system/providers-saml.yaml",
)
@reconcile_app("authentik_crypto")
def test_sp_initiated_implicit_post(self):
"""test SAML Provider flow SP-initiated flow (implicit consent)"""
# Bootstrap all needed objects
authorization_flow = Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
)
provider: SAMLProvider = SAMLProvider.objects.create(
name="saml-test",
acs_url="http://localhost:9009/saml/acs",
audience="authentik-e2e",
issuer="authentik-e2e",
sp_binding=SAMLBindings.POST,
authorization_flow=authorization_flow,
signing_kp=create_test_cert(),
)
provider.property_mappings.set(SAMLPropertyMapping.objects.all())
provider.save()
Application.objects.create(
name="SAML",
slug="authentik-saml",
provider=provider,
)
self.setup_client(provider, True)
self.driver.get("http://localhost:9009")
self.login()
self.wait_for_url("http://localhost:9009/")
body = loads(self.driver.find_element(By.CSS_SELECTOR, "pre").text)
self.assertEqual(
body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name"],
[self.user.name],
)
self.assertEqual(
body["attr"][
"http://schemas.microsoft.com/ws/2008/06/identity/claims/windowsaccountname"
],
[self.user.username],
)
self.assertEqual(
body["attr"]["http://schemas.goauthentik.io/2021/02/saml/username"],
[self.user.username],
)
self.assertEqual(
body["attr"]["http://schemas.goauthentik.io/2021/02/saml/uid"],
[str(self.user.pk)],
)
self.assertEqual(
body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress"],
[self.user.email],
)
self.assertEqual(
body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/upn"],
[self.user.email],
)
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
@ -450,3 +519,81 @@ class TestProviderSAML(SeleniumTestCase):
lambda driver: driver.current_url.startswith(should_url),
f"URL {self.driver.current_url} doesn't match expected URL {should_url}",
)
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
@apply_blueprint(
"default/flow-default-provider-authorization-implicit-consent.yaml",
)
@apply_blueprint(
"system/providers-saml.yaml",
)
@reconcile_app("authentik_crypto")
def test_sp_initiated_implicit_post_buffer(self):
"""test SAML Provider flow SP-initiated flow (implicit consent)"""
# Bootstrap all needed objects
authorization_flow = Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
)
provider: SAMLProvider = SAMLProvider.objects.create(
name="saml-test",
acs_url=f"http://{self.host}:9009/saml/acs",
audience="authentik-e2e",
issuer="authentik-e2e",
sp_binding=SAMLBindings.POST,
authorization_flow=authorization_flow,
signing_kp=create_test_cert(),
)
provider.property_mappings.set(SAMLPropertyMapping.objects.all())
provider.save()
Application.objects.create(
name="SAML",
slug="authentik-saml",
provider=provider,
)
self.setup_client(provider, True, SP_ROOT_URL=f"http://{self.host}:9009")
self.driver.get(self.live_server_url)
login_window = self.driver.current_window_handle
self.driver.switch_to.new_window("tab")
client_window = self.driver.current_window_handle
# We need to access the SP on the same host as the IdP for SameSite cookies
self.driver.get(f"http://{self.host}:9009")
self.driver.switch_to.window(login_window)
self.login()
self.driver.switch_to.window(client_window)
self.wait_for_url(f"http://{self.host}:9009/")
body = loads(self.driver.find_element(By.CSS_SELECTOR, "pre").text)
self.assertEqual(
body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name"],
[self.user.name],
)
self.assertEqual(
body["attr"][
"http://schemas.microsoft.com/ws/2008/06/identity/claims/windowsaccountname"
],
[self.user.username],
)
self.assertEqual(
body["attr"]["http://schemas.goauthentik.io/2021/02/saml/username"],
[self.user.username],
)
self.assertEqual(
body["attr"]["http://schemas.goauthentik.io/2021/02/saml/uid"],
[str(self.user.pk)],
)
self.assertEqual(
body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress"],
[self.user.email],
)
self.assertEqual(
body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/upn"],
[self.user.email],
)