*: add support for bearer authentication on API
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
		| @ -10,29 +10,29 @@ from structlog.stdlib import get_logger | ||||
| from authentik.core.models import Token, TokenIntents, User | ||||
|  | ||||
| LOGGER = get_logger() | ||||
| X_AUTHENTIK_PREVENT_BASIC_HEADER = "HTTP_X_AUTHENTIK_PREVENT_BASIC" | ||||
|  | ||||
|  | ||||
| def token_from_header(raw_header: bytes) -> Optional[Token]: | ||||
|     """raw_header in the Format of `Basic dGVzdDp0ZXN0`""" | ||||
|     auth_credentials = raw_header.decode() | ||||
|     # Accept headers with Type format and without | ||||
|     if " " in auth_credentials: | ||||
|         auth_type, auth_credentials = auth_credentials.split() | ||||
|         if auth_type.lower() != "basic": | ||||
|             LOGGER.debug( | ||||
|                 "Unsupported authentication type, denying", type=auth_type.lower() | ||||
|             ) | ||||
|             return None | ||||
|     try: | ||||
|         auth_credentials = b64decode(auth_credentials.encode()).decode() | ||||
|     except (UnicodeDecodeError, Error): | ||||
|     if " " not in auth_credentials: | ||||
|         return None | ||||
|     # Accept credentials with username and without | ||||
|     if ":" in auth_credentials: | ||||
|         _, password = auth_credentials.split(":") | ||||
|     else: | ||||
|         password = auth_credentials | ||||
|     auth_type, auth_credentials = auth_credentials.split() | ||||
|     if auth_type.lower() not in ["basic", "bearer"]: | ||||
|         LOGGER.debug("Unsupported authentication type, denying", type=auth_type.lower()) | ||||
|         return None | ||||
|     password = auth_credentials | ||||
|     if auth_type.lower() == "basic": | ||||
|         try: | ||||
|             auth_credentials = b64decode(auth_credentials.encode()).decode() | ||||
|         except (UnicodeDecodeError, Error): | ||||
|             return None | ||||
|         # Accept credentials with username and without | ||||
|         if ":" in auth_credentials: | ||||
|             _, password = auth_credentials.split(":") | ||||
|         else: | ||||
|             password = auth_credentials | ||||
|     if password == "":  # nosec | ||||
|         return None | ||||
|     tokens = Token.filter_not_expired(key=password, intent=TokenIntents.INTENT_API) | ||||
| @ -43,10 +43,10 @@ def token_from_header(raw_header: bytes) -> Optional[Token]: | ||||
|  | ||||
|  | ||||
| class AuthentikTokenAuthentication(BaseAuthentication): | ||||
|     """Token-based authentication using HTTP Basic authentication""" | ||||
|     """Token-based authentication using HTTP Bearer authentication""" | ||||
|  | ||||
|     def authenticate(self, request: Request) -> Union[tuple[User, Any], None]: | ||||
|         """Token-based authentication using HTTP Basic authentication""" | ||||
|         """Token-based authentication using HTTP Bearer authentication""" | ||||
|         auth = get_authorization_header(request) | ||||
|  | ||||
|         token = token_from_header(auth) | ||||
| @ -56,6 +56,4 @@ class AuthentikTokenAuthentication(BaseAuthentication): | ||||
|         return (token.user, None) | ||||
|  | ||||
|     def authenticate_header(self, request: Request) -> str: | ||||
|         if X_AUTHENTIK_PREVENT_BASIC_HEADER in request._request.META: | ||||
|             return "" | ||||
|         return 'Basic realm="authentik"' | ||||
|         return "Bearer" | ||||
|  | ||||
| @ -11,7 +11,7 @@ from authentik.core.models import Token, TokenIntents | ||||
| class TestAPIAuth(TestCase): | ||||
|     """Test API Authentication""" | ||||
|  | ||||
|     def test_valid(self): | ||||
|     def test_valid_basic(self): | ||||
|         """Test valid token""" | ||||
|         token = Token.objects.create( | ||||
|             intent=TokenIntents.INTENT_API, user=get_anonymous_user() | ||||
| @ -19,6 +19,13 @@ class TestAPIAuth(TestCase): | ||||
|         auth = b64encode(f":{token.key}".encode()).decode() | ||||
|         self.assertEqual(token_from_header(f"Basic {auth}".encode()), token) | ||||
|  | ||||
|     def test_valid_bearer(self): | ||||
|         """Test valid token""" | ||||
|         token = Token.objects.create( | ||||
|             intent=TokenIntents.INTENT_API, user=get_anonymous_user() | ||||
|         ) | ||||
|         self.assertEqual(token_from_header(f"Bearer {token.key}".encode()), token) | ||||
|  | ||||
|     def test_invalid_type(self): | ||||
|         """Test invalid type""" | ||||
|         self.assertIsNone(token_from_header("foo bar".encode())) | ||||
|  | ||||
| @ -143,7 +143,7 @@ SWAGGER_SETTINGS = { | ||||
|         "authentik.api.pagination_schema.PaginationInspector", | ||||
|     ], | ||||
|     "SECURITY_DEFINITIONS": { | ||||
|         "token": {"type": "apiKey", "name": "Authorization", "in": "header"} | ||||
|         "Bearer": {"type": "apiKey", "name": "Authorization", "in": "header"} | ||||
|     }, | ||||
| } | ||||
|  | ||||
|  | ||||
| @ -2,7 +2,6 @@ package ak | ||||
|  | ||||
| import ( | ||||
| 	"crypto/tls" | ||||
| 	"encoding/base64" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| @ -20,7 +19,7 @@ func (ac *APIController) initWS(pbURL url.URL, outpostUUID strfmt.UUID) { | ||||
| 	pathTemplate := "%s://%s/ws/outpost/%s/" | ||||
| 	scheme := strings.ReplaceAll(pbURL.Scheme, "http", "ws") | ||||
|  | ||||
| 	authHeader := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("Basic :%s", ac.token))) | ||||
| 	authHeader := fmt.Sprintf("Bearer %s", ac.token) | ||||
|  | ||||
| 	header := http.Header{ | ||||
| 		"Authorization": []string{authHeader}, | ||||
|  | ||||
| @ -13,12 +13,12 @@ consumes: | ||||
| produces: | ||||
|   - application/json | ||||
| securityDefinitions: | ||||
|   token: | ||||
|   Bearer: | ||||
|     type: apiKey | ||||
|     name: Authorization | ||||
|     in: header | ||||
| security: | ||||
|   - token: [] | ||||
|   - Bearer: [] | ||||
| paths: | ||||
|   /admin/apps/: | ||||
|     get: | ||||
|  | ||||
| @ -16,7 +16,6 @@ export const DEFAULT_CONFIG = new Configuration({ | ||||
|     basePath: "/api/v2beta", | ||||
|     headers: { | ||||
|         "X-CSRFToken": getCookie("authentik_csrf"), | ||||
|         "X-Authentik-Prevent-Basic": "true" | ||||
|     }, | ||||
|     middleware: [ | ||||
|         API_DRAWER_MIDDLEWARE, | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 Jens Langhammer
					Jens Langhammer