providers/saml: big cleanup, simplify base processor

add New fields for
 - assertion_valid_not_before
 - assertion_valid_not_on_or_after
 - session_valid_not_on_or_after
allow flexible time durations for these fields
fall back to Provider's ACS if none is specified in AuthNRequest
This commit is contained in:
Jens Langhammer
2020-02-14 15:19:48 +01:00
parent 2be026dd44
commit e36d7928e4
19 changed files with 495 additions and 392 deletions

View File

@ -0,0 +1,18 @@
"""Small helper functions"""
import uuid
from django.http import HttpRequest, HttpResponse
from django.shortcuts import render
from django.template.context import Context
def render_xml(request: HttpRequest, template: str, ctx: Context) -> HttpResponse:
"""Render template with content_type application/xml"""
return render(request, template, context=ctx, content_type="application/xml")
def get_random_id() -> str:
"""Random hex id"""
# It is very important that these random IDs NOT start with a number.
random_id = "_" + uuid.uuid4().hex
return random_id

View File

@ -0,0 +1,84 @@
"""Create self-signed certificates"""
import datetime
import uuid
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
class CertificateBuilder:
"""Build self-signed certificates"""
__public_key = None
__private_key = None
__builder = None
__certificate = None
def __init__(self):
self.__public_key = None
self.__private_key = None
self.__builder = None
self.__certificate = None
def build(self):
"""Build self-signed certificate"""
one_day = datetime.timedelta(1, 0, 0)
self.__private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
self.__public_key = self.__private_key.public_key()
self.__builder = (
x509.CertificateBuilder()
.subject_name(
x509.Name(
[
x509.NameAttribute(
NameOID.COMMON_NAME,
u"passbook Self-signed SAML Certificate",
),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"passbook"),
x509.NameAttribute(
NameOID.ORGANIZATIONAL_UNIT_NAME, u"Self-signed"
),
]
)
)
.issuer_name(
x509.Name(
[
x509.NameAttribute(
NameOID.COMMON_NAME,
u"passbook Self-signed SAML Certificate",
),
]
)
)
.not_valid_before(datetime.datetime.today() - one_day)
.not_valid_after(datetime.datetime.today() + datetime.timedelta(days=365))
.serial_number(int(uuid.uuid4()))
.public_key(self.__public_key)
)
self.__certificate = self.__builder.sign(
private_key=self.__private_key,
algorithm=hashes.SHA256(),
backend=default_backend(),
)
@property
def private_key(self):
"""Return private key in PEM format"""
return self.__private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")
@property
def certificate(self):
"""Return certificate in PEM format"""
return self.__certificate.public_bytes(
encoding=serialization.Encoding.PEM,
).decode("utf-8")

View File

@ -0,0 +1,21 @@
"""Wrappers to de/encode and de/inflate strings"""
import base64
import zlib
def decode_base64_and_inflate(b64string):
"""Base64 decode and ZLib decompress b64string"""
decoded_data = base64.b64decode(b64string)
return zlib.decompress(decoded_data, -15)
def deflate_and_base64_encode(string_val):
"""Base64 and ZLib Compress b64string"""
zlibbed_str = zlib.compress(string_val)
compressed_string = zlibbed_str[2:-4]
return base64.b64encode(compressed_string)
def nice64(src):
""" Returns src base64-encoded and formatted nicely for our XML. """
return base64.b64encode(src).decode("utf-8").replace("\n", "")

View File

@ -0,0 +1,45 @@
"""Time utilities"""
import datetime
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
ALLOWED_KEYS = (
"days",
"seconds",
"microseconds",
"milliseconds",
"minutes",
"hours",
"weeks",
)
def timedelta_string_validator(value: str):
"""Validator for Django that checks if value can be parsed with `timedelta_from_string`"""
try:
timedelta_from_string(value)
except ValueError as exc:
raise ValidationError(
_("%(value)s is not in the correct format of 'hours=3;minutes=1'."),
params={"value": value},
) from exc
def timedelta_from_string(expr: str) -> datetime.timedelta:
"""Convert a string with the format of 'hours=1;minute=3;seconds=5' to a
`datetime.timedelta` Object with hours = 1, minutes = 3, seconds = 5"""
kwargs = {}
for duration_pair in expr.split(";"):
key, value = duration_pair.split("=")
if key.lower() not in ALLOWED_KEYS:
continue
kwargs[key.lower()] = float(value)
return datetime.timedelta(**kwargs)
def get_time_string(delta: datetime.timedelta = None) -> str:
"""Get Data formatted in SAML format"""
now = datetime.datetime.now()
final = now + delta
return final.strftime("%Y-%m-%dT%H:%M:%SZ")

