93 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			93 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """passbook OAuth2 OpenID Userinfo views"""
 | |
| from typing import Any, Dict, List
 | |
| 
 | |
| from django.http import HttpRequest, HttpResponse
 | |
| from django.utils.translation import ugettext_lazy as _
 | |
| from django.views import View
 | |
| from structlog import get_logger
 | |
| 
 | |
| from passbook.providers.oauth2.constants import (
 | |
|     SCOPE_GITHUB_ORG_READ,
 | |
|     SCOPE_GITHUB_USER,
 | |
|     SCOPE_GITHUB_USER_EMAIL,
 | |
|     SCOPE_GITHUB_USER_READ,
 | |
| )
 | |
| from passbook.providers.oauth2.models import RefreshToken, ScopeMapping
 | |
| from passbook.providers.oauth2.utils import TokenResponse, cors_allow_any
 | |
| 
 | |
| LOGGER = get_logger()
 | |
| 
 | |
| 
 | |
class UserInfoView(View):
    """Create a dictionary with all the requested claims about the End-User.

    See: http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse"""

    def get_scope_descriptions(self, scopes: List[str]) -> List[str]:
        """Get a list of the descriptions of all given scopes.

        Descriptions of matching ScopeMapping objects come first (ordered by
        scope name, empty descriptions skipped), followed by the descriptions
        of any GitHub compatibility scopes in the order they appear in
        `scopes`."""
        mappings = ScopeMapping.objects.filter(scope_name__in=scopes).order_by(
            "scope_name"
        )
        scope_descriptions = [
            mapping.description for mapping in mappings if mapping.description != ""
        ]
        # GitHub Compatibility Scopes are handled differently, since they require
        # custom paths. Hence they don't exist as Scope objects.
        github_scope_map = {
            SCOPE_GITHUB_USER: _("GitHub Compatibility: Access your User Information"),
            SCOPE_GITHUB_USER_READ: _(
                "GitHub Compatibility: Access your User Information"
            ),
            SCOPE_GITHUB_USER_EMAIL: _(
                "GitHub Compatibility: Access you Email addresses"
            ),
            SCOPE_GITHUB_ORG_READ: _("GitHub Compatibility: Access your Groups"),
        }
        scope_descriptions.extend(
            github_scope_map[scope] for scope in scopes if scope in github_scope_map
        )
        return scope_descriptions

    def get_claims(self, token: RefreshToken) -> Dict[str, Any]:
        """Get a dictionary of claims from scopes that the token
        requires and are assigned to the provider.

        Each matching ScopeMapping is evaluated in scope-name order and its
        result merged into the returned dictionary, so later mappings
        overwrite keys set by earlier ones. Mappings that evaluate to None
        are skipped silently; other non-dict results are skipped with a
        warning."""
        scopes_from_client = token.scope
        final_claims: Dict[str, Any] = {}
        for scope in ScopeMapping.objects.filter(
            provider=token.provider, scope_name__in=scopes_from_client
        ).order_by("scope_name"):
            value = scope.evaluate(
                user=token.user,
                request=self.request,
                provider=token.provider,
                token=token,
            )
            if value is None:
                continue
            if not isinstance(value, dict):
                LOGGER.warning(
                    "Scope returned a non-dict value, ignoring",
                    scope=scope,
                    value=value,
                )
                continue
            LOGGER.debug("updated scope", scope=scope)
            final_claims.update(value)
        return final_claims

    def options(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Handle OPTIONS requests by returning an empty TokenResponse that has
        been passed through cors_allow_any.

        Extra positional and keyword arguments are accepted and ignored so the
        signature matches Django's View.options; View.dispatch forwards every
        URL/decorator argument to the handler."""
        return cors_allow_any(self.request, TokenResponse({}))

    def get(self, request: HttpRequest, **kwargs) -> HttpResponse:
        """Handle GET Requests for UserInfo

        Expects the RefreshToken under the "token" keyword argument
        (NOTE(review): presumably injected by a wrapping decorator - confirm
        against the URL configuration)."""
        token: RefreshToken = kwargs["token"]
        claims = self.get_claims(token)
        # Set "sub" last so a scope mapping cannot override the subject.
        claims["sub"] = token.id_token.sub
        response = TokenResponse(claims)
        cors_allow_any(self.request, response)
        return response

    def post(self, request: HttpRequest, **kwargs) -> HttpResponse:
        """POST Requests behave the same as GET Requests, so the get handler is called here"""
        return self.get(request, **kwargs)
 | 
