
whenever a user tries to access an application without being authenticated to passbook, we now show notice which application they are going to continue to.
134 lines
5.0 KiB
Python
134 lines
5.0 KiB
Python
"""passbook OIDC Views"""
|
|
from django.contrib.auth.mixins import LoginRequiredMixin
|
|
from django.http import HttpRequest, HttpResponse, JsonResponse
|
|
from django.shortcuts import get_object_or_404, reverse
|
|
from django.views import View
|
|
from oidc_provider.lib.endpoints.authorize import AuthorizeEndpoint
|
|
from oidc_provider.lib.utils.common import get_issuer, get_site_url
|
|
from oidc_provider.models import ResponseType
|
|
from oidc_provider.views import AuthorizeView
|
|
from structlog import get_logger
|
|
|
|
from passbook.core.models import Application
|
|
from passbook.flows.models import in_memory_stage
|
|
from passbook.flows.planner import (
|
|
PLAN_CONTEXT_APPLICATION,
|
|
PLAN_CONTEXT_SSO,
|
|
FlowPlan,
|
|
FlowPlanner,
|
|
)
|
|
from passbook.flows.stage import StageView
|
|
from passbook.flows.views import SESSION_KEY_PLAN
|
|
from passbook.lib.utils.urls import redirect_with_qs
|
|
from passbook.policies.mixins import PolicyAccessMixin
|
|
from passbook.providers.oidc.models import OpenIDProvider
|
|
from passbook.stages.consent.stage import PLAN_CONTEXT_CONSENT_TEMPLATE
|
|
|
|
LOGGER = get_logger()
|
|
|
|
PLAN_CONTEXT_PARAMS = "params"
|
|
PLAN_CONTEXT_SCOPES = "scopes"
|
|
|
|
|
|
class AuthorizationFlowInitView(PolicyAccessMixin, LoginRequiredMixin, View):
|
|
"""OIDC Flow initializer, checks access to application and starts flow"""
|
|
|
|
# pylint: disable=unused-argument
|
|
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
|
|
"""Check access to application, start FlowPLanner, return to flow executor shell"""
|
|
client_id = request.GET.get("client_id")
|
|
provider = get_object_or_404(OpenIDProvider, oidc_client__client_id=client_id)
|
|
try:
|
|
application = self.provider_to_application(provider)
|
|
except Application.DoesNotExist:
|
|
return self.handle_no_permission_authorized()
|
|
# Check if user is unauthenticated, so we pass the application
|
|
# for the identification stage
|
|
if not request.user.is_authenticated:
|
|
return self.handle_no_permission(application)
|
|
# Check permissions
|
|
result = self.user_has_access(application)
|
|
if not result.passing:
|
|
return self.handle_no_permission_authorized()
|
|
# Extract params so we can save them in the plan context
|
|
endpoint = AuthorizeEndpoint(request)
|
|
# Regardless, we start the planner and return to it
|
|
planner = FlowPlanner(provider.authorization_flow)
|
|
# planner.use_cache = False
|
|
planner.allow_empty_flows = True
|
|
plan = planner.plan(
|
|
self.request,
|
|
{
|
|
PLAN_CONTEXT_SSO: True,
|
|
PLAN_CONTEXT_APPLICATION: application,
|
|
PLAN_CONTEXT_PARAMS: endpoint.params,
|
|
PLAN_CONTEXT_SCOPES: endpoint.get_scopes_information(),
|
|
PLAN_CONTEXT_CONSENT_TEMPLATE: "providers/oidc/consent.html",
|
|
},
|
|
)
|
|
plan.append(in_memory_stage(OIDCStage))
|
|
self.request.session[SESSION_KEY_PLAN] = plan
|
|
return redirect_with_qs(
|
|
"passbook_flows:flow-executor-shell",
|
|
self.request.GET,
|
|
flow_slug=provider.authorization_flow.slug,
|
|
)
|
|
|
|
|
|
class FlowAuthorizeEndpoint(AuthorizeEndpoint):
|
|
"""Restore params from flow context"""
|
|
|
|
def _extract_params(self):
|
|
plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
|
|
self.params = plan.context[PLAN_CONTEXT_PARAMS]
|
|
|
|
|
|
class OIDCStage(AuthorizeView, StageView):
|
|
"""Finall stage, restores params from Flow."""
|
|
|
|
authorize_endpoint_class = FlowAuthorizeEndpoint
|
|
|
|
|
|
class ProviderInfoView(View):
|
|
"""Custom ProviderInfo View which shows our URLs instead"""
|
|
|
|
# pylint: disable=unused-argument
|
|
def get(self, request, *args, **kwargs):
|
|
"""Custom ProviderInfo View which shows our URLs instead"""
|
|
dic = dict()
|
|
|
|
site_url = get_site_url(request=request)
|
|
dic["issuer"] = get_issuer(site_url=site_url, request=request)
|
|
|
|
dic["authorization_endpoint"] = site_url + reverse(
|
|
"passbook_providers_oidc:authorize"
|
|
)
|
|
dic["token_endpoint"] = site_url + reverse("oidc_provider:token")
|
|
dic["userinfo_endpoint"] = site_url + reverse("oidc_provider:userinfo")
|
|
dic["end_session_endpoint"] = site_url + reverse("oidc_provider:end-session")
|
|
dic["introspection_endpoint"] = site_url + reverse(
|
|
"oidc_provider:token-introspection"
|
|
)
|
|
|
|
types_supported = [
|
|
response_type.value for response_type in ResponseType.objects.all()
|
|
]
|
|
dic["response_types_supported"] = types_supported
|
|
|
|
dic["jwks_uri"] = site_url + reverse("oidc_provider:jwks")
|
|
|
|
dic["id_token_signing_alg_values_supported"] = ["HS256", "RS256"]
|
|
|
|
# See: http://openid.net/specs/openid-connect-core-1_0.html#SubjectIDTypes
|
|
dic["subject_types_supported"] = ["public"]
|
|
|
|
dic["token_endpoint_auth_methods_supported"] = [
|
|
"client_secret_post",
|
|
"client_secret_basic",
|
|
]
|
|
|
|
response = JsonResponse(dic)
|
|
response["Access-Control-Allow-Origin"] = "*"
|
|
|
|
return response
|