 8949464294
			
		
	
	8949464294
	
	
	
		
			
			* format files Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix pyright Signed-off-by: Jens Langhammer <jens@goauthentik.io> * revert #8367 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * sigh Signed-off-by: Jens Langhammer <jens@goauthentik.io> --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io>
		
			
				
	
	
		
			260 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			260 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Test token view"""
 | |
| 
 | |
| from base64 import b64encode, urlsafe_b64encode
 | |
| from hashlib import sha256
 | |
| 
 | |
| from django.test import RequestFactory
 | |
| from django.urls import reverse
 | |
| 
 | |
| from authentik.core.models import Application
 | |
| from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 | |
| from authentik.flows.challenge import ChallengeTypes
 | |
| from authentik.lib.generators import generate_id
 | |
| from authentik.providers.oauth2.constants import GRANT_TYPE_AUTHORIZATION_CODE
 | |
| from authentik.providers.oauth2.models import AuthorizationCode, OAuth2Provider
 | |
| from authentik.providers.oauth2.tests.utils import OAuthTestCase
 | |
| 
 | |
| 
 | |
| class TestTokenPKCE(OAuthTestCase):
 | |
|     """Test token view"""
 | |
| 
 | |
|     def setUp(self) -> None:
 | |
|         super().setUp()
 | |
|         self.factory = RequestFactory()
 | |
|         self.app = Application.objects.create(name=generate_id(), slug="test")
 | |
| 
 | |
|     def test_pkce_missing_in_authorize(self):
 | |
|         """Test PKCE with code_challenge in authorize request
 | |
|         and missing verifier in token request"""
 | |
|         flow = create_test_flow()
 | |
|         provider = OAuth2Provider.objects.create(
 | |
|             name=generate_id(),
 | |
|             client_id="test",
 | |
|             authorization_flow=flow,
 | |
|             redirect_uris="foo://localhost",
 | |
|             access_code_validity="seconds=100",
 | |
|         )
 | |
|         Application.objects.create(name="app", slug="app", provider=provider)
 | |
|         state = generate_id()
 | |
|         user = create_test_admin_user()
 | |
|         self.client.force_login(user)
 | |
|         challenge = generate_id()
 | |
|         header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 | |
|         # Step 1, initiate params and get redirect to flow
 | |
|         self.client.get(
 | |
|             reverse("authentik_providers_oauth2:authorize"),
 | |
|             data={
 | |
|                 "response_type": "code",
 | |
|                 "client_id": "test",
 | |
|                 "state": state,
 | |
|                 "redirect_uri": "foo://localhost",
 | |
|                 "code_challenge": challenge,
 | |
|                 "code_challenge_method": "S256",
 | |
|             },
 | |
|         )
 | |
|         response = self.client.get(
 | |
|             reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 | |
|         )
 | |
|         code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
 | |
|         self.assertJSONEqual(
 | |
|             response.content.decode(),
 | |
|             {
 | |
|                 "component": "xak-flow-redirect",
 | |
|                 "type": ChallengeTypes.REDIRECT.value,
 | |
|                 "to": f"foo://localhost?code={code.code}&state={state}",
 | |
|             },
 | |
|         )
 | |
|         response = self.client.post(
 | |
|             reverse("authentik_providers_oauth2:token"),
 | |
|             data={
 | |
|                 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 | |
|                 "code": code.code,
 | |
|                 # Missing the code_verifier here
 | |
|                 "redirect_uri": "foo://localhost",
 | |
|             },
 | |
|             HTTP_AUTHORIZATION=f"Basic {header}",
 | |
|         )
 | |
|         self.assertJSONEqual(
 | |
|             response.content,
 | |
|             {
 | |
|                 "error": "invalid_grant",
 | |
|                 "error_description": (
 | |
|                     "The provided authorization grant or refresh token is invalid, expired, "
 | |
|                     "revoked, does not match the redirection URI used in the authorization "
 | |
|                     "request, or was issued to another client"
 | |
|                 ),
 | |
|             },
 | |
|         )
 | |
|         self.assertEqual(response.status_code, 400)
 | |
| 
 | |
|     def test_pkce_missing_in_token(self):
 | |
|         """Test PKCE with missing code_challenge in authorization request but verifier
 | |
