263 lines
		
	
	
		
			9.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			263 lines
		
	
	
		
			9.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """passbook SAML IDP Views"""
 | |
| from typing import Optional
 | |
| 
 | |
| from django.contrib.auth import logout
 | |
| from django.contrib.auth.mixins import AccessMixin
 | |
| from django.core.exceptions import ValidationError
 | |
| from django.core.validators import URLValidator
 | |
| from django.http import HttpRequest, HttpResponse, HttpResponseBadRequest
 | |
| from django.shortcuts import get_object_or_404, redirect, render, reverse
 | |
| from django.utils.datastructures import MultiValueDictKeyError
 | |
| from django.utils.decorators import method_decorator
 | |
| from django.utils.translation import gettext as _
 | |
| from django.views import View
 | |
| from django.views.decorators.csrf import csrf_exempt
 | |
| from signxml.util import strip_pem_header
 | |
| from structlog import get_logger
 | |
| 
 | |
| from passbook.audit.models import Event, EventAction
 | |
| from passbook.core.models import Application
 | |
| from passbook.lib.mixins import CSRFExemptMixin
 | |
| from passbook.lib.utils.template import render_to_string
 | |
| from passbook.policies.engine import PolicyEngine
 | |
| from passbook.providers.saml import exceptions
 | |
| from passbook.providers.saml.models import SAMLProvider
 | |
| 
 | |
| LOGGER = get_logger()
 | |
| URL_VALIDATOR = URLValidator(schemes=("http", "https"))
 | |
| 
 | |
| 
 | |
| def _generate_response(request: HttpRequest, provider: SAMLProvider) -> HttpResponse:
 | |
|     """Generate a SAML response using processor_instance and return it in the proper Django
 | |
|     response."""
 | |
|     try:
 | |
|         provider.processor.init_deep_link(request, "")
 | |
|         ctx = provider.processor.generate_response()
 | |
|         ctx["remote"] = provider
 | |
|         ctx["is_login"] = True
 | |
|     except exceptions.UserNotAuthorized:
 | |
|         return render(request, "saml/idp/invalid_user.html")
 | |
| 
 | |
|     return render(request, "saml/idp/login.html", ctx)
 | |
| 
 | |
| 
 | |
| class AccessRequiredView(AccessMixin, View):
 | |
|     """Mixin class for Views using a provider instance"""
 | |
| 
 | |
|     _provider: Optional[SAMLProvider] = None
 | |
| 
 | |
|     @property
 | |
|     def provider(self) -> SAMLProvider:
 | |
|         """Get provider instance"""
 | |
|         if not self._provider:
 | |
|             application = get_object_or_404(
 | |
|                 Application, slug=self.kwargs["application"]
 | |
|             )
 | |
|             self._provider = get_object_or_404(SAMLProvider, pk=application.provider_id)
 | |
|         return self._provider
 | |
| 
 | |
|     def _has_access(self) -> bool:
 | |
|         """Check if user has access to application"""
 | |
|         LOGGER.debug(
 | |
|             "_has_access", user=self.request.user, app=self.provider.application
 | |
|         )
 | |
|         policy_engine = PolicyEngine(
 | |
|             self.provider.application.policies.all(), self.request.user, self.request
 | |
|         )
 | |
|         policy_engine.build()
 | |
|         return policy_engine.passing
 | |
| 
 | |
|     def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 | |
|         if not request.user.is_authenticated:
 | |
|             return self.handle_no_permission()
 | |
|         if not self._has_access():
 | |
|             return render(
 | |
|                 request,
 | |
|                 "login/denied.html",
 | |
|                 {
 | |
|                     "title": _("You don't have access to this application"),
 | |
|                     "is_login": True,
 | |
|                 },
 | |
|             )
 | |
|         return super().dispatch(request, *args, **kwargs)
 | |
| 
 | |
| 
 | |
| class LoginBeginView(AccessRequiredView):
 | |
|     """Receives a SAML 2.0 AuthnRequest from a Service Provider and
 | |
|     stores it in the session prior to enforcing login."""
 | |
| 
 | |
|     @method_decorator(csrf_exempt)
 | |
