125 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			125 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""passbook OAuth2 Token Introspection Views"""
 | 
						|
from dataclasses import dataclass, field
 | 
						|
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
from django.views import View
 | 
						|
from structlog import get_logger
 | 
						|
 | 
						|
from passbook.providers.oauth2.errors import TokenIntrospectionError
 | 
						|
from passbook.providers.oauth2.models import IDToken, OAuth2Provider, RefreshToken
 | 
						|
from passbook.providers.oauth2.utils import (
 | 
						|
    TokenResponse,
 | 
						|
    extract_access_token,
 | 
						|
    extract_client_auth,
 | 
						|
)
 | 
						|
 | 
						|
LOGGER = get_logger()
 | 
						|
 | 
						|
 | 
						|
@dataclass
 | 
						|
class TokenIntrospectionParams:
 | 
						|
    """Parameters for Token Introspection"""
 | 
						|
 | 
						|
    token: RefreshToken
 | 
						|
 | 
						|
    provider: OAuth2Provider = field(init=False)
 | 
						|
    id_token: IDToken = field(init=False)
 | 
						|
 | 
						|
    def __post_init__(self):
 | 
						|
        if self.token.is_expired:
 | 
						|
            LOGGER.debug("Token is not valid")
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
 | 
						|
        self.provider = self.token.provider
 | 
						|
        self.id_token = self.token.id_token
 | 
						|
 | 
						|
        if not self.token.id_token:
 | 
						|
            LOGGER.debug(
 | 
						|
                "token not an authentication token",
 | 
						|
                token=self.token,
 | 
						|
            )
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
 | 
						|
    def authenticate_basic(self, request: HttpRequest) -> bool:
 | 
						|
        """Attempt to authenticate via Basic auth of client_id:client_secret"""
 | 
						|
        client_id, client_secret = extract_client_auth(request)
 | 
						|
        if client_id == client_secret == "":
 | 
						|
            return False
 | 
						|
        if (
 | 
						|
            client_id != self.provider.client_id
 | 
						|
            or client_secret != self.provider.client_secret
 | 
						|
        ):
 | 
						|
            LOGGER.debug("(basic) Provider for basic auth does not exist")
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
        return True
 | 
						|
 | 
						|
    def authenticate_bearer(self, request: HttpRequest) -> bool:
 | 
						|
        """Attempt to authenticate via token sent as bearer header"""
 | 
						|
        body_token = extract_access_token(request)
 | 
						|
        if not body_token:
 | 
						|
            return False
 | 
						|
        tokens = RefreshToken.objects.filter(access_token=body_token).select_related(
 | 
						|
            "provider"
 | 
						|
        )
 | 
						|
        if not tokens.exists():
 | 
						|
            LOGGER.debug("(bearer) Token does not exist")
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
        if tokens.first().provider != self.provider:
 | 
						|
            LOGGER.debug("(bearer) Token providers don't match")
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
        return True
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def from_request(request: HttpRequest) -> "TokenIntrospectionParams":
 | 
						|
        """Extract required Parameters from HTTP Request"""
 | 
						|
        raw_token = request.POST.get("token")
 | 
						|
        token_type_hint = request.POST.get("token_type_hint", "access_token")
 | 
						|
        token_filter = {token_type_hint: raw_token}
 | 
						|
 | 
						|
        if token_type_hint not in ["access_token", "refresh_token"]:
 | 
						|
            LOGGER.debug("token_type_hint has invalid value", value=token_type_hint)
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
 | 
						|
        try:
 | 
						|
            token: RefreshToken = RefreshToken.objects.select_related("provider").get(
 | 
						|
                **token_filter
 | 
						|
            )
 | 
						|
        except RefreshToken.DoesNotExist:
 | 
						|
            LOGGER.debug("Token does not exist", token=raw_token)
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
 | 
						|
        params = TokenIntrospectionParams(token=token)
 | 
						|
        if not any(
 | 
						|
            [params.authenticate_basic(request), params.authenticate_bearer(request)]
 | 
						|
        ):
 | 
						|
            LOGGER.debug("Not authenticated")
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
        return params
 | 
						|
 | 
						|
 | 
						|
class TokenIntrospectionView(View):
 | 
						|
    """Token Introspection
 | 
						|
    https://tools.ietf.org/html/rfc7662"""
 | 
						|
 | 
						|
    token: RefreshToken
 | 
						|
    params: TokenIntrospectionParams
 | 
						|
    provider: OAuth2Provider
 | 
						|
    id_token: IDToken
 | 
						|
 | 
						|
    def post(self, request: HttpRequest) -> HttpResponse:
 | 
						|
        """Introspection handler"""
 | 
						|
        try:
 | 
						|
            self.params = TokenIntrospectionParams.from_request(request)
 | 
						|
 | 
						|
            response_dic = {}
 | 
						|
            if self.params.id_token:
 | 
						|
                token_dict = self.params.id_token.to_dict()
 | 
						|
                for k in ("aud", "sub", "exp", "iat", "iss"):
 | 
						|
                    response_dic[k] = token_dict[k]
 | 
						|
            response_dic["active"] = True
 | 
						|
            response_dic["client_id"] = self.params.token.provider.client_id
 | 
						|
 | 
						|
            return TokenResponse(response_dic)
 | 
						|
        except TokenIntrospectionError:
 | 
						|
            return TokenResponse({"active": False})
 |