policies/password: fix PasswordStage not being usable with prompt stages, rework validation logic
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
		| @ -8,8 +8,11 @@ from structlog.stdlib import get_logger | |||||||
|  |  | ||||||
| from authentik.policies.models import Policy | from authentik.policies.models import Policy | ||||||
| from authentik.policies.types import PolicyRequest, PolicyResult | from authentik.policies.types import PolicyRequest, PolicyResult | ||||||
|  | from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT | ||||||
|  |  | ||||||
| LOGGER = get_logger() | LOGGER = get_logger() | ||||||
|  | RE_LOWER = re.compile("[a-z]") | ||||||
|  | RE_UPPER = re.compile("[A-Z]") | ||||||
|  |  | ||||||
|  |  | ||||||
| class PasswordPolicy(Policy): | class PasswordPolicy(Policy): | ||||||
| @ -38,31 +41,39 @@ class PasswordPolicy(Policy): | |||||||
|         return "ak-policy-password-form" |         return "ak-policy-password-form" | ||||||
|  |  | ||||||
|     def passes(self, request: PolicyRequest) -> PolicyResult: |     def passes(self, request: PolicyRequest) -> PolicyResult: | ||||||
|         if self.password_field not in request.context: |         if ( | ||||||
|  |             self.password_field not in request.context | ||||||
|  |             and self.password_field not in request.context.get(PLAN_CONTEXT_PROMPT, {}) | ||||||
|  |         ): | ||||||
|             LOGGER.warning( |             LOGGER.warning( | ||||||
|                 "Password field not set in Policy Request", |                 "Password field not set in Policy Request", | ||||||
|                 field=self.password_field, |                 field=self.password_field, | ||||||
|                 fields=request.context.keys(), |                 fields=request.context.keys(), | ||||||
|  |                 prompt_fields=request.context.get(PLAN_CONTEXT_PROMPT, {}).keys(), | ||||||
|             ) |             ) | ||||||
|             return PolicyResult(False, _("Password not set in context")) |             return PolicyResult(False, _("Password not set in context")) | ||||||
|  |  | ||||||
|  |         if self.password_field in request.context: | ||||||
|             password = request.context[self.password_field] |             password = request.context[self.password_field] | ||||||
|  |         else: | ||||||
|  |             password = request.context[PLAN_CONTEXT_PROMPT][self.password_field] | ||||||
|  |  | ||||||
|         filter_regex = [] |         if len(password) < self.length_min: | ||||||
|         if self.amount_lowercase > 0: |             LOGGER.debug("password failed", reason="length", p=password) | ||||||
|             filter_regex.append(r"[a-z]{%d,}" % self.amount_lowercase) |             return PolicyResult(False, self.error_message) | ||||||
|         if self.amount_uppercase > 0: |  | ||||||
|             filter_regex.append(r"[A-Z]{%d,}" % self.amount_uppercase) |  | ||||||
|         if self.amount_symbols > 0: |  | ||||||
|             filter_regex.append(r"[%s]{%d,}" % (self.symbol_charset, self.amount_symbols)) |  | ||||||
|         full_regex = "|".join(filter_regex) |  | ||||||
|         LOGGER.debug("Built regex", regexp=full_regex) |  | ||||||
|         result = bool(re.compile(full_regex).match(password)) |  | ||||||
|  |  | ||||||
|         result = result and len(password) >= self.length_min |         if self.amount_lowercase > 0 and len(RE_LOWER.findall(password)) < self.amount_lowercase: | ||||||
|  |             LOGGER.debug("password failed", reason="amount_lowercase", p=password) | ||||||
|  |             return PolicyResult(False, self.error_message) | ||||||
|  |         if self.amount_uppercase > 0 and len(RE_UPPER.findall(password)) < self.amount_lowercase: | ||||||
|  |             LOGGER.debug("password failed", reason="amount_uppercase", p=password) | ||||||
|  |             return PolicyResult(False, self.error_message) | ||||||
|  |         regex = re.compile(r"[%s]" % self.symbol_charset) | ||||||
|  |         if self.amount_symbols > 0 and len(regex.findall(password)) < self.amount_symbols: | ||||||
|  |             LOGGER.debug("password failed", reason="amount_symbols", p=password) | ||||||
|  |             return PolicyResult(False, self.error_message) | ||||||
|  |  | ||||||
|         if not result: |         return PolicyResult(True) | ||||||
|             return PolicyResult(result, self.error_message) |  | ||||||
|         return PolicyResult(result) |  | ||||||
|  |  | ||||||
|     class Meta: |     class Meta: | ||||||
|  |  | ||||||
|  | |||||||
| @ -1,57 +0,0 @@ | |||||||
| """Password Policy tests""" |  | ||||||
| from django.test import TestCase |  | ||||||
| from guardian.shortcuts import get_anonymous_user |  | ||||||
|  |  | ||||||
| from authentik.policies.password.models import PasswordPolicy |  | ||||||
| from authentik.policies.types import PolicyRequest, PolicyResult |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class TestPasswordPolicy(TestCase): |  | ||||||
|     """Test Password Policy""" |  | ||||||
|  |  | ||||||
|     def test_invalid(self): |  | ||||||
|         """Test without password""" |  | ||||||
|         policy = PasswordPolicy.objects.create( |  | ||||||
|             name="test_invalid", |  | ||||||
|             amount_uppercase=1, |  | ||||||
|             amount_lowercase=2, |  | ||||||
|             amount_symbols=3, |  | ||||||
|             length_min=24, |  | ||||||
|             error_message="test message", |  | ||||||
|         ) |  | ||||||
|         request = PolicyRequest(get_anonymous_user()) |  | ||||||
|         result: PolicyResult = policy.passes(request) |  | ||||||
|         self.assertFalse(result.passing) |  | ||||||
|         self.assertEqual(result.messages[0], "Password not set in context") |  | ||||||
|  |  | ||||||
|     def test_false(self): |  | ||||||
|         """Failing password case""" |  | ||||||
|         policy = PasswordPolicy.objects.create( |  | ||||||
|             name="test_false", |  | ||||||
|             amount_uppercase=1, |  | ||||||
|             amount_lowercase=2, |  | ||||||
|             amount_symbols=3, |  | ||||||
|             length_min=24, |  | ||||||
|             error_message="test message", |  | ||||||
|         ) |  | ||||||
|         request = PolicyRequest(get_anonymous_user()) |  | ||||||
|         request.context["password"] = "test" |  | ||||||
|         result: PolicyResult = policy.passes(request) |  | ||||||
|         self.assertFalse(result.passing) |  | ||||||
|         self.assertEqual(result.messages, ("test message",)) |  | ||||||
|  |  | ||||||
|     def test_true(self): |  | ||||||
|         """Positive password case""" |  | ||||||
|         policy = PasswordPolicy.objects.create( |  | ||||||
|             name="test_true", |  | ||||||
|             amount_uppercase=1, |  | ||||||
|             amount_lowercase=2, |  | ||||||
|             amount_symbols=3, |  | ||||||
|             length_min=3, |  | ||||||
|             error_message="test message", |  | ||||||
|         ) |  | ||||||
|         request = PolicyRequest(get_anonymous_user()) |  | ||||||
|         request.context["password"] = "Test()!" |  | ||||||
|         result: PolicyResult = policy.passes(request) |  | ||||||
|         self.assertTrue(result.passing) |  | ||||||
|         self.assertEqual(result.messages, tuple()) |  | ||||||
							
								
								
									
										0
									
								
								authentik/policies/password/tests/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								authentik/policies/password/tests/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										80
									
								
								authentik/policies/password/tests/test_flows.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										80
									
								
								authentik/policies/password/tests/test_flows.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,80 @@ | |||||||
