sources/saml: start implementing transient NameID format

This commit is contained in:
Jens Langhammer
2020-06-24 21:50:30 +02:00
parent c0d8aa2303
commit 05999cb8c7
5 changed files with 107 additions and 37 deletions

View File

@ -1,5 +1,5 @@
"""passbook saml source processor"""
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Dict, Optional
from defusedxml import ElementTree
from django.http import HttpRequest, HttpResponse
@ -7,6 +7,7 @@ from signxml import XMLVerifier
from structlog import get_logger
from passbook.core.models import User
from passbook.flows.models import Flow
from passbook.flows.planner import (
PLAN_CONTEXT_PENDING_USER,
PLAN_CONTEXT_SSO,
@ -60,54 +61,98 @@ class Processor:
self._root_xml, x509_cert=self._source.signing_kp.certificate_data
)
def _get_email(self) -> Optional[str]:
"""
Returns the email out of the response.
def _handle_name_id_transient(self, request: HttpRequest) -> HttpResponse:
"""Handle a NameID with the Format of Transient. This is a bit more complex than other
formats, as we need to create a temporary User that is used in the session. This
user has an attribute that refers to our Source for cleanup. The user is also deleted
on logout and periodically."""
# Create a temporary User
name_id = self._get_name_id().text
user: User = User.objects.create(
username=name_id,
attributes={
"saml": {"source": self._source.pk.hex, "delete_on_logout": True}
},
)
LOGGER.debug("Created temporary user for NameID Transient", username=name_id)
user.set_unusable_password()
user.save()
return self._flow_response(
request,
self._source.authentication_flow,
**{
PLAN_CONTEXT_PENDING_USER: user,
PLAN_CONTEXT_AUTHENTICATION_BACKEND: DEFAULT_BACKEND,
},
)
At present, response must pass the email address as the Subject, eg.:
<saml:Subject>
<saml:NameID Format="urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress"
SPNameQualifier="">
email@example.com
</saml:NameID>
"""
def _get_name_id(self) -> "Element":
"""Get NameID Element"""
assertion = self._root.find("{urn:oasis:names:tc:SAML:2.0:assertion}Assertion")
subject = assertion.find("{urn:oasis:names:tc:SAML:2.0:assertion}Subject")
name_id = subject.find("{urn:oasis:names:tc:SAML:2.0:assertion}NameID")
name_id_format = name_id.attrib["Format"]
if name_id_format != "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress":
raise UnsupportedNameIDFormat(
f"Assertion contains NameID with unsupported format {name_id_format}."
)
return name_id.text
if name_id is None:
raise ValueError("NameID Element not found!")
return name_id
def _get_name_id_filter(self) -> Dict[str, str]:
"""Returns the subject's NameID as a Filter for the `User`"""
name_id_el = self._get_name_id()
name_id = name_id_el.text
if not name_id:
raise UnsupportedNameIDFormat(f"Subject's NameID is empty.")
_format = name_id_el.attrib["Format"]
if _format == "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress":
return {"email": name_id}
if _format == "urn:oasis:names:tc:SAML:2.0:nameid-format:persistent":
return {"username": name_id}
if _format == "urn:oasis:names:tc:SAML:2.0:nameid-format:X509SubjectName":
# This attribute is statically set by the LDAP source
return {"attributes__distinguishedName": name_id}
if (
_format
== "urn:oasis:names:tc:SAML:2.0:nameid-format:WindowsDomainQualifiedName"
):
if "\\" in name_id:
name_id = name_id.split("\\")[1]
return {"username": name_id}
raise UnsupportedNameIDFormat(
f"Assertion contains NameID with unsupported format {_format}."
)
def prepare_flow(self, request: HttpRequest) -> HttpResponse:
"""Prepare flow plan depending on whether or not the user exists"""
email = self._get_email()
matching_users = User.objects.filter(email=email)
name_id = self._get_name_id()
# transient NameIDs are handeled seperately as they don't have to go through flows.
if (
name_id.attrib["Format"]
== "urn:oasis:names:tc:SAML:2.0:nameid-format:transient"
):
return self._handle_name_id_transient(request)
name_id_filter = self._get_name_id_filter()
matching_users = User.objects.filter(**name_id_filter)
if matching_users.exists():
# User exists already, switch to authentication flow
flow = self._source.authentication_flow
request.session[SESSION_KEY_PLAN] = FlowPlanner(flow).plan(
return self._flow_response(
request,
{
# Data for authentication
self._source.authentication_flow,
**{
PLAN_CONTEXT_PENDING_USER: matching_users.first(),
PLAN_CONTEXT_AUTHENTICATION_BACKEND: DEFAULT_BACKEND,
PLAN_CONTEXT_SSO: True,
},
)
else:
flow = self._source.enrollment_flow
request.session[SESSION_KEY_PLAN] = FlowPlanner(flow).plan(
request,
{
# Data for enrollment
PLAN_CONTEXT_PROMPT: {"username": email, "email": email},
PLAN_CONTEXT_SSO: True,
},
)
return self._flow_response(
request,
self._source.enrollment_flow,
**{PLAN_CONTEXT_PROMPT: name_id_filter,},
)
def _flow_response(
self, request: HttpRequest, flow: Flow, **kwargs
) -> HttpResponse:
kwargs[PLAN_CONTEXT_SSO] = True
request.session[SESSION_KEY_PLAN] = FlowPlanner(flow).plan(request, kwargs,)
return redirect_with_qs(
"passbook_flows:flow-executor-shell", request.GET, flow_slug=flow.slug,
)