stages/authenticator_validate: fix error when using pretend_user (#8447)

This commit is contained in:
Jens L
2024-02-07 21:21:16 +01:00
committed by GitHub
parent b92630804f
commit c90792d876
5 changed files with 52 additions and 2 deletions

View File

@ -43,7 +43,9 @@ class TokenBackend(InbuiltBackend):
self, request: HttpRequest, username: Optional[str], password: Optional[str], **kwargs: Any self, request: HttpRequest, username: Optional[str], password: Optional[str], **kwargs: Any
) -> Optional[User]: ) -> Optional[User]:
try: try:
# pylint: disable=no-member
user = User._default_manager.get_by_natural_key(username) user = User._default_manager.get_by_natural_key(username)
# pylint: disable=no-member
except User.DoesNotExist: except User.DoesNotExist:
# Run the default password hasher once to reduce the timing # Run the default password hasher once to reduce the timing
# difference between an existing and a nonexistent user (#20760). # difference between an existing and a nonexistent user (#20760).

View File

@ -37,6 +37,7 @@ def clean_expired_models(self: SystemTask):
messages.append(f"Expired {amount} {cls._meta.verbose_name_plural}") messages.append(f"Expired {amount} {cls._meta.verbose_name_plural}")
# Special case # Special case
amount = 0 amount = 0
# pylint: disable=no-member
for session in AuthenticatedSession.objects.all(): for session in AuthenticatedSession.objects.all():
cache_key = f"{KEY_PREFIX}{session.session_key}" cache_key = f"{KEY_PREFIX}{session.session_key}"
value = None value = None
@ -49,6 +50,7 @@ def clean_expired_models(self: SystemTask):
session.delete() session.delete()
amount += 1 amount += 1
LOGGER.debug("Expired sessions", model=AuthenticatedSession, amount=amount) LOGGER.debug("Expired sessions", model=AuthenticatedSession, amount=amount)
# pylint: disable=no-member
messages.append(f"Expired {amount} {AuthenticatedSession._meta.verbose_name_plural}") messages.append(f"Expired {amount} {AuthenticatedSession._meta.verbose_name_plural}")
self.set_status(TaskStatus.SUCCESSFUL, *messages) self.set_status(TaskStatus.SUCCESSFUL, *messages)

View File

@ -14,7 +14,7 @@ from authentik.core.api.utils import JSONDictField, PassiveSerializer
from authentik.core.models import User from authentik.core.models import User
from authentik.events.models import Event, EventAction from authentik.events.models import Event, EventAction
from authentik.flows.challenge import ChallengeResponse, ChallengeTypes, WithUserInfoChallenge from authentik.flows.challenge import ChallengeResponse, ChallengeTypes, WithUserInfoChallenge
from authentik.flows.exceptions import FlowSkipStageException from authentik.flows.exceptions import FlowSkipStageException, StageInvalidException
from authentik.flows.models import FlowDesignation, NotConfiguredAction, Stage from authentik.flows.models import FlowDesignation, NotConfiguredAction, Stage
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
from authentik.flows.stage import ChallengeStageView from authentik.flows.stage import ChallengeStageView
@ -154,6 +154,16 @@ class AuthenticatorValidateStageView(ChallengeStageView):
def get_device_challenges(self) -> list[dict]: def get_device_challenges(self) -> list[dict]:
"""Get a list of all device challenges applicable for the current stage""" """Get a list of all device challenges applicable for the current stage"""
challenges = [] challenges = []
pending_user = self.get_pending_user()
if pending_user.is_anonymous:
# We shouldn't get here without any kind of authentication data
raise StageInvalidException()
# When `pretend_user_exists` is enabled in the identification stage,
# `pending_user` will be a user model that isn't save to the DB
# hence it doesn't have a PK. In that case we just return an empty list of
# authenticators
if not pending_user.pk:
return []
# Convert to a list to have usable log output instead of just <generator ...> # Convert to a list to have usable log output instead of just <generator ...>
user_devices = list(devices_for_user(self.get_pending_user())) user_devices = list(devices_for_user(self.get_pending_user()))
self.logger.debug("Got devices for user", devices=user_devices) self.logger.debug("Got devices for user", devices=user_devices)

View File

@ -123,7 +123,7 @@ class IdentificationChallengeResponse(ChallengeResponse):
if not current_stage.show_matched_user: if not current_stage.show_matched_user:
self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = uid_field self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = uid_field
# when `pretend` is enabled, continue regardless # when `pretend` is enabled, continue regardless
if current_stage.pretend_user_exists: if current_stage.pretend_user_exists and not current_stage.password_stage:
return attrs return attrs
raise ValidationError("Failed to authenticate.") raise ValidationError("Failed to authenticate.")
self.pre_user = pre_user self.pre_user = pre_user

View File

@ -100,6 +100,42 @@ class TestIdentificationStage(FlowTestCase):
user_fields=["email"], user_fields=["email"],
) )
def test_invalid_with_password_pretend(self):
"""Test with invalid email and invalid password in single step (with pretend_user_exists)"""
self.stage.pretend_user_exists = True
pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT])
self.stage.password_stage = pw_stage
self.stage.save()
form_data = {
"uid_field": self.user.email + "test",
"password": self.user.username + "test",
}
url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
response = self.client.post(url, form_data)
self.assertStageResponse(
response,
self.flow,
component="ak-stage-identification",
password_fields=True,
primary_action="Log in",
response_errors={
"non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
},
sources=[
{
"challenge": {
"component": "xak-flow-redirect",
"to": "/source/oauth/login/test/",
"type": ChallengeTypes.REDIRECT.value,
},
"icon_url": "/static/authentik/sources/default.svg",
"name": "test",
}
],
show_source_labels=False,
user_fields=["email"],
)
def test_invalid_with_username(self): def test_invalid_with_username(self):
"""Test invalid with username (user exists but stage only allows email)""" """Test invalid with username (user exists but stage only allows email)"""
form_data = {"uid_field": self.user.username} form_data = {"uid_field": self.user.username}