|         set in token request"""
 | |
|         flow = create_test_flow()
 | |
|         provider = OAuth2Provider.objects.create(
 | |
|             name=generate_id(),
 | |
|             client_id="test",
 | |
|             authorization_flow=flow,
 | |
|             redirect_uris="foo://localhost",
 | |
|             access_code_validity="seconds=100",
 | |
|         )
 | |
|         Application.objects.create(name="app", slug="app", provider=provider)
 | |
|         state = generate_id()
 | |
|         user = create_test_admin_user()
 | |
|         self.client.force_login(user)
 | |
|         header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 | |
|         # Step 1, initiate params and get redirect to flow
 | |
|         self.client.get(
 | |
|             reverse("authentik_providers_oauth2:authorize"),
 | |
|             data={
 | |
|                 "response_type": "code",
 | |
|                 "client_id": "test",
 | |
|                 "state": state,
 | |
|                 "redirect_uri": "foo://localhost",
 | |
|                 # "code_challenge": challenge,
 | |
|                 # "code_challenge_method": "S256",
 | |
|             },
 | |
|         )
 | |
|         response = self.client.get(
 | |
|             reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 | |
|         )
 | |
|         code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
 | |
|         self.assertJSONEqual(
 | |
|             response.content.decode(),
 | |
|             {
 | |
|                 "component": "xak-flow-redirect",
 | |
|                 "type": ChallengeTypes.REDIRECT.value,
 | |
|                 "to": f"foo://localhost?code={code.code}&state={state}",
 | |
|             },
 | |
|         )
 | |
|         response = self.client.post(
 | |
|             reverse("authentik_providers_oauth2:token"),
 | |
|             data={
 | |
|                 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 | |
|                 "code": code.code,
 | |
|                 "code_verifier": generate_id(),
 | |
|                 "redirect_uri": "foo://localhost",
 | |
|             },
 | |
|             HTTP_AUTHORIZATION=f"Basic {header}",
 | |
|         )
 | |
|         self.assertJSONEqual(
 | |
|             response.content,
 | |
|             {
 | |
|                 "error": "invalid_grant",
 | |
|                 "error_description": (
 | |
|                     "The provided authorization grant or refresh token is invalid, expired, "
 | |
|                     "revoked, does not match the redirection URI used in the authorization "
 | |
|                     "request, or was issued to another client"
 | |
|                 ),
 | |
|             },
 | |
|         )
 | |
|         self.assertEqual(response.status_code, 400)
 | |
| 
 | |
|     def test_pkce_correct_s256(self):
 | |
|         """Test full with pkce"""
 | |
|         flow = create_test_flow()
 | |
|         provider = OAuth2Provider.objects.create(
 | |
|             name=generate_id(),
 | |
|             client_id="test",
 | |
|             authorization_flow=flow,
 | |
|             redirect_uris="foo://localhost",
 | |
|             access_code_validity="seconds=100",
 | |
|         )
 | |
|         Application.objects.create(name="app", slug="app", provider=provider)
 | |
|         state = generate_id()
 | |
|         user = create_test_admin_user()
 | |
|         self.client.force_login(user)
 | |
|         verifier = generate_id()
 | |
|         challenge = (
 | |
|             urlsafe_b64encode(sha256(verifier.encode("ascii")).digest())
 | |
|             .decode("utf-8")
 | |
|             .replace("=", "")
 | |
|         )
 | |
|         header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 | |
|         # Step 1, initiate params and get redirect to flow
 | |
|         self.client.get(
 | |
|             reverse("authentik_providers_oauth2:authorize"),
 | |
|             data={
 | |
|                 "response_type": "code",
 | |
|                 "client_id": "test",
 | |
|                 "state": state,
 | |
|                 "redirect_uri": "foo://localhost",
 | |
|                 "code_challenge": challenge,
 | |
|                 "code_challenge_method": "S256",
 | |
|             },
 | |
|         )
 | |
|         response = self.client.get(
 | |
|             reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 | |
|         )
 | |
|         code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
 | |
|         self.assertJSONEqual(
 | |
|             response.content.decode(),
 | |
|             {
 | |
|                 "component": "xak-flow-redirect",
 | |
|                 "type": ChallengeTypes.REDIRECT.value,
 | |
|                 "to": f"foo://localhost?code={code.code}&state={state}",
 | |
|             },
 | |
|         )
 | |
|         response = self.client.post(
 | |
|             reverse("authentik_providers_oauth2:token"),
 | |
|             data={
 | |
|                 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 | |
|                 "code": code.code,
 | |
|                 "code_verifier": verifier,
 | |
|                 "redirect_uri": "foo://localhost",
 | |
|             },
 | |
|             HTTP_AUTHORIZATION=f"Basic {header}",
 | |
|         )
 | |
|         self.assertEqual(response.status_code, 200)
 | |
| 
 | |
|     def test_pkce_correct_plain(self):
 | |
|         """Test full with pkce"""
 | |
|         flow = create_test_flow()
 | |
|         provider = OAuth2Provider.objects.create(
 | |
|             name=generate_id(),
 | |
|             client_id="test",
 | |
|             authorization_flow=flow,
 | |
|             redirect_uris="foo://localhost",
 | |
|             access_code_validity="seconds=100",
 | |
|         )
 | |
|         Application.objects.create(name="app", slug="app", provider=provider)
 | |
|         state = generate_id()
 | |
|         user = create_test_admin_user()
 | |
|         self.client.force_login(user)
 | |
|         verifier = generate_id()
 | |
|         header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 | |
|         # Step 1, initiate params and get redirect to flow
 | |
|         self.client.get(
 | |
|             reverse("authentik_providers_oauth2:authorize"),
 | |
|             data={
 | |
|                 "response_type": "code",
 | |
|                 "client_id": "test",
 | |
|                 "state": state,
 | |
|                 "redirect_uri": "foo://localhost",
 | |
|                 "code_challenge": verifier,
 | |
|             },
 | |
|         )
 | |
|         response = self.client.get(
 | |
|             reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 | |
|         )
 | |
|         code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
 | |
|         self.assertJSONEqual(
 | |
|             response.content.decode(),
 | |
|             {
 | |
|                 "component": "xak-flow-redirect",
 | |
|                 "type": ChallengeTypes.REDIRECT.value,
 | |
|                 "to": f"foo://localhost?code={code.code}&state={state}",
 | |
|             },
 | |
|         )
 | |
|         response = self.client.post(
 | |
|             reverse("authentik_providers_oauth2:token"),
 | |
|             data={
 | |
|                 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 | |
|                 "code": code.code,
 | |
|                 "code_verifier": verifier,
 | |
|                 "redirect_uri": "foo://localhost",
 | |
|             },
 | |
|             HTTP_AUTHORIZATION=f"Basic {header}",
 | |
|         )
 | |
|         self.assertEqual(response.status_code, 200)
 |