|     def dispatch(self, request: HttpRequest, application: str) -> HttpResponse:
 | |
|         if request.method == "POST":
 | |
|             source = request.POST
 | |
|         else:
 | |
|             source = request.GET
 | |
| 
 | |
|         # Store these values now, because Django's login cycle won't preserve them.
 | |
|         try:
 | |
|             request.session["SAMLRequest"] = source["SAMLRequest"]
 | |
|         except (KeyError, MultiValueDictKeyError):
 | |
|             return HttpResponseBadRequest("the SAML request payload is missing")
 | |
| 
 | |
|         request.session["RelayState"] = source.get("RelayState", "")
 | |
|         return redirect(
 | |
|             reverse(
 | |
|                 "passbook_providers_saml:saml-login-process",
 | |
|                 kwargs={"application": application},
 | |
|             )
 | |
|         )
 | |
| 
 | |
| 
 | |
| class RedirectToSPView(AccessRequiredView):
 | |
|     """Return autosubmit form"""
 | |
| 
 | |
|     def get(
 | |
|         self, request: HttpRequest, acs_url: str, saml_response: str, relay_state: str
 | |
|     ) -> HttpResponse:
 | |
|         """Return autosubmit form"""
 | |
|         return render(
 | |
|             request,
 | |
|             "core/autosubmit_form.html",
 | |
|             {
 | |
|                 "url": acs_url,
 | |
|                 "attrs": {"SAMLResponse": saml_response, "RelayState": relay_state},
 | |
|             },
 | |
|         )
 | |
| 
 | |
| 
 | |
| class LoginProcessView(AccessRequiredView):
 | |
|     """Processor-based login continuation.
 | |
|     Presents a SAML 2.0 Assertion for POSTing back to the Service Provider."""
 | |
| 
 | |
|     # pylint: disable=unused-argument
 | |
|     def get(self, request: HttpRequest, application: str) -> HttpResponse:
 | |
|         """Handle get request, i.e. render form"""
 | |
|         # User access gets checked in dispatch
 | |
|         if self.provider.application.skip_authorization:
 | |
|             ctx = self.provider.processor.generate_response()
 | |
|             # Log Application Authorization
 | |
|             Event.new(
 | |
|                 EventAction.AUTHORIZE_APPLICATION,
 | |
|                 authorized_application=self.provider.application,
 | |
|                 skipped_authorization=True,
 | |
|             ).from_http(request)
 | |
|             return RedirectToSPView.as_view()(
 | |
|                 request=request,
 | |
|                 acs_url=ctx["acs_url"],
 | |
|                 saml_response=ctx["saml_response"],
 | |
|                 relay_state=ctx["relay_state"],
 | |
|             )
 | |
|         try:
 | |
|             return _generate_response(request, self.provider)
 | |
|         except exceptions.CannotHandleAssertion as exc:
 | |
|             LOGGER.debug(exc)
 | |
|         return HttpResponseBadRequest()
 | |
| 
 | |
|     # pylint: disable=unused-argument
 | |
|     def post(self, request: HttpRequest, application: str) -> HttpResponse:
 | |
|         """Handle post request, return back to ACS"""
 | |
|         # User access gets checked in dispatch
 | |
|         if request.POST.get("ACSUrl", None):
 | |
|             # User accepted request
 | |
|             Event.new(
 | |
|                 EventAction.AUTHORIZE_APPLICATION,
 | |
|                 authorized_application=self.provider.application,
 | |
|                 skipped_authorization=False,
 | |
|             ).from_http(request)
 | |
|             return RedirectToSPView.as_view()(
 | |
|                 request=request,
 | |
|                 acs_url=request.POST.get("ACSUrl"),
 | |
|                 saml_response=request.POST.get("SAMLResponse"),
 | |
|                 relay_state=request.POST.get("RelayState"),
 | |
|             )
 | |
|         try:
 | |
|             return _generate_response(request, self.provider)
 | |
|         except exceptions.CannotHandleAssertion as exc:
 | |
|             LOGGER.debug(exc)
 | |
|         return HttpResponseBadRequest()
 | |
| 
 | |
| 
 | |
