114 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			114 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""passbook OAuth2 Token Introspection Views"""
 | 
						|
from dataclasses import InitVar, dataclass
 | 
						|
from typing import Optional
 | 
						|
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
from django.views import View
 | 
						|
from structlog import get_logger
 | 
						|
 | 
						|
from passbook.providers.oauth2.constants import SCOPE_OPENID_INTROSPECTION
 | 
						|
from passbook.providers.oauth2.errors import TokenIntrospectionError
 | 
						|
from passbook.providers.oauth2.models import IDToken, OAuth2Provider, RefreshToken
 | 
						|
from passbook.providers.oauth2.utils import TokenResponse, extract_client_auth
 | 
						|
 | 
						|
LOGGER = get_logger()
 | 
						|
 | 
						|
 | 
						|
@dataclass
 | 
						|
class TokenIntrospectionParams:
 | 
						|
    """Parameters for Token Introspection"""
 | 
						|
 | 
						|
    client_id: str
 | 
						|
    client_secret: str
 | 
						|
 | 
						|
    raw_token: InitVar[str]
 | 
						|
 | 
						|
    token: Optional[RefreshToken] = None
 | 
						|
 | 
						|
    provider: Optional[OAuth2Provider] = None
 | 
						|
    id_token: Optional[IDToken] = None
 | 
						|
 | 
						|
    def __post_init__(self, raw_token: str):
 | 
						|
        try:
 | 
						|
            self.token = RefreshToken.objects.get(access_token=raw_token)
 | 
						|
        except RefreshToken.DoesNotExist:
 | 
						|
            LOGGER.debug("Token does not exist", token=raw_token)
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
        if self.token.has_expired():
 | 
						|
            LOGGER.debug("Token is not valid", token=raw_token)
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
        try:
 | 
						|
            self.provider = OAuth2Provider.objects.get(
 | 
						|
                client_id=self.client_id, client_secret=self.client_secret,
 | 
						|
            )
 | 
						|
        except OAuth2Provider.DoesNotExist:
 | 
						|
            LOGGER.debug("provider for ID not found", client_id=self.client_id)
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
        if SCOPE_OPENID_INTROSPECTION not in self.provider.scope_names:
 | 
						|
            LOGGER.debug(
 | 
						|
                "OAuth2Provider does not have introspection scope",
 | 
						|
                client_id=self.client_id,
 | 
						|
            )
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
 | 
						|
        self.id_token = self.token.id_token
 | 
						|
 | 
						|
        if not self.token.id_token:
 | 
						|
            LOGGER.debug(
 | 
						|
                "token not an authentication token", token=self.token,
 | 
						|
            )
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
 | 
						|
        audience = self.token.id_token.aud
 | 
						|
        if not audience:
 | 
						|
            LOGGER.debug(
 | 
						|
                "No audience found for token", token=self.token,
 | 
						|
            )
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
 | 
						|
        if audience not in self.provider.scope_names:
 | 
						|
            LOGGER.debug(
 | 
						|
                "provider does not audience scope",
 | 
						|
                client_id=self.client_id,
 | 
						|
                audience=audience,
 | 
						|
            )
 | 
						|
            raise TokenIntrospectionError()
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def from_request(request: HttpRequest) -> "TokenIntrospectionParams":
 | 
						|
        """Extract required Parameters from HTTP Request"""
 | 
						|
        # Introspection only supports POST requests
 | 
						|
        client_id, client_secret = extract_client_auth(request)
 | 
						|
        return TokenIntrospectionParams(
 | 
						|
            raw_token=request.POST.get("token"),
 | 
						|
            client_id=client_id,
 | 
						|
            client_secret=client_secret,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
class TokenIntrospectionView(View):
 | 
						|
    """Token Introspection
 | 
						|
    https://tools.ietf.org/html/rfc7662"""
 | 
						|
 | 
						|
    token: RefreshToken
 | 
						|
    params: TokenIntrospectionParams
 | 
						|
    provider: OAuth2Provider
 | 
						|
    id_token: IDToken
 | 
						|
 | 
						|
    def post(self, request: HttpRequest) -> HttpResponse:
 | 
						|
        """Introspection handler"""
 | 
						|
        self.params = TokenIntrospectionParams.from_request(request)
 | 
						|
 | 
						|
        try:
 | 
						|
            response_dic = {}
 | 
						|
            if self.id_token:
 | 
						|
                token_dict = self.id_token.to_dict()
 | 
						|
                for k in ("aud", "sub", "exp", "iat", "iss"):
 | 
						|
                    response_dic[k] = token_dict[k]
 | 
						|
            response_dic["active"] = True
 | 
						|
            response_dic["client_id"] = self.token.provider.client_id
 | 
						|
 | 
						|
            return TokenResponse(response_dic)
 | 
						|
        except TokenIntrospectionError:
 | 
						|
            return TokenResponse({"active": False})
 |