View File

@ -0,0 +1,97 @@
"""Functions for creating XML output."""
from __future__ import annotations
from typing import TYPE_CHECKING
from structlog import get_logger
from passbook.lib.utils.template import render_to_string
from passbook.providers.saml.utils.xml_signing import (
get_signature_xml,
sign_with_signxml,
)
if TYPE_CHECKING:
from passbook.providers.saml.models import SAMLProvider
LOGGER = get_logger()
def _get_attribute_statement(params):
"""Inserts AttributeStatement, if we have any attributes.
Modifies the params dict.
PRE-REQ: params['SUBJECT'] has already been created (usually by a call to
_get_subject()."""
attributes = params.get("ATTRIBUTES", [])
if not attributes:
params["ATTRIBUTE_STATEMENT"] = ""
return
# Build complete AttributeStatement.
params["ATTRIBUTE_STATEMENT"] = render_to_string(
"saml/xml/attributes.xml", {"attributes": attributes}
)
def _get_in_response_to(params):
"""Insert InResponseTo if we have a RequestID.
Modifies the params dict."""
# NOTE: I don't like this. We're mixing templating logic here, but the
# current design requires this; maybe refactor using better templates, or
# just bite the bullet and use elementtree to produce the XML; see comments
# in xml_templates about Canonical XML.
request_id = params.get("REQUEST_ID", None)
if request_id:
params["IN_RESPONSE_TO"] = 'InResponseTo="%s" ' % request_id
else:
params["IN_RESPONSE_TO"] = ""
def _get_subject(params):
"""Insert Subject. Modifies the params dict."""
params["SUBJECT_STATEMENT"] = render_to_string("saml/xml/subject.xml", params)
def get_assertion_xml(template, parameters, signed=False):
"""Get XML for Assertion"""
# Reset signature.
params = {}
params.update(parameters)
params["ASSERTION_SIGNATURE"] = ""
_get_in_response_to(params)
_get_subject(params) # must come before _get_attribute_statement()
_get_attribute_statement(params)
unsigned = render_to_string(template, params)
if not signed:
return unsigned
# Sign it.
signature_xml = get_signature_xml()
params["ASSERTION_SIGNATURE"] = signature_xml
return render_to_string(template, params)
def get_response_xml(parameters, saml_provider: SAMLProvider, assertion_id=""):
"""Returns XML for response, with signatures, if signed is True."""
# Reset signatures.
params = {}
params.update(parameters)
params["RESPONSE_SIGNATURE"] = ""
_get_in_response_to(params)
raw_response = render_to_string("saml/xml/response.xml", params)
if not saml_provider.signing:
return raw_response
signature_xml = get_signature_xml()
params["RESPONSE_SIGNATURE"] = signature_xml
signed = sign_with_signxml(
saml_provider.signing_key,
raw_response,
saml_provider.signing_cert,
reference_uri=assertion_id,
)
return signed

View File

@ -0,0 +1,31 @@
"""Signing code goes here."""
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from lxml import etree # nosec
from signxml import XMLSigner, XMLVerifier
from structlog import get_logger
from passbook.lib.utils.template import render_to_string
LOGGER = get_logger()
def sign_with_signxml(private_key, data, cert, reference_uri=None):
"""Sign Data with signxml"""
key = serialization.load_pem_private_key(
str.encode("\n".join([x.strip() for x in private_key.split("\n")])),
password=None,
backend=default_backend(),
)
# defused XML is not used here because it messes up XML namespaces
# Data is trusted, so lxml is ok
root = etree.fromstring(data) # nosec
signer = XMLSigner(c14n_algorithm="http://www.w3.org/2001/10/xml-exc-c14n#")
signed = signer.sign(root, key=key, cert=[cert], reference_uri=reference_uri)
XMLVerifier().verify(signed, x509_cert=cert)
return etree.tostring(signed).decode("utf-8") # nosec
def get_signature_xml():
"""Returns XML Signature for subject."""
return render_to_string("saml/xml/signature.xml", {})