Files
authentik/authentik/sources/saml/views.py
Jens L 13e2eea72f web/user: new end-user interface (#1404)
* web/user: migrate to top navbar

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* web/user: prepare config from server

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* re-sort

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* remove old interface

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* update issue template

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* use notification badge

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* web/user: re-add go-to-admin button

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* *: fix remaining redirects directly to admin

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* make settings better

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* api: ensure sources and stages are sorted

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* web/user: add sessions and consent

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* providers/oauth2: add post wrapper to stage

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* website/docs: add new interface to release notes

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-09-16 22:17:05 +02:00

195 lines
7.9 KiB
Python

"""saml sp views"""
from urllib.parse import ParseResult, parse_qsl, urlparse, urlunparse
from django.contrib.auth import logout
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import Http404, HttpRequest, HttpResponse
from django.http.response import HttpResponseBadRequest
from django.shortcuts import get_object_or_404, redirect
from django.utils.decorators import method_decorator
from django.utils.http import urlencode
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from structlog.stdlib import get_logger
from xmlsec import InternalError, VerificationError
from authentik.flows.challenge import Challenge, ChallengeResponse, ChallengeTypes
from authentik.flows.models import in_memory_stage
from authentik.flows.planner import (
PLAN_CONTEXT_REDIRECT,
PLAN_CONTEXT_SOURCE,
PLAN_CONTEXT_SSO,
FlowPlanner,
)
from authentik.flows.stage import ChallengeStageView
from authentik.flows.views import NEXT_ARG_NAME, SESSION_KEY_GET, SESSION_KEY_PLAN
from authentik.lib.utils.urls import redirect_with_qs
from authentik.lib.views import bad_request_message
from authentik.providers.saml.utils.encoding import nice64
from authentik.providers.saml.views.flows import AutosubmitChallenge
from authentik.sources.saml.exceptions import MissingSAMLResponse, UnsupportedNameIDFormat
from authentik.sources.saml.models import SAMLBindingTypes, SAMLSource
from authentik.sources.saml.processors.metadata import MetadataProcessor
from authentik.sources.saml.processors.request import RequestProcessor
from authentik.sources.saml.processors.response import ResponseProcessor
from authentik.stages.consent.stage import (
PLAN_CONTEXT_CONSENT_HEADER,
PLAN_CONTEXT_CONSENT_TITLE,
ConsentStageView,
)
# Keys used to hand data to AutosubmitStageView through the flow plan context.
# InitiateView.get() writes them, AutosubmitStageView.get_challenge() reads them.
# Title shown by the autosubmit challenge
PLAN_CONTEXT_TITLE = "title"
# Target URL of the autosubmit challenge (the source's SSO URL)
PLAN_CONTEXT_URL = "url"
# Form attributes of the autosubmit challenge (SAMLRequest and RelayState)
PLAN_CONTEXT_ATTRS = "attrs"
# Module-level structlog logger
LOGGER = get_logger()
class AutosubmitStageView(ChallengeStageView):
    """In-memory stage that builds an autosubmit challenge from plan context values"""

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Create the autosubmit challenge.

        Title, URL and attributes are read from the current plan's context;
        any value that is not present falls back to an empty string.
        """
        context = self.executor.plan.context
        payload = {
            "type": ChallengeTypes.NATIVE.value,
            "component": "ak-stage-autosubmit",
            "title": context.get(PLAN_CONTEXT_TITLE, ""),
            "url": context.get(PLAN_CONTEXT_URL, ""),
            "attrs": context.get(PLAN_CONTEXT_ATTRS, ""),
        }
        return AutosubmitChallenge(data=payload)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Always answer with a 400 response.

        `ak-stage-autosubmit` sends the user off site, so there is no
        response for this stage to validate.
        """
        return HttpResponseBadRequest()
class InitiateView(View):
    """Start a SAML login by building the AuthnRequest and sending the user to the IdP"""

    def handle_login_flow(self, source: SAMLSource, *stages_to_append, **kwargs) -> HttpResponse:
        """Plan the source's pre-authentication flow and redirect to the flow executor.

        `stages_to_append` are appended to the generated plan in the given order,
        `kwargs` becomes the plan context (extended with the SSO marker, the
        source and the final redirect target).
        """
        # When the user was on their way to authorize an application, the
        # target they came from is kept in the session; carry it through.
        session_get = self.request.session.get(SESSION_KEY_GET, {})
        final_redirect = session_get.get(NEXT_ARG_NAME, "authentik_core:if-user")
        kwargs[PLAN_CONTEXT_SSO] = True
        kwargs[PLAN_CONTEXT_SOURCE] = source
        kwargs[PLAN_CONTEXT_REDIRECT] = final_redirect
        # The planner is run right here so that the context assembled above
        # is passed into the plan
        flow = source.pre_authentication_flow
        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        plan = planner.plan(self.request, kwargs)
        for extra_stage in stages_to_append:
            plan.append_stage(extra_stage)
        self.request.session[SESSION_KEY_PLAN] = plan
        return redirect_with_qs(
            "authentik_core:if-flow",
            self.request.GET,
            flow_slug=flow.slug,
        )

    def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Send the SSO request to the IdP.

        Redirect binding answers with a plain redirect to the SSO URL; the
        POST bindings run the pre-authentication flow with an autosubmit
        stage (preceded by a consent stage for the plain POST binding).
        Raises Http404 when the source does not exist or is disabled.
        """
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        if not source.enabled:
            raise Http404
        relay_state = request.GET.get("next", "")
        auth_n_req = RequestProcessor(source, request, relay_state)

        # Redirect binding: no flow needed, the request travels in the querystring
        if source.binding_type == SAMLBindingTypes.REDIRECT:
            parsed_sso = urlparse(source.sso_url)
            # Existing query arguments of the SSO URL are kept, SAML arguments win
            query_args = {
                **dict(parse_qsl(parsed_sso.query)),
                **auth_n_req.build_auth_n_detached(),
            }
            target = ParseResult(
                scheme=parsed_sso.scheme,
                netloc=parsed_sso.netloc,
                path=parsed_sso.path,
                params=parsed_sso.params,
                query=urlencode(query_args),
                fragment=parsed_sso.fragment,
            )
            return redirect(urlunparse(target))

        # POST bindings: the request is delivered through a form
        try:
            saml_request = nice64(auth_n_req.build_auth_n())
        except InternalError as exc:
            message = str(exc)
            LOGGER.warning(message)
            return bad_request_message(request, message)

        redirect_title = f"Redirecting to {source.name}..."
        plan_kwargs = {
            PLAN_CONTEXT_TITLE: redirect_title,
            PLAN_CONTEXT_CONSENT_TITLE: redirect_title,
            PLAN_CONTEXT_ATTRS: {
                "SAMLRequest": saml_request,
                "RelayState": relay_state,
            },
            PLAN_CONTEXT_URL: source.sso_url,
        }
        injected_stages = []
        # Plain POST asks for consent first; everything else is treated as
        # POST_AUTO and goes straight to the autosubmit stage
        if source.binding_type == SAMLBindingTypes.POST:
            injected_stages.append(in_memory_stage(ConsentStageView))
            plan_kwargs[PLAN_CONTEXT_CONSENT_HEADER] = f"Continue to {source.name}"
        injected_stages.append(in_memory_stage(AutosubmitStageView))
        return self.handle_login_flow(source, *injected_stages, **plan_kwargs)
@method_decorator(csrf_exempt, name="dispatch")
class ACSView(View):
    """AssertionConsumerService: accepts the IdP's assertion and continues the login"""

    def post(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Process a POSTed SSO assertion for the given source.

        Raises Http404 when the source does not exist or is disabled. A missing
        SAML response, a failed verification or an unsupported NameID format
        is answered with a bad-request message carrying the error text.
        """
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        if not source.enabled:
            raise Http404
        processor = ResponseProcessor(source)
        # Parsing and flow preparation are guarded separately so that each
        # step only handles the errors it is expected to produce
        try:
            processor.parse(request)
        except (MissingSAMLResponse, VerificationError) as exc:
            return bad_request_message(request, str(exc))
        try:
            response = processor.prepare_flow(request)
        except UnsupportedNameIDFormat as exc:
            return bad_request_message(request, str(exc))
        return response
class SLOView(LoginRequiredMixin, View):
    """Single-Logout-View"""

    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Log user out and redirect them to the IdP's SLO URL.

        Unauthenticated requests are handed to `handle_no_permission()`.
        Raises Http404 when the source does not exist or is disabled.
        """
        # This override replaces LoginRequiredMixin.dispatch(), which is where
        # the mixin performs its authentication check. Without doing the check
        # here the mixin has no effect and anonymous requests are served.
        if not request.user.is_authenticated:
            return self.handle_no_permission()
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        if not source.enabled:
            raise Http404
        logout(request)
        return redirect(source.slo_url)
class MetadataView(View):
    """Serve the SAML metadata generated for a source as XML"""

    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Answer any HTTP method with the entity descriptor built for the source.

        Raises Http404 when no source with the given slug exists.
        """
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        processor = MetadataProcessor(source, request)
        return HttpResponse(processor.build_entity_descriptor(), content_type="text/xml")