| class LogoutView(CSRFExemptMixin, AccessRequiredView):
 | |
|     """Allows a non-SAML 2.0 URL to log out the user and
 | |
|     returns a standard logged-out page. (SalesForce and others use this method,
 | |
|     though it's technically not SAML 2.0)."""
 | |
| 
 | |
|     # pylint: disable=unused-argument
 | |
|     def get(self, request: HttpRequest, application: str) -> HttpResponse:
 | |
|         """Perform logout"""
 | |
|         logout(request)
 | |
| 
 | |
|         redirect_url = request.GET.get("redirect_to", "")
 | |
| 
 | |
|         try:
 | |
|             URL_VALIDATOR(redirect_url)
 | |
|         except ValidationError:
 | |
|             pass
 | |
|         else:
 | |
|             return redirect(redirect_url)
 | |
| 
 | |
|         return render(request, "saml/idp/logged_out.html")
 | |
| 
 | |
| 
 | |
| class SLOLogout(CSRFExemptMixin, AccessRequiredView):
 | |
|     """Receives a SAML 2.0 LogoutRequest from a Service Provider,
 | |
|     logs out the user and returns a standard logged-out page."""
 | |
| 
 | |
|     # pylint: disable=unused-argument
 | |
|     def post(self, request: HttpRequest, application: str) -> HttpResponse:
 | |
|         """Perform logout"""
 | |
|         request.session["SAMLRequest"] = request.POST["SAMLRequest"]
 | |
|         # TODO: Parse SAML LogoutRequest from POST data, similar to login_process().
 | |
|         # TODO: Modify the base processor to handle logouts?
 | |
|         # TODO: Combine this with login_process(), since they are so very similar?
 | |
|         # TODO: Format a LogoutResponse and return it to the browser.
 | |
|         # XXX: For now, simply log out without validating the request.
 | |
|         logout(request)
 | |
|         return render(request, "saml/idp/logged_out.html")
 | |
| 
 | |
| 
 | |
| class DescriptorDownloadView(AccessRequiredView):
 | |
|     """Replies with the XML Metadata IDSSODescriptor."""
 | |
| 
 | |
|     def get(self, request: HttpRequest, application: str) -> HttpResponse:
 | |
|         """Replies with the XML Metadata IDSSODescriptor."""
 | |
|         entity_id = self.provider.issuer
 | |
|         slo_url = request.build_absolute_uri(
 | |
|             reverse(
 | |
|                 "passbook_providers_saml:saml-logout",
 | |
|                 kwargs={"application": application},
 | |
|             )
 | |
|         )
 | |
|         sso_url = request.build_absolute_uri(
 | |
|             reverse(
 | |
|                 "passbook_providers_saml:saml-login",
 | |
|                 kwargs={"application": application},
 | |
|             )
 | |
|         )
 | |
|         pubkey = strip_pem_header(self.provider.signing_cert.replace("\r", "")).replace(
 | |
|             "\n", ""
 | |
|         )
 | |
|         ctx = {
 | |
|             "entity_id": entity_id,
 | |
|             "cert_public_key": pubkey,
 | |
|             "slo_url": slo_url,
 | |
|             "sso_url": sso_url,
 | |
|         }
 | |
|         metadata = render_to_string("saml/xml/metadata.xml", ctx)
 | |
|         response = HttpResponse(metadata, content_type="application/xml")
 | |
|         response["Content-Disposition"] = (
 | |
|             'attachment; filename="' '%s_passbook_meta.xml"' % self.provider.name
 | |
|         )
 | |
|         return response
 | |
| 
 | |
| 
 | |
| class InitiateLoginView(AccessRequiredView):
 | |
|     """IdP-initiated Login"""
 | |
| 
 | |
|     # pylint: disable=unused-argument
 | |
|     def get(self, request: HttpRequest, application: str) -> HttpResponse:
 | |
|         """Initiates an IdP-initiated link to a simple SP resource/target URL."""
 | |
|         self.provider.processor.init_deep_link(request, "")
 | |
|         self.provider.processor.is_idp_initiated = True
 | |
|         return _generate_response(request, self.provider)
 | 
