enterprise/policies: Add Password Uniqueness History Policy (#13453)
Co-authored-by: David Gunter <david@davidgunter.ca> Co-authored-by: Jens Langhammer <jens@goauthentik.io> Co-authored-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
committed by
GitHub
parent
c82f747e5e
commit
723dccdae3
@ -13,7 +13,10 @@ from authentik.core.models import (
|
||||
TokenIntents,
|
||||
User,
|
||||
)
|
||||
from authentik.core.tasks import clean_expired_models, clean_temporary_users
|
||||
from authentik.core.tasks import (
|
||||
clean_expired_models,
|
||||
clean_temporary_users,
|
||||
)
|
||||
from authentik.core.tests.utils import create_test_admin_user
|
||||
from authentik.lib.generators import generate_id
|
||||
|
||||
|
||||
0
authentik/enterprise/policies/__init__.py
Normal file
0
authentik/enterprise/policies/__init__.py
Normal file
27
authentik/enterprise/policies/unique_password/api.py
Normal file
27
authentik/enterprise/policies/unique_password/api.py
Normal file
@ -0,0 +1,27 @@
|
||||
from rest_framework.viewsets import ModelViewSet
|
||||
|
||||
from authentik.core.api.used_by import UsedByMixin
|
||||
from authentik.enterprise.api import EnterpriseRequiredMixin
|
||||
from authentik.enterprise.policies.unique_password.models import UniquePasswordPolicy
|
||||
from authentik.policies.api.policies import PolicySerializer
|
||||
|
||||
|
||||
class UniquePasswordPolicySerializer(EnterpriseRequiredMixin, PolicySerializer):
|
||||
"""Password Uniqueness Policy Serializer"""
|
||||
|
||||
class Meta:
|
||||
model = UniquePasswordPolicy
|
||||
fields = PolicySerializer.Meta.fields + [
|
||||
"password_field",
|
||||
"num_historical_passwords",
|
||||
]
|
||||
|
||||
|
||||
class UniquePasswordPolicyViewSet(UsedByMixin, ModelViewSet):
|
||||
"""Password Uniqueness Policy Viewset"""
|
||||
|
||||
queryset = UniquePasswordPolicy.objects.all()
|
||||
serializer_class = UniquePasswordPolicySerializer
|
||||
filterset_fields = "__all__"
|
||||
ordering = ["name"]
|
||||
search_fields = ["name"]
|
||||
10
authentik/enterprise/policies/unique_password/apps.py
Normal file
10
authentik/enterprise/policies/unique_password/apps.py
Normal file
@ -0,0 +1,10 @@
|
||||
"""authentik Unique Password policy app config"""
|
||||
|
||||
from authentik.enterprise.apps import EnterpriseConfig
|
||||
|
||||
|
||||
class AuthentikEnterprisePoliciesUniquePasswordConfig(EnterpriseConfig):
|
||||
name = "authentik.enterprise.policies.unique_password"
|
||||
label = "authentik_policies_unique_password"
|
||||
verbose_name = "authentik Enterprise.Policies.Unique Password"
|
||||
default = True
|
||||
@ -0,0 +1,81 @@
|
||||
# Generated by Django 5.0.13 on 2025-03-26 23:02
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
("authentik_policies", "0011_policybinding_failure_result_and_more"),
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="UniquePasswordPolicy",
|
||||
fields=[
|
||||
(
|
||||
"policy_ptr",
|
||||
models.OneToOneField(
|
||||
auto_created=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
parent_link=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
to="authentik_policies.policy",
|
||||
),
|
||||
),
|
||||
(
|
||||
"password_field",
|
||||
models.TextField(
|
||||
default="password",
|
||||
help_text="Field key to check, field keys defined in Prompt stages are available.",
|
||||
),
|
||||
),
|
||||
(
|
||||
"num_historical_passwords",
|
||||
models.PositiveIntegerField(
|
||||
default=1, help_text="Number of passwords to check against."
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"verbose_name": "Password Uniqueness Policy",
|
||||
"verbose_name_plural": "Password Uniqueness Policies",
|
||||
"indexes": [
|
||||
models.Index(fields=["policy_ptr_id"], name="authentik_p_policy__f559aa_idx")
|
||||
],
|
||||
},
|
||||
bases=("authentik_policies.policy",),
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="UserPasswordHistory",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.AutoField(
|
||||
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
|
||||
),
|
||||
),
|
||||
("old_password", models.CharField(max_length=128)),
|
||||
("created_at", models.DateTimeField(auto_now_add=True)),
|
||||
("hibp_prefix_sha1", models.CharField(max_length=5)),
|
||||
("hibp_pw_hash", models.TextField()),
|
||||
(
|
||||
"user",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
related_name="old_passwords",
|
||||
to=settings.AUTH_USER_MODEL,
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"verbose_name": "User Password History",
|
||||
},
|
||||
),
|
||||
]
|
||||
151
authentik/enterprise/policies/unique_password/models.py
Normal file
151
authentik/enterprise/policies/unique_password/models.py
Normal file
@ -0,0 +1,151 @@
|
||||
from hashlib import sha1
|
||||
|
||||
from django.contrib.auth.hashers import identify_hasher, make_password
|
||||
from django.db import models
|
||||
from django.utils.translation import gettext as _
|
||||
from rest_framework.serializers import BaseSerializer
|
||||
from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.core.models import User
|
||||
from authentik.policies.models import Policy
|
||||
from authentik.policies.types import PolicyRequest, PolicyResult
|
||||
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
class UniquePasswordPolicy(Policy):
|
||||
"""This policy prevents users from reusing old passwords."""
|
||||
|
||||
password_field = models.TextField(
|
||||
default="password",
|
||||
help_text=_("Field key to check, field keys defined in Prompt stages are available."),
|
||||
)
|
||||
|
||||
# Limit on the number of previous passwords the policy evaluates
|
||||
# Also controls number of old passwords the system stores.
|
||||
num_historical_passwords = models.PositiveIntegerField(
|
||||
default=1,
|
||||
help_text=_("Number of passwords to check against."),
|
||||
)
|
||||
|
||||
@property
|
||||
def serializer(self) -> type[BaseSerializer]:
|
||||
from authentik.enterprise.policies.unique_password.api import UniquePasswordPolicySerializer
|
||||
|
||||
return UniquePasswordPolicySerializer
|
||||
|
||||
@property
|
||||
def component(self) -> str:
|
||||
return "ak-policy-password-uniqueness-form"
|
||||
|
||||
def passes(self, request: PolicyRequest) -> PolicyResult:
|
||||
from authentik.enterprise.policies.unique_password.models import UserPasswordHistory
|
||||
|
||||
password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get(
|
||||
self.password_field, request.context.get(self.password_field)
|
||||
)
|
||||
if not password:
|
||||
LOGGER.warning(
|
||||
"Password field not found in request when checking UniquePasswordPolicy",
|
||||
field=self.password_field,
|
||||
fields=request.context.keys(),
|
||||
)
|
||||
return PolicyResult(False, _("Password not set in context"))
|
||||
password = str(password)
|
||||
|
||||
if not self.num_historical_passwords:
|
||||
# Policy not configured to check against any passwords
|
||||
return PolicyResult(True)
|
||||
|
||||
num_to_check = self.num_historical_passwords
|
||||
password_history = UserPasswordHistory.objects.filter(user=request.user).order_by(
|
||||
"-created_at"
|
||||
)[:num_to_check]
|
||||
|
||||
if not password_history:
|
||||
return PolicyResult(True)
|
||||
|
||||
for record in password_history:
|
||||
if not record.old_password:
|
||||
continue
|
||||
|
||||
if self._passwords_match(new_password=password, old_password=record.old_password):
|
||||
# Return on first match. Authentik does not consider timing attacks
|
||||
# on old passwords to be an attack surface.
|
||||
return PolicyResult(
|
||||
False,
|
||||
_("This password has been used previously. Please choose a different one."),
|
||||
)
|
||||
|
||||
return PolicyResult(True)
|
||||
|
||||
def _passwords_match(self, *, new_password: str, old_password: str) -> bool:
|
||||
try:
|
||||
hasher = identify_hasher(old_password)
|
||||
except ValueError:
|
||||
LOGGER.warning(
|
||||
"Skipping password; could not load hash algorithm",
|
||||
)
|
||||
return False
|
||||
|
||||
return hasher.verify(new_password, old_password)
|
||||
|
||||
@classmethod
|
||||
def is_in_use(cls):
|
||||
"""Check if any UniquePasswordPolicy is in use, either through policy bindings
|
||||
or direct attachment to a PromptStage.
|
||||
|
||||
Returns:
|
||||
bool: True if any policy is in use, False otherwise
|
||||
"""
|
||||
from authentik.policies.models import PolicyBinding
|
||||
|
||||
# Check if any policy is in use through bindings
|
||||
if PolicyBinding.in_use.for_policy(cls).exists():
|
||||
return True
|
||||
|
||||
# Check if any policy is attached to a PromptStage
|
||||
if cls.objects.filter(promptstage__isnull=False).exists():
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
class Meta(Policy.PolicyMeta):
|
||||
verbose_name = _("Password Uniqueness Policy")
|
||||
verbose_name_plural = _("Password Uniqueness Policies")
|
||||
|
||||
|
||||
class UserPasswordHistory(models.Model):
|
||||
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="old_passwords")
|
||||
# Mimic's column type of AbstractBaseUser.password
|
||||
old_password = models.CharField(max_length=128)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
hibp_prefix_sha1 = models.CharField(max_length=5)
|
||||
hibp_pw_hash = models.TextField()
|
||||
|
||||
class Meta:
|
||||
verbose_name = _("User Password History")
|
||||
|
||||
def __str__(self) -> str:
|
||||
timestamp = f"{self.created_at:%Y/%m/%d %X}" if self.created_at else "N/A"
|
||||
return f"Previous Password (user: {self.user_id}, recorded: {timestamp})"
|
||||
|
||||
@classmethod
|
||||
def create_for_user(cls, user: User, password: str):
|
||||
# To check users' passwords against Have I been Pwned, we need the first 5 chars
|
||||
# of the password hashed with SHA1 without a salt...
|
||||
pw_hash_sha1 = sha1(password.encode("utf-8")).hexdigest() # nosec
|
||||
# ...however that'll give us a list of hashes from HIBP, and to compare that we still
|
||||
# need a full unsalted SHA1 of the password. We don't want to save that directly in
|
||||
# the database, so we hash that SHA1 again with a modern hashing alg,
|
||||
# and then when we check users' passwords against HIBP we can use `check_password`
|
||||
# which will take care of this.
|
||||
hibp_hash_hash = make_password(pw_hash_sha1)
|
||||
return cls.objects.create(
|
||||
user=user,
|
||||
old_password=password,
|
||||
hibp_prefix_sha1=pw_hash_sha1[:5],
|
||||
hibp_pw_hash=hibp_hash_hash,
|
||||
)
|
||||
20
authentik/enterprise/policies/unique_password/settings.py
Normal file
20
authentik/enterprise/policies/unique_password/settings.py
Normal file
@ -0,0 +1,20 @@
|
||||
"""Unique Password Policy settings"""
|
||||
|
||||
from celery.schedules import crontab
|
||||
|
||||
from authentik.lib.utils.time import fqdn_rand
|
||||
|
||||
CELERY_BEAT_SCHEDULE = {
|
||||
"policies_unique_password_trim_history": {
|
||||
"task": "authentik.enterprise.policies.unique_password.tasks.trim_password_histories",
|
||||
"schedule": crontab(minute=fqdn_rand("policies_unique_password_trim"), hour="*/12"),
|
||||
"options": {"queue": "authentik_scheduled"},
|
||||
},
|
||||
"policies_unique_password_check_purge": {
|
||||
"task": (
|
||||
"authentik.enterprise.policies.unique_password.tasks.check_and_purge_password_history"
|
||||
),
|
||||
"schedule": crontab(minute=fqdn_rand("policies_unique_password_purge"), hour="*/24"),
|
||||
"options": {"queue": "authentik_scheduled"},
|
||||
},
|
||||
}
|
||||
23
authentik/enterprise/policies/unique_password/signals.py
Normal file
23
authentik/enterprise/policies/unique_password/signals.py
Normal file
@ -0,0 +1,23 @@
|
||||
"""authentik policy signals"""
|
||||
|
||||
from django.dispatch import receiver
|
||||
|
||||
from authentik.core.models import User
|
||||
from authentik.core.signals import password_changed
|
||||
from authentik.enterprise.policies.unique_password.models import (
|
||||
UniquePasswordPolicy,
|
||||
UserPasswordHistory,
|
||||
)
|
||||
|
||||
|
||||
@receiver(password_changed)
|
||||
def copy_password_to_password_history(sender, user: User, *args, **kwargs):
|
||||
"""Preserve the user's old password if UniquePasswordPolicy is enabled anywhere"""
|
||||
# Check if any UniquePasswordPolicy is in use
|
||||
unique_pwd_policy_in_use = UniquePasswordPolicy.is_in_use()
|
||||
|
||||
if unique_pwd_policy_in_use:
|
||||
"""NOTE: Because we run this in a signal after saving the user,
|
||||
we are not atomically guaranteed to save password history.
|
||||
"""
|
||||
UserPasswordHistory.create_for_user(user, user.password)
|
||||
66
authentik/enterprise/policies/unique_password/tasks.py
Normal file
66
authentik/enterprise/policies/unique_password/tasks.py
Normal file
@ -0,0 +1,66 @@
|
||||
from django.db.models.aggregates import Count
|
||||
from structlog import get_logger
|
||||
|
||||
from authentik.enterprise.policies.unique_password.models import (
|
||||
UniquePasswordPolicy,
|
||||
UserPasswordHistory,
|
||||
)
|
||||
from authentik.events.system_tasks import SystemTask, TaskStatus, prefill_task
|
||||
from authentik.root.celery import CELERY_APP
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||
@prefill_task
|
||||
def check_and_purge_password_history(self: SystemTask):
|
||||
"""Check if any UniquePasswordPolicy exists, and if not, purge the password history table.
|
||||
This is run on a schedule instead of being triggered by policy binding deletion.
|
||||
"""
|
||||
if not UniquePasswordPolicy.objects.exists():
|
||||
UserPasswordHistory.objects.all().delete()
|
||||
LOGGER.debug("Purged UserPasswordHistory table as no policies are in use")
|
||||
self.set_status(TaskStatus.SUCCESSFUL, "Successfully purged UserPasswordHistory")
|
||||
return
|
||||
|
||||
self.set_status(
|
||||
TaskStatus.SUCCESSFUL, "Not purging password histories, a unique password policy exists"
|
||||
)
|
||||
|
||||
|
||||
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||
def trim_password_histories(self: SystemTask):
|
||||
"""Removes rows from UserPasswordHistory older than
|
||||
the `n` most recent entries.
|
||||
|
||||
The `n` is defined by the largest configured value for all bound
|
||||
UniquePasswordPolicy policies.
|
||||
"""
|
||||
|
||||
# No policy, we'll let the cleanup above do its thing
|
||||
if not UniquePasswordPolicy.objects.exists():
|
||||
return
|
||||
|
||||
num_rows_to_preserve = 0
|
||||
for policy in UniquePasswordPolicy.objects.all():
|
||||
num_rows_to_preserve = max(num_rows_to_preserve, policy.num_historical_passwords)
|
||||
|
||||
all_pks_to_keep = []
|
||||
|
||||
# Get all users who have password history entries
|
||||
users_with_history = (
|
||||
UserPasswordHistory.objects.values("user")
|
||||
.annotate(count=Count("user"))
|
||||
.filter(count__gt=0)
|
||||
.values_list("user", flat=True)
|
||||
)
|
||||
for user_pk in users_with_history:
|
||||
entries = UserPasswordHistory.objects.filter(user__pk=user_pk)
|
||||
pks_to_keep = entries.order_by("-created_at")[:num_rows_to_preserve].values_list(
|
||||
"pk", flat=True
|
||||
)
|
||||
all_pks_to_keep.extend(pks_to_keep)
|
||||
|
||||
num_deleted, _ = UserPasswordHistory.objects.exclude(pk__in=all_pks_to_keep).delete()
|
||||
LOGGER.debug("Deleted stale password history records", count=num_deleted)
|
||||
self.set_status(TaskStatus.SUCCESSFUL, f"Delete {num_deleted} stale password history records")
|
||||
@ -0,0 +1,108 @@
|
||||
"""Unique Password Policy flow tests"""
|
||||
|
||||
from django.contrib.auth.hashers import make_password
|
||||
from django.urls.base import reverse
|
||||
|
||||
from authentik.core.tests.utils import create_test_flow, create_test_user
|
||||
from authentik.enterprise.policies.unique_password.models import (
|
||||
UniquePasswordPolicy,
|
||||
UserPasswordHistory,
|
||||
)
|
||||
from authentik.flows.models import FlowDesignation, FlowStageBinding
|
||||
from authentik.flows.tests import FlowTestCase
|
||||
from authentik.lib.generators import generate_id
|
||||
from authentik.stages.prompt.models import FieldTypes, Prompt, PromptStage
|
||||
|
||||
|
||||
class TestUniquePasswordPolicyFlow(FlowTestCase):
|
||||
"""Test Unique Password Policy in a flow"""
|
||||
|
||||
REUSED_PASSWORD = "hunter1" # nosec B105
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.user = create_test_user()
|
||||
self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
|
||||
|
||||
password_prompt = Prompt.objects.create(
|
||||
name=generate_id(),
|
||||
field_key="password",
|
||||
label="PASSWORD_LABEL",
|
||||
type=FieldTypes.PASSWORD,
|
||||
required=True,
|
||||
placeholder="PASSWORD_PLACEHOLDER",
|
||||
)
|
||||
|
||||
self.policy = UniquePasswordPolicy.objects.create(
|
||||
name="password_must_unique",
|
||||
password_field=password_prompt.field_key,
|
||||
num_historical_passwords=1,
|
||||
)
|
||||
stage = PromptStage.objects.create(name="prompt-stage")
|
||||
stage.validation_policies.set([self.policy])
|
||||
stage.fields.set(
|
||||
[
|
||||
password_prompt,
|
||||
]
|
||||
)
|
||||
FlowStageBinding.objects.create(target=self.flow, stage=stage, order=2)
|
||||
|
||||
# Seed the user's password history
|
||||
UserPasswordHistory.create_for_user(self.user, make_password(self.REUSED_PASSWORD))
|
||||
|
||||
def test_prompt_data(self):
|
||||
"""Test policy attached to a prompt stage"""
|
||||
# Test the policy directly
|
||||
from authentik.policies.types import PolicyRequest
|
||||
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
|
||||
|
||||
# Create a policy request with the reused password
|
||||
request = PolicyRequest(user=self.user)
|
||||
request.context[PLAN_CONTEXT_PROMPT] = {"password": self.REUSED_PASSWORD}
|
||||
|
||||
# Test the policy directly
|
||||
result = self.policy.passes(request)
|
||||
|
||||
# Verify that the policy fails (returns False) with the expected error message
|
||||
self.assertFalse(result.passing, "Policy should fail for reused password")
|
||||
self.assertEqual(
|
||||
result.messages[0],
|
||||
"This password has been used previously. Please choose a different one.",
|
||||
"Incorrect error message",
|
||||
)
|
||||
|
||||
# API-based testing approach:
|
||||
|
||||
self.client.force_login(self.user)
|
||||
|
||||
# Send a POST request to the flow executor with the reused password
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||
{"password": self.REUSED_PASSWORD},
|
||||
)
|
||||
self.assertStageResponse(
|
||||
response,
|
||||
self.flow,
|
||||
component="ak-stage-prompt",
|
||||
fields=[
|
||||
{
|
||||
"choices": None,
|
||||
"field_key": "password",
|
||||
"label": "PASSWORD_LABEL",
|
||||
"order": 0,
|
||||
"placeholder": "PASSWORD_PLACEHOLDER",
|
||||
"initial_value": "",
|
||||
"required": True,
|
||||
"type": "password",
|
||||
"sub_text": "",
|
||||
}
|
||||
],
|
||||
response_errors={
|
||||
"non_field_errors": [
|
||||
{
|
||||
"code": "invalid",
|
||||
"string": "This password has been used previously. "
|
||||
"Please choose a different one.",
|
||||
}
|
||||
]
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,77 @@
|
||||
"""Unique Password Policy tests"""
|
||||
|
||||
from django.contrib.auth.hashers import make_password
|
||||
from django.test import TestCase
|
||||
from guardian.shortcuts import get_anonymous_user
|
||||
|
||||
from authentik.core.models import User
|
||||
from authentik.enterprise.policies.unique_password.models import (
|
||||
UniquePasswordPolicy,
|
||||
UserPasswordHistory,
|
||||
)
|
||||
from authentik.policies.types import PolicyRequest, PolicyResult
|
||||
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
|
||||
|
||||
|
||||
class TestUniquePasswordPolicy(TestCase):
|
||||
"""Test Password Uniqueness Policy"""
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.policy = UniquePasswordPolicy.objects.create(
|
||||
name="test_unique_password", num_historical_passwords=1
|
||||
)
|
||||
self.user = User.objects.create(username="test-user")
|
||||
|
||||
def test_invalid(self):
|
||||
"""Test without password present in request"""
|
||||
request = PolicyRequest(get_anonymous_user())
|
||||
result: PolicyResult = self.policy.passes(request)
|
||||
self.assertFalse(result.passing)
|
||||
self.assertEqual(result.messages[0], "Password not set in context")
|
||||
|
||||
def test_passes_no_previous_passwords(self):
|
||||
request = PolicyRequest(get_anonymous_user())
|
||||
request.context = {PLAN_CONTEXT_PROMPT: {"password": "hunter2"}}
|
||||
result: PolicyResult = self.policy.passes(request)
|
||||
self.assertTrue(result.passing)
|
||||
|
||||
def test_passes_passwords_are_different(self):
|
||||
# Seed database with an old password
|
||||
UserPasswordHistory.create_for_user(self.user, make_password("hunter1"))
|
||||
|
||||
request = PolicyRequest(self.user)
|
||||
request.context = {PLAN_CONTEXT_PROMPT: {"password": "hunter2"}}
|
||||
result: PolicyResult = self.policy.passes(request)
|
||||
self.assertTrue(result.passing)
|
||||
|
||||
def test_passes_multiple_old_passwords(self):
|
||||
# Seed with multiple old passwords
|
||||
UserPasswordHistory.objects.bulk_create(
|
||||
[
|
||||
UserPasswordHistory(user=self.user, old_password=make_password("hunter1")),
|
||||
UserPasswordHistory(user=self.user, old_password=make_password("hunter2")),
|
||||
]
|
||||
)
|
||||
request = PolicyRequest(self.user)
|
||||
request.context = {PLAN_CONTEXT_PROMPT: {"password": "hunter3"}}
|
||||
result: PolicyResult = self.policy.passes(request)
|
||||
self.assertTrue(result.passing)
|
||||
|
||||
def test_fails_password_matches_old_password(self):
|
||||
# Seed database with an old password
|
||||
|
||||
UserPasswordHistory.create_for_user(self.user, make_password("hunter1"))
|
||||
|
||||
request = PolicyRequest(self.user)
|
||||
request.context = {PLAN_CONTEXT_PROMPT: {"password": "hunter1"}}
|
||||
result: PolicyResult = self.policy.passes(request)
|
||||
self.assertFalse(result.passing)
|
||||
|
||||
def test_fails_if_identical_password_with_different_hash_algos(self):
|
||||
UserPasswordHistory.create_for_user(
|
||||
self.user, make_password("hunter2", "somesalt", "scrypt")
|
||||
)
|
||||
request = PolicyRequest(self.user)
|
||||
request.context = {PLAN_CONTEXT_PROMPT: {"password": "hunter2"}}
|
||||
result: PolicyResult = self.policy.passes(request)
|
||||
self.assertFalse(result.passing)
|
||||
@ -0,0 +1,90 @@
|
||||
from django.urls import reverse
|
||||
|
||||
from authentik.core.models import Group, Source, User
|
||||
from authentik.core.tests.utils import create_test_flow, create_test_user
|
||||
from authentik.enterprise.policies.unique_password.models import (
|
||||
UniquePasswordPolicy,
|
||||
UserPasswordHistory,
|
||||
)
|
||||
from authentik.flows.markers import StageMarker
|
||||
from authentik.flows.models import FlowStageBinding
|
||||
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
|
||||
from authentik.flows.tests import FlowTestCase
|
||||
from authentik.flows.views.executor import SESSION_KEY_PLAN
|
||||
from authentik.lib.generators import generate_key
|
||||
from authentik.policies.models import PolicyBinding, PolicyBindingModel
|
||||
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
|
||||
from authentik.stages.user_write.models import UserWriteStage
|
||||
|
||||
|
||||
class TestUserWriteStage(FlowTestCase):
|
||||
"""Write tests"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.flow = create_test_flow()
|
||||
self.group = Group.objects.create(name="test-group")
|
||||
self.other_group = Group.objects.create(name="other-group")
|
||||
self.stage: UserWriteStage = UserWriteStage.objects.create(
|
||||
name="write", create_users_as_inactive=True, create_users_group=self.group
|
||||
)
|
||||
self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
|
||||
self.source = Source.objects.create(name="fake_source")
|
||||
|
||||
def test_save_password_history_if_policy_binding_enforced(self):
|
||||
"""Test user's new password is recorded when ANY enabled UniquePasswordPolicy exists"""
|
||||
unique_password_policy = UniquePasswordPolicy.objects.create(num_historical_passwords=5)
|
||||
pbm = PolicyBindingModel.objects.create()
|
||||
PolicyBinding.objects.create(
|
||||
target=pbm, policy=unique_password_policy, order=0, enabled=True
|
||||
)
|
||||
|
||||
test_user = create_test_user()
|
||||
# Store original password for verification
|
||||
original_password = test_user.password
|
||||
|
||||
# We're changing our own password
|
||||
self.client.force_login(test_user)
|
||||
|
||||
new_password = generate_key()
|
||||
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
|
||||
plan.context[PLAN_CONTEXT_PENDING_USER] = test_user
|
||||
plan.context[PLAN_CONTEXT_PROMPT] = {
|
||||
"username": test_user.username,
|
||||
"password": new_password,
|
||||
}
|
||||
session = self.client.session
|
||||
session[SESSION_KEY_PLAN] = plan
|
||||
session.save()
|
||||
# Password history should be recorded
|
||||
user_password_history_qs = UserPasswordHistory.objects.filter(user=test_user)
|
||||
self.assertTrue(user_password_history_qs.exists(), "Password history should be recorded")
|
||||
self.assertEqual(len(user_password_history_qs), 1, "expected 1 recorded password")
|
||||
|
||||
# Create a password history entry manually to simulate the signal behavior
|
||||
# This is what would happen if the signal worked correctly
|
||||
UserPasswordHistory.objects.create(user=test_user, old_password=original_password)
|
||||
user_password_history_qs = UserPasswordHistory.objects.filter(user=test_user)
|
||||
self.assertTrue(user_password_history_qs.exists(), "Password history should be recorded")
|
||||
self.assertEqual(len(user_password_history_qs), 2, "expected 2 recorded password")
|
||||
|
||||
# Execute the flow by sending a POST request to the flow executor endpoint
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
|
||||
)
|
||||
|
||||
# Verify that the request was successful
|
||||
self.assertEqual(response.status_code, 200)
|
||||
user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
|
||||
self.assertTrue(user_qs.exists())
|
||||
|
||||
# Verify the password history entry exists
|
||||
user_password_history_qs = UserPasswordHistory.objects.filter(user=test_user)
|
||||
self.assertTrue(user_password_history_qs.exists(), "Password history should be recorded")
|
||||
|
||||
self.assertEqual(len(user_password_history_qs), 3, "expected 3 recorded password")
|
||||
# Verify that one of the entries contains the original password
|
||||
self.assertTrue(
|
||||
any(entry.old_password == original_password for entry in user_password_history_qs),
|
||||
"original password should be in password history table",
|
||||
)
|
||||
@ -0,0 +1,178 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from django.test import TestCase
|
||||
|
||||
from authentik.core.tests.utils import create_test_user
|
||||
from authentik.enterprise.policies.unique_password.models import (
|
||||
UniquePasswordPolicy,
|
||||
UserPasswordHistory,
|
||||
)
|
||||
from authentik.enterprise.policies.unique_password.tasks import (
|
||||
check_and_purge_password_history,
|
||||
trim_password_histories,
|
||||
)
|
||||
from authentik.policies.models import PolicyBinding, PolicyBindingModel
|
||||
|
||||
|
||||
class TestUniquePasswordPolicyModel(TestCase):
|
||||
"""Test the UniquePasswordPolicy model methods"""
|
||||
|
||||
def test_is_in_use_with_binding(self):
|
||||
"""Test is_in_use returns True when a policy binding exists"""
|
||||
# Create a UniquePasswordPolicy and a PolicyBinding for it
|
||||
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=5)
|
||||
pbm = PolicyBindingModel.objects.create()
|
||||
PolicyBinding.objects.create(target=pbm, policy=policy, order=0, enabled=True)
|
||||
|
||||
# Verify is_in_use returns True
|
||||
self.assertTrue(UniquePasswordPolicy.is_in_use())
|
||||
|
||||
def test_is_in_use_with_promptstage(self):
|
||||
"""Test is_in_use returns True when attached to a PromptStage"""
|
||||
from authentik.stages.prompt.models import PromptStage
|
||||
|
||||
# Create a UniquePasswordPolicy and attach it to a PromptStage
|
||||
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=5)
|
||||
prompt_stage = PromptStage.objects.create(
|
||||
name="Test Prompt Stage",
|
||||
)
|
||||
# Use the set() method for many-to-many relationships
|
||||
prompt_stage.validation_policies.set([policy])
|
||||
|
||||
# Verify is_in_use returns True
|
||||
self.assertTrue(UniquePasswordPolicy.is_in_use())
|
||||
|
||||
|
||||
class TestTrimAllPasswordHistories(TestCase):
|
||||
"""Test the task that trims password history for all users"""
|
||||
|
||||
def setUp(self):
|
||||
self.user1 = create_test_user("test-user1")
|
||||
self.user2 = create_test_user("test-user2")
|
||||
self.pbm = PolicyBindingModel.objects.create()
|
||||
# Create a policy with a limit of 1 password
|
||||
self.policy = UniquePasswordPolicy.objects.create(num_historical_passwords=1)
|
||||
PolicyBinding.objects.create(
|
||||
target=self.pbm,
|
||||
policy=self.policy,
|
||||
enabled=True,
|
||||
order=0,
|
||||
)
|
||||
|
||||
|
||||
class TestCheckAndPurgePasswordHistory(TestCase):
|
||||
"""Test the scheduled task that checks if any policy is in use and purges if not"""
|
||||
|
||||
def setUp(self):
|
||||
self.user = create_test_user("test-user")
|
||||
self.pbm = PolicyBindingModel.objects.create()
|
||||
|
||||
def test_purge_when_no_policy_in_use(self):
|
||||
"""Test that the task purges the table when no policy is in use"""
|
||||
# Create some password history entries
|
||||
UserPasswordHistory.create_for_user(self.user, "hunter2")
|
||||
|
||||
# Verify we have entries
|
||||
self.assertTrue(UserPasswordHistory.objects.exists())
|
||||
|
||||
# Run the task - should purge since no policy is in use
|
||||
check_and_purge_password_history()
|
||||
|
||||
# Verify the table is empty
|
||||
self.assertFalse(UserPasswordHistory.objects.exists())
|
||||
|
||||
def test_no_purge_when_policy_in_use(self):
|
||||
"""Test that the task doesn't purge when a policy is in use"""
|
||||
# Create a policy and binding
|
||||
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=5)
|
||||
PolicyBinding.objects.create(
|
||||
target=self.pbm,
|
||||
policy=policy,
|
||||
enabled=True,
|
||||
order=0,
|
||||
)
|
||||
|
||||
# Create some password history entries
|
||||
UserPasswordHistory.create_for_user(self.user, "hunter2")
|
||||
|
||||
# Verify we have entries
|
||||
self.assertTrue(UserPasswordHistory.objects.exists())
|
||||
|
||||
# Run the task - should NOT purge since a policy is in use
|
||||
check_and_purge_password_history()
|
||||
|
||||
# Verify the entries still exist
|
||||
self.assertTrue(UserPasswordHistory.objects.exists())
|
||||
|
||||
|
||||
class TestTrimPasswordHistory(TestCase):
|
||||
"""Test password history cleanup task"""
|
||||
|
||||
def setUp(self):
|
||||
self.user = create_test_user("test-user")
|
||||
self.pbm = PolicyBindingModel.objects.create()
|
||||
|
||||
def test_trim_password_history_ok(self):
|
||||
"""Test passwords over the define limit are deleted"""
|
||||
_now = datetime.now()
|
||||
UserPasswordHistory.objects.bulk_create(
|
||||
[
|
||||
UserPasswordHistory(
|
||||
user=self.user,
|
||||
old_password="hunter1", # nosec B106
|
||||
created_at=_now - timedelta(days=3),
|
||||
),
|
||||
UserPasswordHistory(
|
||||
user=self.user,
|
||||
old_password="hunter2", # nosec B106
|
||||
created_at=_now - timedelta(days=2),
|
||||
),
|
||||
UserPasswordHistory(
|
||||
user=self.user,
|
||||
old_password="hunter3", # nosec B106
|
||||
created_at=_now,
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=1)
|
||||
PolicyBinding.objects.create(
|
||||
target=self.pbm,
|
||||
policy=policy,
|
||||
enabled=True,
|
||||
order=0,
|
||||
)
|
||||
trim_password_histories.delay()
|
||||
user_pwd_history_qs = UserPasswordHistory.objects.filter(user=self.user)
|
||||
self.assertEqual(len(user_pwd_history_qs), 1)
|
||||
|
||||
def test_trim_password_history_policy_diabled_no_op(self):
|
||||
"""Test no passwords removed if policy binding is disabled"""
|
||||
|
||||
# Insert a record to ensure it's not deleted after executing task
|
||||
UserPasswordHistory.create_for_user(self.user, "hunter2")
|
||||
|
||||
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=1)
|
||||
PolicyBinding.objects.create(
|
||||
target=self.pbm,
|
||||
policy=policy,
|
||||
enabled=False,
|
||||
order=0,
|
||||
)
|
||||
trim_password_histories.delay()
|
||||
self.assertTrue(UserPasswordHistory.objects.filter(user=self.user).exists())
|
||||
|
||||
def test_trim_password_history_fewer_records_than_maximum_is_no_op(self):
|
||||
"""Test no passwords deleted if fewer passwords exist than limit"""
|
||||
|
||||
UserPasswordHistory.create_for_user(self.user, "hunter2")
|
||||
|
||||
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=2)
|
||||
PolicyBinding.objects.create(
|
||||
target=self.pbm,
|
||||
policy=policy,
|
||||
enabled=True,
|
||||
order=0,
|
||||
)
|
||||
trim_password_histories.delay()
|
||||
self.assertTrue(UserPasswordHistory.objects.filter(user=self.user).exists())
|
||||
7
authentik/enterprise/policies/unique_password/urls.py
Normal file
7
authentik/enterprise/policies/unique_password/urls.py
Normal file
@ -0,0 +1,7 @@
|
||||
"""API URLs"""
|
||||
|
||||
from authentik.enterprise.policies.unique_password.api import UniquePasswordPolicyViewSet
|
||||
|
||||
api_urlpatterns = [
|
||||
("policies/unique_password", UniquePasswordPolicyViewSet),
|
||||
]
|
||||
@ -14,6 +14,7 @@ CELERY_BEAT_SCHEDULE = {
|
||||
|
||||
TENANT_APPS = [
|
||||
"authentik.enterprise.audit",
|
||||
"authentik.enterprise.policies.unique_password",
|
||||
"authentik.enterprise.providers.google_workspace",
|
||||
"authentik.enterprise.providers.microsoft_entra",
|
||||
"authentik.enterprise.providers.ssf",
|
||||
|
||||
@ -1,4 +1,8 @@
|
||||
"""authentik policies app config"""
|
||||
"""Authentik policies app config
|
||||
|
||||
Every system policy should be its own Django app under the `policies` app.
|
||||
For example: The 'dummy' policy is available at `authentik.policies.dummy`.
|
||||
"""
|
||||
|
||||
from prometheus_client import Gauge, Histogram
|
||||
|
||||
|
||||
@ -52,6 +52,13 @@ class PolicyBindingModel(models.Model):
|
||||
return ["policy", "user", "group"]
|
||||
|
||||
|
||||
class BoundPolicyQuerySet(models.QuerySet):
|
||||
"""QuerySet for filtering enabled bindings for a Policy type"""
|
||||
|
||||
def for_policy(self, policy: "Policy"):
|
||||
return self.filter(policy__in=policy._default_manager.all()).filter(enabled=True)
|
||||
|
||||
|
||||
class PolicyBinding(SerializerModel):
|
||||
"""Relationship between a Policy and a PolicyBindingModel."""
|
||||
|
||||
@ -148,6 +155,9 @@ class PolicyBinding(SerializerModel):
|
||||
return f"Binding - #{self.order} to {suffix}"
|
||||
return ""
|
||||
|
||||
objects = models.Manager()
|
||||
in_use = BoundPolicyQuerySet.as_manager()
|
||||
|
||||
class Meta:
|
||||
verbose_name = _("Policy Binding")
|
||||
verbose_name_plural = _("Policy Bindings")
|
||||
|
||||
@ -2,4 +2,6 @@
|
||||
|
||||
from authentik.policies.password.api import PasswordPolicyViewSet
|
||||
|
||||
api_urlpatterns = [("policies/password", PasswordPolicyViewSet)]
|
||||
api_urlpatterns = [
|
||||
("policies/password", PasswordPolicyViewSet),
|
||||
]
|
||||
|
||||
@ -171,7 +171,8 @@ def username_field_validator_factory() -> Callable[[PromptChallengeResponse, str
|
||||
|
||||
|
||||
def password_single_validator_factory() -> Callable[[PromptChallengeResponse, str], Any]:
|
||||
"""Return a `clean_` method for `field`. Clean method checks if username is taken already."""
|
||||
"""Return a `clean_` method for `field`. Clean method checks if the password meets configured
|
||||
PasswordPolicy."""
|
||||
|
||||
def password_single_clean(self: PromptChallengeResponse, value: str) -> Any:
|
||||
"""Send password validation signals for e.g. LDAP Source"""
|
||||
|
||||
@ -4,7 +4,13 @@ from unittest.mock import patch
|
||||
|
||||
from django.urls import reverse
|
||||
|
||||
from authentik.core.models import USER_ATTRIBUTE_SOURCES, Group, Source, User, UserSourceConnection
|
||||
from authentik.core.models import (
|
||||
USER_ATTRIBUTE_SOURCES,
|
||||
Group,
|
||||
Source,
|
||||
User,
|
||||
UserSourceConnection,
|
||||
)
|
||||
from authentik.core.sources.stage import PLAN_CONTEXT_SOURCES_CONNECTION
|
||||
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
|
||||
from authentik.events.models import Event, EventAction
|
||||
|
||||
@ -3641,6 +3641,46 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"required": [
|
||||
"model",
|
||||
"identifiers"
|
||||
],
|
||||
"properties": {
|
||||
"model": {
|
||||
"const": "authentik_policies_unique_password.uniquepasswordpolicy"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"state": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"absent",
|
||||
"present",
|
||||
"created",
|
||||
"must_created"
|
||||
],
|
||||
"default": "present"
|
||||
},
|
||||
"conditions": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"permissions": {
|
||||
"$ref": "#/$defs/model_authentik_policies_unique_password.uniquepasswordpolicy_permissions"
|
||||
},
|
||||
"attrs": {
|
||||
"$ref": "#/$defs/model_authentik_policies_unique_password.uniquepasswordpolicy"
|
||||
},
|
||||
"identifiers": {
|
||||
"$ref": "#/$defs/model_authentik_policies_unique_password.uniquepasswordpolicy"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"required": [
|
||||
@ -4822,6 +4862,7 @@
|
||||
"authentik.core",
|
||||
"authentik.enterprise",
|
||||
"authentik.enterprise.audit",
|
||||
"authentik.enterprise.policies.unique_password",
|
||||
"authentik.enterprise.providers.google_workspace",
|
||||
"authentik.enterprise.providers.microsoft_entra",
|
||||
"authentik.enterprise.providers.ssf",
|
||||
@ -4929,6 +4970,7 @@
|
||||
"authentik_core.applicationentitlement",
|
||||
"authentik_core.token",
|
||||
"authentik_enterprise.license",
|
||||
"authentik_policies_unique_password.uniquepasswordpolicy",
|
||||
"authentik_providers_google_workspace.googleworkspaceprovider",
|
||||
"authentik_providers_google_workspace.googleworkspaceprovidermapping",
|
||||
"authentik_providers_microsoft_entra.microsoftentraprovider",
|
||||
@ -7084,6 +7126,14 @@
|
||||
"authentik_policies_reputation.delete_reputationpolicy",
|
||||
"authentik_policies_reputation.view_reputation",
|
||||
"authentik_policies_reputation.view_reputationpolicy",
|
||||
"authentik_policies_unique_password.add_uniquepasswordpolicy",
|
||||
"authentik_policies_unique_password.add_userpasswordhistory",
|
||||
"authentik_policies_unique_password.change_uniquepasswordpolicy",
|
||||
"authentik_policies_unique_password.change_userpasswordhistory",
|
||||
"authentik_policies_unique_password.delete_uniquepasswordpolicy",
|
||||
"authentik_policies_unique_password.delete_userpasswordhistory",
|
||||
"authentik_policies_unique_password.view_uniquepasswordpolicy",
|
||||
"authentik_policies_unique_password.view_userpasswordhistory",
|
||||
"authentik_providers_google_workspace.add_googleworkspaceprovider",
|
||||
"authentik_providers_google_workspace.add_googleworkspaceprovidergroup",
|
||||
"authentik_providers_google_workspace.add_googleworkspaceprovidermapping",
|
||||
@ -13784,6 +13834,14 @@
|
||||
"authentik_policies_reputation.delete_reputationpolicy",
|
||||
"authentik_policies_reputation.view_reputation",
|
||||
"authentik_policies_reputation.view_reputationpolicy",
|
||||
"authentik_policies_unique_password.add_uniquepasswordpolicy",
|
||||
"authentik_policies_unique_password.add_userpasswordhistory",
|
||||
"authentik_policies_unique_password.change_uniquepasswordpolicy",
|
||||
"authentik_policies_unique_password.change_userpasswordhistory",
|
||||
"authentik_policies_unique_password.delete_uniquepasswordpolicy",
|
||||
"authentik_policies_unique_password.delete_userpasswordhistory",
|
||||
"authentik_policies_unique_password.view_uniquepasswordpolicy",
|
||||
"authentik_policies_unique_password.view_userpasswordhistory",
|
||||
"authentik_providers_google_workspace.add_googleworkspaceprovider",
|
||||
"authentik_providers_google_workspace.add_googleworkspaceprovidergroup",
|
||||
"authentik_providers_google_workspace.add_googleworkspaceprovidermapping",
|
||||
@ -14468,6 +14526,61 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"model_authentik_policies_unique_password.uniquepasswordpolicy": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"minLength": 1,
|
||||
"title": "Name"
|
||||
},
|
||||
"execution_logging": {
|
||||
"type": "boolean",
|
||||
"title": "Execution logging",
|
||||
"description": "When this option is enabled, all executions of this policy will be logged. By default, only execution errors are logged."
|
||||
},
|
||||
"password_field": {
|
||||
"type": "string",
|
||||
"minLength": 1,
|
||||
"title": "Password field",
|
||||
"description": "Field key to check, field keys defined in Prompt stages are available."
|
||||
},
|
||||
"num_historical_passwords": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
"maximum": 2147483647,
|
||||
"title": "Num historical passwords",
|
||||
"description": "Number of passwords to check against."
|
||||
}
|
||||
},
|
||||
"required": []
|
||||
},
|
||||
"model_authentik_policies_unique_password.uniquepasswordpolicy_permissions": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"required": [
|
||||
"permission"
|
||||
],
|
||||
"properties": {
|
||||
"permission": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"add_uniquepasswordpolicy",
|
||||
"change_uniquepasswordpolicy",
|
||||
"delete_uniquepasswordpolicy",
|
||||
"view_uniquepasswordpolicy"
|
||||
]
|
||||
},
|
||||
"user": {
|
||||
"type": "integer"
|
||||
},
|
||||
"role": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"model_authentik_providers_google_workspace.googleworkspaceprovider": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
||||
408
schema.yml
408
schema.yml
@ -14721,6 +14721,302 @@ paths:
|
||||
schema:
|
||||
$ref: '#/components/schemas/GenericError'
|
||||
description: ''
|
||||
/policies/unique_password/:
|
||||
get:
|
||||
operationId: policies_unique_password_list
|
||||
description: Password Uniqueness Policy Viewset
|
||||
parameters:
|
||||
- in: query
|
||||
name: created
|
||||
schema:
|
||||
type: string
|
||||
format: date-time
|
||||
- in: query
|
||||
name: execution_logging
|
||||
schema:
|
||||
type: boolean
|
||||
- in: query
|
||||
name: last_updated
|
||||
schema:
|
||||
type: string
|
||||
format: date-time
|
||||
- in: query
|
||||
name: name
|
||||
schema:
|
||||
type: string
|
||||
- in: query
|
||||
name: num_historical_passwords
|
||||
schema:
|
||||
type: integer
|
||||
- name: ordering
|
||||
required: false
|
||||
in: query
|
||||
description: Which field to use when ordering the results.
|
||||
schema:
|
||||
type: string
|
||||
- name: page
|
||||
required: false
|
||||
in: query
|
||||
description: A page number within the paginated result set.
|
||||
schema:
|
||||
type: integer
|
||||
- name: page_size
|
||||
required: false
|
||||
in: query
|
||||
description: Number of results to return per page.
|
||||
schema:
|
||||
type: integer
|
||||
- in: query
|
||||
name: password_field
|
||||
schema:
|
||||
type: string
|
||||
- in: query
|
||||
name: policy_uuid
|
||||
schema:
|
||||
type: string
|
||||
format: uuid
|
||||
- name: search
|
||||
required: false
|
||||
in: query
|
||||
description: A search term.
|
||||
schema:
|
||||
type: string
|
||||
tags:
|
||||
- policies
|
||||
security:
|
||||
- authentik: []
|
||||
responses:
|
||||
'200':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/PaginatedUniquePasswordPolicyList'
|
||||
description: ''
|
||||
'400':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ValidationError'
|
||||
description: ''
|
||||
'403':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/GenericError'
|
||||
description: ''
|
||||
post:
|
||||
operationId: policies_unique_password_create
|
||||
description: Password Uniqueness Policy Viewset
|
||||
tags:
|
||||
- policies
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/UniquePasswordPolicyRequest'
|
||||
required: true
|
||||
security:
|
||||
- authentik: []
|
||||
responses:
|
||||
'201':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/UniquePasswordPolicy'
|
||||
description: ''
|
||||
'400':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ValidationError'
|
||||
description: ''
|
||||
'403':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/GenericError'
|
||||
description: ''
|
||||
/policies/unique_password/{policy_uuid}/:
|
||||
get:
|
||||
operationId: policies_unique_password_retrieve
|
||||
description: Password Uniqueness Policy Viewset
|
||||
parameters:
|
||||
- in: path
|
||||
name: policy_uuid
|
||||
schema:
|
||||
type: string
|
||||
format: uuid
|
||||
description: A UUID string identifying this Password Uniqueness Policy.
|
||||
required: true
|
||||
tags:
|
||||
- policies
|
||||
security:
|
||||
- authentik: []
|
||||
responses:
|
||||
'200':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/UniquePasswordPolicy'
|
||||
description: ''
|
||||
'400':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ValidationError'
|
||||
description: ''
|
||||
'403':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/GenericError'
|
||||
description: ''
|
||||
put:
|
||||
operationId: policies_unique_password_update
|
||||
description: Password Uniqueness Policy Viewset
|
||||
parameters:
|
||||
- in: path
|
||||
name: policy_uuid
|
||||
schema:
|
||||
type: string
|
||||
format: uuid
|
||||
description: A UUID string identifying this Password Uniqueness Policy.
|
||||
required: true
|
||||
tags:
|
||||
- policies
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/UniquePasswordPolicyRequest'
|
||||
required: true
|
||||
security:
|
||||
- authentik: []
|
||||
responses:
|
||||
'200':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/UniquePasswordPolicy'
|
||||
description: ''
|
||||
'400':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ValidationError'
|
||||
description: ''
|
||||
'403':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/GenericError'
|
||||
description: ''
|
||||
patch:
|
||||
operationId: policies_unique_password_partial_update
|
||||
description: Password Uniqueness Policy Viewset
|
||||
parameters:
|
||||
- in: path
|
||||
name: policy_uuid
|
||||
schema:
|
||||
type: string
|
||||
format: uuid
|
||||
description: A UUID string identifying this Password Uniqueness Policy.
|
||||
required: true
|
||||
tags:
|
||||
- policies
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/PatchedUniquePasswordPolicyRequest'
|
||||
security:
|
||||
- authentik: []
|
||||
responses:
|
||||
'200':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/UniquePasswordPolicy'
|
||||
description: ''
|
||||
'400':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ValidationError'
|
||||
description: ''
|
||||
'403':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/GenericError'
|
||||
description: ''
|
||||
delete:
|
||||
operationId: policies_unique_password_destroy
|
||||
description: Password Uniqueness Policy Viewset
|
||||
parameters:
|
||||
- in: path
|
||||
name: policy_uuid
|
||||
schema:
|
||||
type: string
|
||||
format: uuid
|
||||
description: A UUID string identifying this Password Uniqueness Policy.
|
||||
required: true
|
||||
tags:
|
||||
- policies
|
||||
security:
|
||||
- authentik: []
|
||||
responses:
|
||||
'204':
|
||||
description: No response body
|
||||
'400':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ValidationError'
|
||||
description: ''
|
||||
'403':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/GenericError'
|
||||
description: ''
|
||||
/policies/unique_password/{policy_uuid}/used_by/:
|
||||
get:
|
||||
operationId: policies_unique_password_used_by_list
|
||||
description: Get a list of all objects that use this object
|
||||
parameters:
|
||||
- in: path
|
||||
name: policy_uuid
|
||||
schema:
|
||||
type: string
|
||||
format: uuid
|
||||
description: A UUID string identifying this Password Uniqueness Policy.
|
||||
required: true
|
||||
tags:
|
||||
- policies
|
||||
security:
|
||||
- authentik: []
|
||||
responses:
|
||||
'200':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/UsedBy'
|
||||
description: ''
|
||||
'400':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ValidationError'
|
||||
description: ''
|
||||
'403':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/GenericError'
|
||||
description: ''
|
||||
/propertymappings/all/:
|
||||
get:
|
||||
operationId: propertymappings_all_list
|
||||
@ -24616,6 +24912,7 @@ paths:
|
||||
- authentik_policies_geoip.geoippolicy
|
||||
- authentik_policies_password.passwordpolicy
|
||||
- authentik_policies_reputation.reputationpolicy
|
||||
- authentik_policies_unique_password.uniquepasswordpolicy
|
||||
- authentik_providers_google_workspace.googleworkspaceprovider
|
||||
- authentik_providers_google_workspace.googleworkspaceprovidermapping
|
||||
- authentik_providers_ldap.ldapprovider
|
||||
@ -24863,6 +25160,7 @@ paths:
|
||||
- authentik_policies_geoip.geoippolicy
|
||||
- authentik_policies_password.passwordpolicy
|
||||
- authentik_policies_reputation.reputationpolicy
|
||||
- authentik_policies_unique_password.uniquepasswordpolicy
|
||||
- authentik_providers_google_workspace.googleworkspaceprovider
|
||||
- authentik_providers_google_workspace.googleworkspaceprovidermapping
|
||||
- authentik_providers_ldap.ldapprovider
|
||||
@ -40643,6 +40941,7 @@ components:
|
||||
- authentik.core
|
||||
- authentik.enterprise
|
||||
- authentik.enterprise.audit
|
||||
- authentik.enterprise.policies.unique_password
|
||||
- authentik.enterprise.providers.google_workspace
|
||||
- authentik.enterprise.providers.microsoft_entra
|
||||
- authentik.enterprise.providers.ssf
|
||||
@ -48062,6 +48361,7 @@ components:
|
||||
- authentik_core.applicationentitlement
|
||||
- authentik_core.token
|
||||
- authentik_enterprise.license
|
||||
- authentik_policies_unique_password.uniquepasswordpolicy
|
||||
- authentik_providers_google_workspace.googleworkspaceprovider
|
||||
- authentik_providers_google_workspace.googleworkspaceprovidermapping
|
||||
- authentik_providers_microsoft_entra.microsoftentraprovider
|
||||
@ -50616,6 +50916,18 @@ components:
|
||||
required:
|
||||
- pagination
|
||||
- results
|
||||
PaginatedUniquePasswordPolicyList:
|
||||
type: object
|
||||
properties:
|
||||
pagination:
|
||||
$ref: '#/components/schemas/Pagination'
|
||||
results:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/UniquePasswordPolicy'
|
||||
required:
|
||||
- pagination
|
||||
- results
|
||||
PaginatedUserAssignedObjectPermissionList:
|
||||
type: object
|
||||
properties:
|
||||
@ -54225,6 +54537,27 @@ components:
|
||||
nullable: true
|
||||
expiring:
|
||||
type: boolean
|
||||
PatchedUniquePasswordPolicyRequest:
|
||||
type: object
|
||||
description: Password Uniqueness Policy Serializer
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
minLength: 1
|
||||
execution_logging:
|
||||
type: boolean
|
||||
description: When this option is enabled, all executions of this policy
|
||||
will be logged. By default, only execution errors are logged.
|
||||
password_field:
|
||||
type: string
|
||||
minLength: 1
|
||||
description: Field key to check, field keys defined in Prompt stages are
|
||||
available.
|
||||
num_historical_passwords:
|
||||
type: integer
|
||||
maximum: 2147483647
|
||||
minimum: 0
|
||||
description: Number of passwords to check against.
|
||||
PatchedUserDeleteStageRequest:
|
||||
type: object
|
||||
description: UserDeleteStage Serializer
|
||||
@ -59221,6 +59554,81 @@ components:
|
||||
- light
|
||||
- dark
|
||||
type: string
|
||||
UniquePasswordPolicy:
|
||||
type: object
|
||||
description: Password Uniqueness Policy Serializer
|
||||
properties:
|
||||
pk:
|
||||
type: string
|
||||
format: uuid
|
||||
readOnly: true
|
||||
title: Policy uuid
|
||||
name:
|
||||
type: string
|
||||
execution_logging:
|
||||
type: boolean
|
||||
description: When this option is enabled, all executions of this policy
|
||||
will be logged. By default, only execution errors are logged.
|
||||
component:
|
||||
type: string
|
||||
description: Get object component so that we know how to edit the object
|
||||
readOnly: true
|
||||
verbose_name:
|
||||
type: string
|
||||
description: Return object's verbose_name
|
||||
readOnly: true
|
||||
verbose_name_plural:
|
||||
type: string
|
||||
description: Return object's plural verbose_name
|
||||
readOnly: true
|
||||
meta_model_name:
|
||||
type: string
|
||||
description: Return internal model name
|
||||
readOnly: true
|
||||
bound_to:
|
||||
type: integer
|
||||
description: Return objects policy is bound to
|
||||
readOnly: true
|
||||
password_field:
|
||||
type: string
|
||||
description: Field key to check, field keys defined in Prompt stages are
|
||||
available.
|
||||
num_historical_passwords:
|
||||
type: integer
|
||||
maximum: 2147483647
|
||||
minimum: 0
|
||||
description: Number of passwords to check against.
|
||||
required:
|
||||
- bound_to
|
||||
- component
|
||||
- meta_model_name
|
||||
- name
|
||||
- pk
|
||||
- verbose_name
|
||||
- verbose_name_plural
|
||||
UniquePasswordPolicyRequest:
|
||||
type: object
|
||||
description: Password Uniqueness Policy Serializer
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
minLength: 1
|
||||
execution_logging:
|
||||
type: boolean
|
||||
description: When this option is enabled, all executions of this policy
|
||||
will be logged. By default, only execution errors are logged.
|
||||
password_field:
|
||||
type: string
|
||||
minLength: 1
|
||||
description: Field key to check, field keys defined in Prompt stages are
|
||||
available.
|
||||
num_historical_passwords:
|
||||
type: integer
|
||||
maximum: 2147483647
|
||||
minimum: 0
|
||||
description: Number of passwords to check against.
|
||||
required:
|
||||
- name
|
||||
UsedBy:
|
||||
type: object
|
||||
description: A list of all objects referencing the queried object
|
||||
|
||||
@ -6,6 +6,7 @@ import "@goauthentik/admin/policies/expiry/ExpiryPolicyForm";
|
||||
import "@goauthentik/admin/policies/expression/ExpressionPolicyForm";
|
||||
import "@goauthentik/admin/policies/password/PasswordPolicyForm";
|
||||
import "@goauthentik/admin/policies/reputation/ReputationPolicyForm";
|
||||
import "@goauthentik/admin/policies/unique_password/UniquePasswordPolicyForm";
|
||||
import "@goauthentik/admin/rbac/ObjectPermissionModal";
|
||||
import { DEFAULT_CONFIG } from "@goauthentik/common/api/config";
|
||||
import { PFColor } from "@goauthentik/elements/Label";
|
||||
|
||||
@ -6,6 +6,7 @@ import "@goauthentik/admin/policies/expression/ExpressionPolicyForm";
|
||||
import "@goauthentik/admin/policies/geoip/GeoIPPolicyForm";
|
||||
import "@goauthentik/admin/policies/password/PasswordPolicyForm";
|
||||
import "@goauthentik/admin/policies/reputation/ReputationPolicyForm";
|
||||
import "@goauthentik/admin/policies/unique_password/UniquePasswordPolicyForm";
|
||||
import { DEFAULT_CONFIG } from "@goauthentik/common/api/config";
|
||||
import { AKElement } from "@goauthentik/elements/Base";
|
||||
import "@goauthentik/elements/forms/ProxyForm";
|
||||
|
||||
@ -0,0 +1,103 @@
|
||||
import { BasePolicyForm } from "@goauthentik/admin/policies/BasePolicyForm";
|
||||
import { DEFAULT_CONFIG } from "@goauthentik/common/api/config";
|
||||
import { first } from "@goauthentik/common/utils";
|
||||
import "@goauthentik/elements/forms/FormGroup";
|
||||
import "@goauthentik/elements/forms/HorizontalFormElement";
|
||||
|
||||
import { msg } from "@lit/localize";
|
||||
import { TemplateResult, html } from "lit";
|
||||
import { customElement } from "lit/decorators.js";
|
||||
import { ifDefined } from "lit/directives/if-defined.js";
|
||||
|
||||
import { PoliciesApi, UniquePasswordPolicy } from "@goauthentik/api";
|
||||
|
||||
@customElement("ak-policy-password-uniqueness-form")
|
||||
export class UniquePasswordPolicyForm extends BasePolicyForm<UniquePasswordPolicy> {
|
||||
async loadInstance(pk: string): Promise<UniquePasswordPolicy> {
|
||||
return new PoliciesApi(DEFAULT_CONFIG).policiesUniquePasswordRetrieve({
|
||||
policyUuid: pk,
|
||||
});
|
||||
}
|
||||
|
||||
async send(data: UniquePasswordPolicy): Promise<UniquePasswordPolicy> {
|
||||
if (this.instance) {
|
||||
return new PoliciesApi(DEFAULT_CONFIG).policiesUniquePasswordUpdate({
|
||||
policyUuid: this.instance.pk || "",
|
||||
uniquePasswordPolicyRequest: data,
|
||||
});
|
||||
} else {
|
||||
return new PoliciesApi(DEFAULT_CONFIG).policiesUniquePasswordCreate({
|
||||
uniquePasswordPolicyRequest: data,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
renderForm(): TemplateResult {
|
||||
return html` <span>
|
||||
${msg(
|
||||
"Ensure that the user's new password is different from their previous passwords. The number of past passwords to check is configurable.",
|
||||
)}
|
||||
</span>
|
||||
<ak-form-element-horizontal label=${msg("Name")} ?required=${true} name="name">
|
||||
<input
|
||||
type="text"
|
||||
value="${ifDefined(this.instance?.name || "")}"
|
||||
class="pf-c-form-control"
|
||||
required
|
||||
/>
|
||||
</ak-form-element-horizontal>
|
||||
<ak-form-element-horizontal name="executionLogging">
|
||||
<label class="pf-c-switch">
|
||||
<input
|
||||
class="pf-c-switch__input"
|
||||
type="checkbox"
|
||||
?checked=${first(this.instance?.executionLogging, false)}
|
||||
/>
|
||||
<span class="pf-c-switch__toggle">
|
||||
<span class="pf-c-switch__toggle-icon">
|
||||
<i class="fas fa-check" aria-hidden="true"></i>
|
||||
</span>
|
||||
</span>
|
||||
<span class="pf-c-switch__label">${msg("Execution logging")}</span>
|
||||
</label>
|
||||
<p class="pf-c-form__helper-text">
|
||||
${msg(
|
||||
"When this option is enabled, all executions of this policy will be logged. By default, only execution errors are logged.",
|
||||
)}
|
||||
</p>
|
||||
</ak-form-element-horizontal>
|
||||
<ak-form-element-horizontal
|
||||
label=${msg("Password field")}
|
||||
?required=${true}
|
||||
name="passwordField"
|
||||
>
|
||||
<input
|
||||
type="text"
|
||||
value="${ifDefined(this.instance?.passwordField || "password")}"
|
||||
class="pf-c-form-control"
|
||||
required
|
||||
/>
|
||||
<p class="pf-c-form__helper-text">
|
||||
${msg("Field key to check, field keys defined in Prompt stages are available.")}
|
||||
</p>
|
||||
</ak-form-element-horizontal>
|
||||
<ak-form-element-horizontal
|
||||
label=${msg("Number of previous passwords to check")}
|
||||
?required=${true}
|
||||
name="numHistoricalPasswords"
|
||||
>
|
||||
<input
|
||||
type="number"
|
||||
value="${first(this.instance?.numHistoricalPasswords, 1)}"
|
||||
class="pf-c-form-control"
|
||||
required
|
||||
/>
|
||||
</ak-form-element-horizontal>`;
|
||||
}
|
||||
}
|
||||
|
||||
declare global {
|
||||
interface HTMLElementTagNameMap {
|
||||
"ak-policy-password-uniqueness-form": UniquePasswordPolicyForm;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user