|  | """Password flow tests""" | ||||||
|  | from django.test import TestCase | ||||||
|  | from django.urls.base import reverse | ||||||
|  | from django.utils.encoding import force_str | ||||||
|  |  | ||||||
|  | from authentik.core.models import User | ||||||
|  | from authentik.flows.challenge import ChallengeTypes | ||||||
|  | from authentik.flows.models import Flow, FlowDesignation, FlowStageBinding | ||||||
|  | from authentik.policies.password.models import PasswordPolicy | ||||||
|  | from authentik.stages.prompt.models import FieldTypes, Prompt, PromptStage | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class TestPasswordPolicyFlow(TestCase): | ||||||
|  |     """Test Password Policy""" | ||||||
|  |  | ||||||
|  |     def setUp(self) -> None: | ||||||
|  |         self.user = User.objects.create(username="unittest", email="test@beryju.org") | ||||||
|  |  | ||||||
|  |         self.flow = Flow.objects.create( | ||||||
|  |             name="test-prompt", | ||||||
|  |             slug="test-prompt", | ||||||
|  |             designation=FlowDesignation.AUTHENTICATION, | ||||||
|  |         ) | ||||||
|  |         password_prompt = Prompt.objects.create( | ||||||
|  |             field_key="password", | ||||||
|  |             label="PASSWORD_LABEL", | ||||||
|  |             type=FieldTypes.PASSWORD, | ||||||
|  |             required=True, | ||||||
|  |             placeholder="PASSWORD_PLACEHOLDER", | ||||||
|  |         ) | ||||||
|  |  | ||||||
|  |         self.policy = PasswordPolicy.objects.create( | ||||||
|  |             name="test_true", | ||||||
|  |             amount_uppercase=1, | ||||||
|  |             amount_lowercase=2, | ||||||
|  |             amount_symbols=3, | ||||||
|  |             length_min=3, | ||||||
|  |             error_message="test message", | ||||||
|  |         ) | ||||||
|  |         stage = PromptStage.objects.create(name="prompt-stage") | ||||||
|  |         stage.validation_policies.set([self.policy]) | ||||||
|  |         stage.fields.set( | ||||||
|  |             [ | ||||||
|  |                 password_prompt, | ||||||
|  |             ] | ||||||
|  |         ) | ||||||
|  |         FlowStageBinding.objects.create(target=self.flow, stage=stage, order=2) | ||||||
|  |  | ||||||
|  |     def test_prompt_data(self): | ||||||
|  |         """Test policy attached to a prompt stage""" | ||||||
|  |         response = self.client.post( | ||||||
|  |             reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), | ||||||
|  |             {"password": "akadmin"}, | ||||||
|  |         ) | ||||||
|  |         self.assertEqual(response.status_code, 200) | ||||||
|  |         self.assertJSONEqual( | ||||||
|  |             force_str(response.content), | ||||||
|  |             { | ||||||
|  |                 "component": "ak-stage-prompt", | ||||||
|  |                 "fields": [ | ||||||
|  |                     { | ||||||
|  |                         "field_key": "password", | ||||||
|  |                         "label": "PASSWORD_LABEL", | ||||||
|  |                         "order": 0, | ||||||
|  |                         "placeholder": "PASSWORD_PLACEHOLDER", | ||||||
|  |                         "required": True, | ||||||
|  |                         "type": "password", | ||||||
|  |                     } | ||||||
|  |                 ], | ||||||
|  |                 "flow_info": { | ||||||
|  |                     "background": self.flow.background_url, | ||||||
|  |                     "cancel_url": reverse("authentik_flows:cancel"), | ||||||
|  |                     "title": "", | ||||||
|  |                 }, | ||||||
|  |                 "response_errors": { | ||||||
|  |                     "non_field_errors": [{"code": "invalid", "string": self.policy.error_message}] | ||||||
|  |                 }, | ||||||
|  |                 "type": ChallengeTypes.NATIVE.value, | ||||||
|  |             }, | ||||||
|  |         ) | ||||||
							
								
								
									
										68
									
								
								authentik/policies/password/tests/test_policy.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										68
									
								
								authentik/policies/password/tests/test_policy.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,68 @@ | |||||||
