134 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			134 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""passbook OAuth2 Views"""
 | 
						|
from django.contrib.auth.mixins import LoginRequiredMixin
 | 
						|
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
 | 
						|
from django.shortcuts import get_object_or_404
 | 
						|
from django.views import View
 | 
						|
from oauth2_provider.exceptions import OAuthToolkitError
 | 
						|
from oauth2_provider.scopes import get_scopes_backend
 | 
						|
from oauth2_provider.views.base import AuthorizationView
 | 
						|
from structlog import get_logger
 | 
						|
 | 
						|
from passbook.audit.models import Event, EventAction
 | 
						|
from passbook.core.models import Application
 | 
						|
from passbook.flows.models import in_memory_stage
 | 
						|
from passbook.flows.planner import (
 | 
						|
    PLAN_CONTEXT_APPLICATION,
 | 
						|
    PLAN_CONTEXT_SSO,
 | 
						|
    FlowPlanner,
 | 
						|
)
 | 
						|
from passbook.flows.stage import StageView
 | 
						|
from passbook.flows.views import SESSION_KEY_PLAN
 | 
						|
from passbook.lib.utils.urls import redirect_with_qs
 | 
						|
from passbook.policies.mixins import PolicyAccessMixin
 | 
						|
from passbook.providers.oauth.models import OAuth2Provider
 | 
						|
from passbook.stages.consent.stage import PLAN_CONTEXT_CONSENT_TEMPLATE
 | 
						|
 | 
						|
LOGGER = get_logger()
 | 
						|
 | 
						|
PLAN_CONTEXT_CLIENT_ID = "client_id"
 | 
						|
PLAN_CONTEXT_REDIRECT_URI = "redirect_uri"
 | 
						|
PLAN_CONTEXT_RESPONSE_TYPE = "response_type"
 | 
						|
PLAN_CONTEXT_STATE = "state"
 | 
						|
 | 
						|
PLAN_CONTEXT_CODE_CHALLENGE = "code_challenge"
 | 
						|
PLAN_CONTEXT_CODE_CHALLENGE_METHOD = "code_challenge_method"
 | 
						|
PLAN_CONTEXT_SCOPE = "scope"
 | 
						|
PLAN_CONTEXT_NONCE = "nonce"
 | 
						|
PLAN_CONTEXT_SCOPE_DESCRIPTION = "scope_descriptions"
 | 
						|
 | 
						|
 | 
						|
class AuthorizationFlowInitView(PolicyAccessMixin, LoginRequiredMixin, View):
 | 
						|
    """OAuth2 Flow initializer, checks access to application and starts flow"""
 | 
						|
 | 
						|
    # pylint: disable=unused-argument
 | 
						|
    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 | 
						|
        """Check access to application, start FlowPLanner, return to flow executor shell"""
 | 
						|
        client_id = request.GET.get("client_id")
 | 
						|
        provider = get_object_or_404(OAuth2Provider, client_id=client_id)
 | 
						|
        try:
 | 
						|
            application = self.provider_to_application(provider)
 | 
						|
        except Application.DoesNotExist:
 | 
						|
            return self.handle_no_permission_authorized()
 | 
						|
        # Check permissions
 | 
						|
        result = self.user_has_access(application)
 | 
						|
        if not result.passing:
 | 
						|
            return self.handle_no_permission_authorized()
 | 
						|
        # Regardless, we start the planner and return to it
 | 
						|
        planner = FlowPlanner(provider.authorization_flow)
 | 
						|
        planner.allow_empty_flows = True
 | 
						|
        # Save scope descriptions
 | 
						|
        scopes = request.GET.get(PLAN_CONTEXT_SCOPE)
 | 
						|
        all_scopes = get_scopes_backend().get_all_scopes()
 | 
						|
 | 
						|
        plan = planner.plan(
 | 
						|
            self.request,
 | 
						|
            {
 | 
						|
                PLAN_CONTEXT_SSO: True,
 | 
						|
                PLAN_CONTEXT_APPLICATION: application,
 | 
						|
                PLAN_CONTEXT_CLIENT_ID: client_id,
 | 
						|
                PLAN_CONTEXT_REDIRECT_URI: request.GET.get(PLAN_CONTEXT_REDIRECT_URI),
 | 
						|
                PLAN_CONTEXT_RESPONSE_TYPE: request.GET.get(PLAN_CONTEXT_RESPONSE_TYPE),
 | 
						|
                PLAN_CONTEXT_STATE: request.GET.get(PLAN_CONTEXT_STATE),
 | 
						|
                PLAN_CONTEXT_SCOPE: scopes,
 | 
						|
                PLAN_CONTEXT_NONCE: request.GET.get(PLAN_CONTEXT_NONCE),
 | 
						|
                PLAN_CONTEXT_SCOPE_DESCRIPTION: [
 | 
						|
                    all_scopes[scope] for scope in scopes.split(" ")
 | 
						|
                ],
 | 
						|
                PLAN_CONTEXT_CONSENT_TEMPLATE: "providers/oauth/consent.html",
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
        plan.append(in_memory_stage(OAuth2Stage))
 | 
						|
        self.request.session[SESSION_KEY_PLAN] = plan
 | 
						|
        return redirect_with_qs(
 | 
						|
            "passbook_flows:flow-executor-shell",
 | 
						|
            self.request.GET,
 | 
						|
            flow_slug=provider.authorization_flow.slug,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
class OAuth2Stage(AuthorizationView, StageView):
 | 
						|
    """OAuth2 Stage, dynamically injected into the plan"""
 | 
						|
 | 
						|
    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 | 
						|
        """Last stage in flow, finalizes OAuth Response and redirects to Client"""
 | 
						|
        application: Application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
 | 
						|
        provider: OAuth2Provider = application.provider
 | 
						|
 | 
						|
        Event.new(
 | 
						|
            EventAction.AUTHORIZE_APPLICATION, authorized_application=application,
 | 
						|
        ).from_http(self.request)
 | 
						|
 | 
						|
        credentials = {
 | 
						|
            "client_id": self.executor.plan.context[PLAN_CONTEXT_CLIENT_ID],
 | 
						|
            "redirect_uri": self.executor.plan.context[PLAN_CONTEXT_REDIRECT_URI],
 | 
						|
            "response_type": self.executor.plan.context.get(
 | 
						|
                PLAN_CONTEXT_RESPONSE_TYPE, None
 | 
						|
            ),
 | 
						|
            "state": self.executor.plan.context.get(PLAN_CONTEXT_STATE, None),
 | 
						|
            "nonce": self.executor.plan.context.get(PLAN_CONTEXT_NONCE, None),
 | 
						|
        }
 | 
						|
        if self.executor.plan.context.get(PLAN_CONTEXT_CODE_CHALLENGE, False):
 | 
						|
            credentials[PLAN_CONTEXT_CODE_CHALLENGE] = self.executor.plan.context.get(
 | 
						|
                PLAN_CONTEXT_CODE_CHALLENGE
 | 
						|
            )
 | 
						|
        if self.executor.plan.context.get(PLAN_CONTEXT_CODE_CHALLENGE_METHOD, False):
 | 
						|
            credentials[
 | 
						|
                PLAN_CONTEXT_CODE_CHALLENGE_METHOD
 | 
						|
            ] = self.executor.plan.context.get(PLAN_CONTEXT_CODE_CHALLENGE_METHOD)
 | 
						|
        scopes = self.executor.plan.context.get(PLAN_CONTEXT_SCOPE)
 | 
						|
 | 
						|
        try:
 | 
						|
            uri, _headers, _body, _status = self.create_authorization_response(
 | 
						|
                request=self.request,
 | 
						|
                scopes=scopes,
 | 
						|
                credentials=credentials,
 | 
						|
                allow=True,
 | 
						|
            )
 | 
						|
            LOGGER.debug("Success url for the request: {0}".format(uri))
 | 
						|
        except OAuthToolkitError as error:
 | 
						|
            return self.error_response(error, provider)
 | 
						|
 | 
						|
        self.executor.stage_ok()
 | 
						|
        return HttpResponseRedirect(self.redirect(uri, provider).url)
 |