Files
authentik/authentik/sources/saml/views.py
2021-03-24 09:22:15 +01:00

179 lines
7.2 KiB
Python

"""saml sp views"""
from django.contrib.auth import logout
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import Http404, HttpRequest, HttpResponse
from django.http.response import HttpResponseBadRequest
from django.shortcuts import get_object_or_404, redirect
from django.utils.decorators import method_decorator
from django.utils.http import urlencode
from django.utils.translation import gettext_lazy as _
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from xmlsec import VerificationError
from authentik.flows.challenge import Challenge, ChallengeResponse, ChallengeTypes
from authentik.flows.models import in_memory_stage
from authentik.flows.planner import (
PLAN_CONTEXT_REDIRECT,
PLAN_CONTEXT_SOURCE,
PLAN_CONTEXT_SSO,
FlowPlanner,
)
from authentik.flows.stage import ChallengeStageView
from authentik.flows.views import NEXT_ARG_NAME, SESSION_KEY_GET, SESSION_KEY_PLAN
from authentik.lib.utils.urls import redirect_with_qs
from authentik.lib.views import bad_request_message
from authentik.providers.saml.utils.encoding import nice64
from authentik.providers.saml.views.flows import AutosubmitChallenge
from authentik.sources.saml.exceptions import (
MissingSAMLResponse,
UnsupportedNameIDFormat,
)
from authentik.sources.saml.models import SAMLBindingTypes, SAMLSource
from authentik.sources.saml.processors.metadata import MetadataProcessor
from authentik.sources.saml.processors.request import RequestProcessor
from authentik.sources.saml.processors.response import ResponseProcessor
from authentik.stages.consent.stage import (
PLAN_CONTEXT_CONSENT_HEADER,
PLAN_CONTEXT_CONSENT_TITLE,
ConsentStageView,
)
PLAN_CONTEXT_TITLE = "title"
PLAN_CONTEXT_URL = "url"
PLAN_CONTEXT_ATTRS = "attrs"
class AutosubmitStageView(ChallengeStageView):
"""Wrapper stage to create an autosubmit challenge from plan context variables"""
def get_challenge(self, *args, **kwargs) -> Challenge:
return AutosubmitChallenge(
data={
"type": ChallengeTypes.native.value,
"component": "ak-stage-autosubmit",
"title": self.executor.plan.context.get(PLAN_CONTEXT_TITLE, ""),
"url": self.executor.plan.context.get(PLAN_CONTEXT_URL, ""),
"attrs": self.executor.plan.context.get(PLAN_CONTEXT_ATTRS, ""),
},
)
# Since `ak-stage-autosubmit` redirects off site, we don't have anything to check
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
return HttpResponseBadRequest()
class InitiateView(View):
"""Get the Form with SAML Request, which sends us to the IDP"""
def handle_login_flow(
self, source: SAMLSource, *stages_to_append, **kwargs
) -> HttpResponse:
"""Prepare Authentication Plan, redirect user FlowExecutor"""
# Ensure redirect is carried through when user was trying to
# authorize application
final_redirect = self.request.session.get(SESSION_KEY_GET, {}).get(
NEXT_ARG_NAME, "authentik_core:if-admin"
)
kwargs.update(
{
PLAN_CONTEXT_SSO: True,
PLAN_CONTEXT_SOURCE: source,
PLAN_CONTEXT_REDIRECT: final_redirect,
}
)
# We run the Flow planner here so we can pass the Pending user in the context
planner = FlowPlanner(source.pre_authentication_flow)
planner.allow_empty_flows = True
plan = planner.plan(self.request, kwargs)
for stage in stages_to_append:
plan.append(stage)
self.request.session[SESSION_KEY_PLAN] = plan
return redirect_with_qs(
"authentik_core:if-flow",
self.request.GET,
flow_slug=source.pre_authentication_flow.slug,
)
def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
"""Replies with an XHTML SSO Request."""
source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
if not source.enabled:
raise Http404
relay_state = request.GET.get("next", "")
auth_n_req = RequestProcessor(source, request, relay_state)
# If the source is configured for Redirect bindings, we can just redirect there
if source.binding_type == SAMLBindingTypes.Redirect:
url_args = urlencode(auth_n_req.build_auth_n_detached())
return redirect(f"{source.sso_url}?{url_args}")
# As POST Binding we show a form
saml_request = nice64(auth_n_req.build_auth_n())
injected_stages = []
plan_kwargs = {
PLAN_CONTEXT_TITLE: _("Redirecting to %(app)s..." % {"app": source.name}),
PLAN_CONTEXT_CONSENT_TITLE: _(
"Redirecting to %(app)s..." % {"app": source.name}
),
PLAN_CONTEXT_ATTRS: {
"SAMLRequest": saml_request,
"RelayState": relay_state,
},
PLAN_CONTEXT_URL: source.sso_url,
}
# For just POST we add a consent stage,
# otherwise we default to POST_AUTO, with direct redirect
if source.binding_type == SAMLBindingTypes.POST:
injected_stages.append(in_memory_stage(ConsentStageView))
plan_kwargs[PLAN_CONTEXT_CONSENT_HEADER] = f"Continue to {source.name}"
injected_stages.append(in_memory_stage(AutosubmitStageView))
return self.handle_login_flow(
source,
*injected_stages,
**plan_kwargs,
)
@method_decorator(csrf_exempt, name="dispatch")
class ACSView(View):
"""AssertionConsumerService, consume assertion and log user in"""
def post(self, request: HttpRequest, source_slug: str) -> HttpResponse:
"""Handles a POSTed SSO Assertion and logs the user in."""
source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
if not source.enabled:
raise Http404
processor = ResponseProcessor(source)
try:
processor.parse(request)
except MissingSAMLResponse as exc:
return bad_request_message(request, str(exc))
except VerificationError as exc:
return bad_request_message(request, str(exc))
try:
return processor.prepare_flow(request)
except UnsupportedNameIDFormat as exc:
return bad_request_message(request, str(exc))
class SLOView(LoginRequiredMixin, View):
"""Single-Logout-View"""
def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
"""Log user out and redirect them to the IdP's SLO URL."""
source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
if not source.enabled:
raise Http404
logout(request)
return redirect(source.slo_url)
class MetadataView(View):
"""Return XML Metadata for IDP"""
def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
"""Replies with the XML Metadata SPSSODescriptor."""
source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
metadata = MetadataProcessor(source, request).build_entity_descriptor()
return HttpResponse(metadata, content_type="text/xml")