|  | """Password Policy tests""" | ||||||
|  | from django.test import TestCase | ||||||
|  | from guardian.shortcuts import get_anonymous_user | ||||||
|  |  | ||||||
|  | from authentik.lib.generators import generate_key | ||||||
|  | from authentik.policies.password.models import PasswordPolicy | ||||||
|  | from authentik.policies.types import PolicyRequest, PolicyResult | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class TestPasswordPolicy(TestCase): | ||||||
|  |     """Test Password Policy""" | ||||||
|  |  | ||||||
|  |     def setUp(self) -> None: | ||||||
|  |         self.policy = PasswordPolicy.objects.create( | ||||||
|  |             name="test_false", | ||||||
|  |             amount_uppercase=1, | ||||||
|  |             amount_lowercase=2, | ||||||
|  |             amount_symbols=3, | ||||||
|  |             length_min=24, | ||||||
|  |             error_message="test message", | ||||||
|  |         ) | ||||||
|  |  | ||||||
|  |     def test_invalid(self): | ||||||
|  |         """Test without password""" | ||||||
|  |         request = PolicyRequest(get_anonymous_user()) | ||||||
|  |         result: PolicyResult = self.policy.passes(request) | ||||||
|  |         self.assertFalse(result.passing) | ||||||
|  |         self.assertEqual(result.messages[0], "Password not set in context") | ||||||
|  |  | ||||||
|  |     def test_failed_length(self): | ||||||
|  |         """Password too short""" | ||||||
|  |         request = PolicyRequest(get_anonymous_user()) | ||||||
|  |         request.context["password"] = "test" | ||||||
|  |         result: PolicyResult = self.policy.passes(request) | ||||||
|  |         self.assertFalse(result.passing) | ||||||
|  |         self.assertEqual(result.messages, ("test message",)) | ||||||
|  |  | ||||||
|  |     def test_failed_lowercase(self): | ||||||
|  |         """not enough lowercase""" | ||||||
|  |         request = PolicyRequest(get_anonymous_user()) | ||||||
|  |         request.context["password"] = "TTTTTTTTTTTTTTTTTTTTTTTe" | ||||||
|  |         result: PolicyResult = self.policy.passes(request) | ||||||
|  |         self.assertFalse(result.passing) | ||||||
|  |         self.assertEqual(result.messages, ("test message",)) | ||||||
|  |  | ||||||
|  |     def test_failed_uppercase(self): | ||||||
|  |         """not enough uppercase""" | ||||||
|  |         request = PolicyRequest(get_anonymous_user()) | ||||||
|  |         request.context["password"] = "tttttttttttttttttttttttE" | ||||||
|  |         result: PolicyResult = self.policy.passes(request) | ||||||
|  |         self.assertFalse(result.passing) | ||||||
|  |         self.assertEqual(result.messages, ("test message",)) | ||||||
|  |  | ||||||
|  |     def test_failed_symbols(self): | ||||||
|  |         """not enough uppercase""" | ||||||
|  |         request = PolicyRequest(get_anonymous_user()) | ||||||
|  |         request.context["password"] = "TETETETETETETETETETETETETe!!!" | ||||||
|  |         result: PolicyResult = self.policy.passes(request) | ||||||
|  |         self.assertFalse(result.passing) | ||||||
|  |         self.assertEqual(result.messages, ("test message",)) | ||||||
|  |  | ||||||
|  |     def test_true(self): | ||||||
|  |         """Positive password case""" | ||||||
|  |         request = PolicyRequest(get_anonymous_user()) | ||||||
|  |         request.context["password"] = generate_key() + "ee!!!" | ||||||
|  |         result: PolicyResult = self.policy.passes(request) | ||||||
|  |         self.assertTrue(result.passing) | ||||||
|  |         self.assertEqual(result.messages, tuple()) | ||||||
| @ -97,7 +97,6 @@ class TestPromptStage(TestCase): | |||||||
|                 static_prompt, |                 static_prompt, | ||||||
|             ] |             ] | ||||||
|         ) |         ) | ||||||
|         self.stage.save() |  | ||||||
|  |  | ||||||
|         self.prompt_data = { |         self.prompt_data = { | ||||||
|             username_prompt.field_key: "test-username", |             username_prompt.field_key: "test-username", | ||||||
|  | |||||||
| @ -66,7 +66,7 @@ This includes the following: | |||||||
|     Can be any of: |     Can be any of: | ||||||
|  |  | ||||||
|     - `password`: Standard password login |     - `password`: Standard password login | ||||||
|     - `app_password`: App passowrd (token) |     - `app_password`: App password (token) | ||||||
|  |  | ||||||
|         Sets `context['auth_method_args']` to |         Sets `context['auth_method_args']` to | ||||||
|         ```json |         ```json | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	 Jens Langhammer
					Jens Langhammer