core: add key field to token for easier rotation
This commit is contained in:
@ -1,43 +1,54 @@
|
||||
"""API Authentication"""
|
||||
from base64 import b64decode
|
||||
from typing import Any, Tuple, Union
|
||||
from typing import Any, Optional, Tuple, Union
|
||||
|
||||
from django.utils.translation import gettext as _
|
||||
from rest_framework import HTTP_HEADER_ENCODING, exceptions
|
||||
from rest_framework.authentication import BaseAuthentication, get_authorization_header
|
||||
from rest_framework.request import Request
|
||||
from structlog import get_logger
|
||||
|
||||
from passbook.core.models import Token, TokenIntents, User
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
def token_from_header(raw_header: bytes) -> Optional[Token]:
|
||||
"""raw_header in the Format of `Basic dGVzdDp0ZXN0`"""
|
||||
auth_credentials = raw_header.decode()
|
||||
# Accept headers with Type format and without
|
||||
if " " in auth_credentials:
|
||||
auth_type, auth_credentials = auth_credentials.split()
|
||||
if auth_type.lower() != "basic":
|
||||
LOGGER.debug(
|
||||
"Unsupported authentication type, denying", type=auth_type.lower()
|
||||
)
|
||||
return None
|
||||
auth_credentials = b64decode(auth_credentials.encode()).decode()
|
||||
# Accept credentials with username and without
|
||||
if ":" in auth_credentials:
|
||||
_, password = auth_credentials.split(":")
|
||||
else:
|
||||
password = auth_credentials
|
||||
if password == "":
|
||||
return None
|
||||
tokens = Token.filter_not_expired(key=password, intent=TokenIntents.INTENT_API)
|
||||
if not tokens.exists():
|
||||
LOGGER.debug("Token not found")
|
||||
return None
|
||||
return tokens.first()
|
||||
|
||||
|
||||
class PassbookTokenAuthentication(BaseAuthentication):
|
||||
"""Token-based authentication using HTTP Basic authentication"""
|
||||
|
||||
def authenticate(self, request: Request) -> Union[Tuple[User, Any], None]:
|
||||
"""Token-based authentication using HTTP Basic authentication"""
|
||||
auth = get_authorization_header(request).split()
|
||||
auth = get_authorization_header(request)
|
||||
|
||||
if not auth or auth[0].lower() != b"basic":
|
||||
token = token_from_header(auth)
|
||||
if not token:
|
||||
return None
|
||||
|
||||
if len(auth) == 1:
|
||||
msg = _("Invalid basic header. No credentials provided.")
|
||||
raise exceptions.AuthenticationFailed(msg)
|
||||
if len(auth) > 2:
|
||||
msg = _(
|
||||
"Invalid basic header. Credentials string should not contain spaces."
|
||||
)
|
||||
raise exceptions.AuthenticationFailed(msg)
|
||||
|
||||
header_data = b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(":")
|
||||
|
||||
tokens = Token.filter_not_expired(
|
||||
token_uuid=header_data[2], intent=TokenIntents.INTENT_API
|
||||
)
|
||||
if not tokens.exists():
|
||||
raise exceptions.AuthenticationFailed(_("Invalid token."))
|
||||
|
||||
return (tokens.first().user, None)
|
||||
return (token.user, None)
|
||||
|
||||
def authenticate_header(self, request: Request) -> str:
|
||||
return 'Basic realm="passbook"'
|
||||
|
Reference in New Issue
Block a user