Compare commits

..

2 Commits

Author SHA1 Message Date
ad9b5e98ba honor order in api and web
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2025-04-17 17:22:06 +02:00
e4a21c824a policies: constraint only one type
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2025-04-17 17:13:39 +02:00
635 changed files with 15947 additions and 32123 deletions

View File

@ -1,5 +1,5 @@
[bumpversion]
current_version = 2025.4.1
current_version = 2025.2.4
tag = True
commit = True
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?:-(?P<rc_t>[a-zA-Z-]+)(?P<rc_n>[1-9]\\d*))?

View File

@ -118,15 +118,3 @@ updates:
prefix: "core:"
labels:
- dependencies
- package-ecosystem: docker-compose
directories:
# - /scripts # Maybe
- /tests/e2e
schedule:
interval: daily
time: "04:00"
open-pull-requests-limit: 10
commit-message:
prefix: "core:"
labels:
- dependencies

View File

@ -53,7 +53,6 @@ jobs:
signoff: true
# ID from https://api.github.com/users/authentik-automation[bot]
author: authentik-automation[bot] <135050075+authentik-automation[bot]@users.noreply.github.com>
labels: dependencies
- uses: peter-evans/enable-pull-request-automerge@v3
with:
token: ${{ steps.generate_token.outputs.token }}

View File

@ -70,18 +70,22 @@ jobs:
- name: checkout stable
run: |
# Copy current, latest config to local
# Temporarly comment the .github backup while migrating to uv
cp authentik/lib/default.yml local.env.yml
cp -R .github ..
# cp -R .github ..
cp -R scripts ..
git checkout $(git tag --sort=version:refname | grep '^version/' | grep -vE -- '-rc[0-9]+$' | tail -n1)
rm -rf .github/ scripts/
mv ../.github ../scripts .
# rm -rf .github/ scripts/
# mv ../.github ../scripts .
rm -rf scripts/
mv ../scripts .
- name: Setup authentik env (stable)
uses: ./.github/actions/setup
with:
postgresql_version: ${{ matrix.psql }}
continue-on-error: true
- name: run migrations to stable
run: uv run python -m lifecycle.migrate
run: poetry run python -m lifecycle.migrate
- name: checkout current code
run: |
set -x
@ -200,7 +204,7 @@ jobs:
uses: actions/cache@v4
with:
path: web/dist
key: ${{ runner.os }}-web-${{ hashFiles('web/package-lock.json', 'web/src/**', 'web/packages/sfe/src/**') }}-b
key: ${{ runner.os }}-web-${{ hashFiles('web/package-lock.json', 'web/src/**') }}
- name: prepare web ui
if: steps.cache-web.outputs.cache-hit != 'true'
working-directory: web
@ -208,7 +212,6 @@ jobs:
npm ci
make -C .. gen-client-ts
npm run build
npm run build:sfe
- name: run e2e
run: |
uv run coverage run manage.py test ${{ matrix.job.glob }}

View File

@ -29,7 +29,7 @@ jobs:
- name: Generate API
run: make gen-client-go
- name: golangci-lint
uses: golangci/golangci-lint-action@v8
uses: golangci/golangci-lint-action@v7
with:
version: latest
args: --timeout 5000s --verbose

View File

@ -37,7 +37,6 @@ jobs:
signoff: true
# ID from https://api.github.com/users/authentik-automation[bot]
author: authentik-automation[bot] <135050075+authentik-automation[bot]@users.noreply.github.com>
labels: dependencies
- uses: peter-evans/enable-pull-request-automerge@v3
with:
token: ${{ steps.generate_token.outputs.token }}

View File

@ -53,7 +53,6 @@ jobs:
body: ${{ steps.compress.outputs.markdown }}
delete-branch: true
signoff: true
labels: dependencies
- uses: peter-evans/enable-pull-request-automerge@v3
if: "${{ github.event_name != 'pull_request' && steps.compress.outputs.markdown != '' }}"
with:

View File

@ -3,10 +3,10 @@ on:
push:
branches: [main]
paths:
- packages/docusaurus-config/**
- packages/eslint-config/**
- packages/prettier-config/**
- packages/tsconfig/**
- packages/docusaurus-config
- packages/eslint-config
- packages/prettier-config
- packages/tsconfig
workflow_dispatch:
jobs:
publish:

View File

@ -52,6 +52,3 @@ jobs:
body: "core, web: update translations"
delete-branch: true
signoff: true
labels: dependencies
# ID from https://api.github.com/users/authentik-automation[bot]
author: authentik-automation[bot] <135050075+authentik-automation[bot]@users.noreply.github.com>

View File

@ -25,13 +25,23 @@ jobs:
env:
GH_TOKEN: ${{ steps.generate_token.outputs.token }}
run: |
title=$(gh pr view ${{ github.event.pull_request.number }} --json "title" -q ".title")
title=$(curl -q -L \
-H "Accept: application/vnd.github+json" \
-H "Authorization: Bearer ${GH_TOKEN}" \
-H "X-GitHub-Api-Version: 2022-11-28" \
https://api.github.com/repos/${GITHUB_REPOSITORY}/pulls/${{ github.event.pull_request.number }} | jq -r .title)
echo "title=${title}" >> "$GITHUB_OUTPUT"
- name: Rename
env:
GH_TOKEN: ${{ steps.generate_token.outputs.token }}
run: |
gh pr edit -t "translate: ${{ steps.title.outputs.title }}" --add-label dependencies
curl -L \
-X PATCH \
-H "Accept: application/vnd.github+json" \
-H "Authorization: Bearer ${GH_TOKEN}" \
-H "X-GitHub-Api-Version: 2022-11-28" \
https://api.github.com/repos/${GITHUB_REPOSITORY}/pulls/${{ github.event.pull_request.number }} \
-d "{\"title\":\"translate: ${{ steps.title.outputs.title }}\"}"
- uses: peter-evans/enable-pull-request-automerge@v3
with:
token: ${{ steps.generate_token.outputs.token }}

View File

@ -16,7 +16,7 @@
],
"typescript.preferences.importModuleSpecifier": "non-relative",
"typescript.preferences.importModuleSpecifierEnding": "index",
"typescript.tsdk": "./node_modules/typescript/lib",
"typescript.tsdk": "./web/node_modules/typescript/lib",
"typescript.enablePromptUseWorkspaceTsdk": true,
"yaml.schemas": {
"./blueprints/schema.json": "blueprints/**/*.yaml"
@ -30,5 +30,7 @@
}
],
"go.testFlags": ["-count=1"],
"github-actions.workflows.pinned.workflows": [".github/workflows/ci-main.yml"]
"github-actions.workflows.pinned.workflows": [
".github/workflows/ci-main.yml"
]
}

View File

@ -40,8 +40,7 @@ COPY ./web /work/web/
COPY ./website /work/website/
COPY ./gen-ts-api /work/web/node_modules/@goauthentik/api
RUN npm run build && \
npm run build:sfe
RUN npm run build
# Stage 3: Build go proxy
FROM --platform=${BUILDPLATFORM} docker.io/library/golang:1.24-bookworm AS go-builder
@ -86,17 +85,18 @@ FROM --platform=${BUILDPLATFORM} ghcr.io/maxmind/geoipupdate:v7.1.0 AS geoip
ENV GEOIPUPDATE_EDITION_IDS="GeoLite2-City GeoLite2-ASN"
ENV GEOIPUPDATE_VERBOSE="1"
ENV GEOIPUPDATE_ACCOUNT_ID_FILE="/run/secrets/GEOIPUPDATE_ACCOUNT_ID"
ENV GEOIPUPDATE_LICENSE_KEY_FILE="/run/secrets/GEOIPUPDATE_LICENSE_KEY"
USER root
RUN --mount=type=secret,id=GEOIPUPDATE_ACCOUNT_ID \
--mount=type=secret,id=GEOIPUPDATE_LICENSE_KEY \
mkdir -p /usr/share/GeoIP && \
/bin/sh -c "GEOIPUPDATE_LICENSE_KEY_FILE=/run/secrets/GEOIPUPDATE_LICENSE_KEY /usr/bin/entry.sh || echo 'Failed to get GeoIP database, disabling'; exit 0"
/bin/sh -c "/usr/bin/entry.sh || echo 'Failed to get GeoIP database, disabling'; exit 0"
# Stage 5: Download uv
FROM ghcr.io/astral-sh/uv:0.7.5 AS uv
FROM ghcr.io/astral-sh/uv:0.6.14 AS uv
# Stage 6: Base python image
FROM ghcr.io/goauthentik/fips-python:3.13.3-slim-bookworm-fips AS python-base
FROM ghcr.io/goauthentik/fips-python:3.12.10-slim-bookworm-fips AS python-base
ENV VENV_PATH="/ak-root/.venv" \
PATH="/lifecycle:/ak-root/.venv/bin:$PATH" \

View File

@ -1,7 +1,6 @@
.PHONY: gen dev-reset all clean test web website
SHELL := /bin/bash
.SHELLFLAGS += ${SHELLFLAGS} -e -o pipefail
.SHELLFLAGS += ${SHELLFLAGS} -e
PWD = $(shell pwd)
UID = $(shell id -u)
GID = $(shell id -g)
@ -9,9 +8,9 @@ NPM_VERSION = $(shell python -m scripts.generate_semver)
PY_SOURCES = authentik tests scripts lifecycle .github
DOCKER_IMAGE ?= "authentik:test"
GEN_API_TS = gen-ts-api
GEN_API_PY = gen-py-api
GEN_API_GO = gen-go-api
GEN_API_TS = "gen-ts-api"
GEN_API_PY = "gen-py-api"
GEN_API_GO = "gen-go-api"
pg_user := $(shell uv run python -m authentik.lib.config postgresql.user 2>/dev/null)
pg_host := $(shell uv run python -m authentik.lib.config postgresql.host 2>/dev/null)
@ -118,19 +117,14 @@ gen-diff: ## (Release) generate the changelog diff between the current schema a
npx prettier --write diff.md
gen-clean-ts: ## Remove generated API client for Typescript
rm -rf ${PWD}/${GEN_API_TS}/
rm -rf ${PWD}/web/node_modules/@goauthentik/api/
rm -rf ./${GEN_API_TS}/
rm -rf ./web/node_modules/@goauthentik/api/
gen-clean-go: ## Remove generated API client for Go
mkdir -p ${PWD}/${GEN_API_GO}
ifneq ($(wildcard ${PWD}/${GEN_API_GO}/.*),)
make -C ${PWD}/${GEN_API_GO} clean
else
rm -rf ${PWD}/${GEN_API_GO}
endif
rm -rf ./${GEN_API_GO}/
gen-clean-py: ## Remove generated API client for Python
rm -rf ${PWD}/${GEN_API_PY}/
rm -rf ./${GEN_API_PY}/
gen-clean: gen-clean-ts gen-clean-go gen-clean-py ## Remove generated API clients
@ -147,8 +141,8 @@ gen-client-ts: gen-clean-ts ## Build and install the authentik API for Typescri
--git-repo-id authentik \
--git-user-id goauthentik
mkdir -p web/node_modules/@goauthentik/api
cd ${PWD}/${GEN_API_TS} && npm i
\cp -rf ${PWD}/${GEN_API_TS}/* web/node_modules/@goauthentik/api
cd ./${GEN_API_TS} && npm i
\cp -rf ./${GEN_API_TS}/* web/node_modules/@goauthentik/api
gen-client-py: gen-clean-py ## Build and install the authentik API for Python
docker run \
@ -162,17 +156,24 @@ gen-client-py: gen-clean-py ## Build and install the authentik API for Python
--additional-properties=packageVersion=${NPM_VERSION} \
--git-repo-id authentik \
--git-user-id goauthentik
pip install ./${GEN_API_PY}
gen-client-go: gen-clean-go ## Build and install the authentik API for Golang
mkdir -p ${PWD}/${GEN_API_GO}
ifeq ($(wildcard ${PWD}/${GEN_API_GO}/.*),)
git clone --depth 1 https://github.com/goauthentik/client-go.git ${PWD}/${GEN_API_GO}
else
cd ${PWD}/${GEN_API_GO} && git pull
endif
cp ${PWD}/schema.yml ${PWD}/${GEN_API_GO}
make -C ${PWD}/${GEN_API_GO} build
mkdir -p ./${GEN_API_GO} ./${GEN_API_GO}/templates
wget https://raw.githubusercontent.com/goauthentik/client-go/main/config.yaml -O ./${GEN_API_GO}/config.yaml
wget https://raw.githubusercontent.com/goauthentik/client-go/main/templates/README.mustache -O ./${GEN_API_GO}/templates/README.mustache
wget https://raw.githubusercontent.com/goauthentik/client-go/main/templates/go.mod.mustache -O ./${GEN_API_GO}/templates/go.mod.mustache
cp schema.yml ./${GEN_API_GO}/
docker run \
--rm -v ${PWD}/${GEN_API_GO}:/local \
--user ${UID}:${GID} \
docker.io/openapitools/openapi-generator-cli:v6.5.0 generate \
-i /local/schema.yml \
-g go \
-o /local/ \
-c /local/config.yaml
go mod edit -replace goauthentik.io/api/v3=./${GEN_API_GO}
rm -rf ./${GEN_API_GO}/config.yaml ./${GEN_API_GO}/templates/
gen-dev-config: ## Generate a local development config file
uv run scripts/generate_config.py
@ -243,7 +244,7 @@ docker: ## Build a docker image of the current source tree
DOCKER_BUILDKIT=1 docker build . --progress plain --tag ${DOCKER_IMAGE}
test-docker:
BUILD=true ${PWD}/scripts/test_docker.sh
BUILD=true ./scripts/test_docker.sh
#########################
## CI

View File

@ -42,4 +42,4 @@ See [SECURITY.md](SECURITY.md)
## Adoption and Contributions
Your organization uses authentik? We'd love to add your logo to the readme and our website! Email us @ hello@goauthentik.io or open a GitHub Issue/PR! For more information on how to contribute to authentik, please refer to our [contribution guide](https://docs.goauthentik.io/docs/developer-docs?utm_source=github).
Your organization uses authentik? We'd love to add your logo to the readme and our website! Email us @ hello@goauthentik.io or open a GitHub Issue/PR! For more information on how to contribute to authentik, please refer to our [CONTRIBUTING.md file](./CONTRIBUTING.md).

View File

@ -20,8 +20,8 @@ Even if the issue is not a CVE, we still greatly appreciate your help in hardeni
| Version | Supported |
| --------- | --------- |
| 2024.12.x | ✅ |
| 2025.2.x | ✅ |
| 2025.4.x | ✅ |
## Reporting a Vulnerability

View File

@ -2,7 +2,7 @@
from os import environ
__version__ = "2025.4.1"
__version__ = "2025.2.4"
ENV_GIT_HASH_KEY = "GIT_BUILD_HASH"

View File

@ -1,12 +1,9 @@
"""API Authentication"""
from hmac import compare_digest
from pathlib import Path
from tempfile import gettempdir
from typing import Any
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from drf_spectacular.extensions import OpenApiAuthenticationExtension
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework.exceptions import AuthenticationFailed
@ -14,17 +11,11 @@ from rest_framework.request import Request
from structlog.stdlib import get_logger
from authentik.core.middleware import CTX_AUTH_VIA
from authentik.core.models import Token, TokenIntents, User, UserTypes
from authentik.core.models import Token, TokenIntents, User
from authentik.outposts.models import Outpost
from authentik.providers.oauth2.constants import SCOPE_AUTHENTIK_API
LOGGER = get_logger()
_tmp = Path(gettempdir())
try:
with open(_tmp / "authentik-core-ipc.key") as _f:
ipc_key = _f.read()
except OSError:
ipc_key = None
def validate_auth(header: bytes) -> str | None:
@ -82,11 +73,6 @@ def auth_user_lookup(raw_header: bytes) -> User | None:
if user:
CTX_AUTH_VIA.set("secret_key")
return user
# then try to auth via secret key (for embedded outpost/etc)
user = token_ipc(auth_credentials)
if user:
CTX_AUTH_VIA.set("ipc")
return user
raise AuthenticationFailed("Token invalid/expired")
@ -104,43 +90,6 @@ def token_secret_key(value: str) -> User | None:
return outpost.user
class IPCUser(AnonymousUser):
"""'Virtual' user for IPC communication between authentik core and the authentik router"""
username = "authentik:system"
is_active = True
is_superuser = True
@property
def type(self):
return UserTypes.INTERNAL_SERVICE_ACCOUNT
def has_perm(self, perm, obj=None):
return True
def has_perms(self, perm_list, obj=None):
return True
def has_module_perms(self, module):
return True
@property
def is_anonymous(self):
return False
@property
def is_authenticated(self):
return True
def token_ipc(value: str) -> User | None:
"""Check if the token is the secret key
and return the service account for the managed outpost"""
if not ipc_key or not compare_digest(value, ipc_key):
return None
return IPCUser()
class TokenAuthentication(BaseAuthentication):
"""Token-based authentication using HTTP Bearer authentication"""

View File

@ -54,7 +54,7 @@ def create_component(generator: SchemaGenerator, name, schema, type_=ResolvedCom
return component
def postprocess_schema_responses(result, generator: SchemaGenerator, **kwargs):
def postprocess_schema_responses(result, generator: SchemaGenerator, **kwargs): # noqa: W0613
"""Workaround to set a default response for endpoints.
Workaround suggested at
<https://github.com/tfranzel/drf-spectacular/issues/119#issuecomment-656970357>

View File

@ -164,7 +164,9 @@ class BlueprintEntry:
"""Get the blueprint model, with yaml tags resolved if present"""
return str(self.tag_resolver(self.model, blueprint))
def get_permissions(self, blueprint: "Blueprint") -> Generator[BlueprintEntryPermission]:
def get_permissions(
self, blueprint: "Blueprint"
) -> Generator[BlueprintEntryPermission, None, None]:
"""Get permissions of this entry, with all yaml tags resolved"""
for perm in self.permissions:
yield BlueprintEntryPermission(

View File

@ -59,7 +59,6 @@ class BrandSerializer(ModelSerializer):
"flow_device_code",
"default_application",
"web_certificate",
"client_certificates",
"attributes",
]
extra_kwargs = {
@ -121,7 +120,6 @@ class BrandViewSet(UsedByMixin, ModelViewSet):
"domain",
"branding_title",
"web_certificate__name",
"client_certificates__name",
]
filterset_fields = [
"brand_uuid",
@ -138,7 +136,6 @@ class BrandViewSet(UsedByMixin, ModelViewSet):
"flow_user_settings",
"flow_device_code",
"web_certificate",
"client_certificates",
]
ordering = ["domain"]

View File

@ -16,7 +16,7 @@ def migrate_custom_css(apps: Apps, schema_editor: BaseDatabaseSchemaEditor):
if not path.exists():
return
css = path.read_text()
Brand.objects.using(db_alias).all().update(branding_custom_css=css)
Brand.objects.using(db_alias).update(branding_custom_css=css)
class Migration(migrations.Migration):

View File

@ -1,37 +0,0 @@
# Generated by Django 5.1.9 on 2025-05-19 15:09
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_brands", "0009_brand_branding_default_flow_background"),
("authentik_crypto", "0004_alter_certificatekeypair_name"),
]
operations = [
migrations.AddField(
model_name="brand",
name="client_certificates",
field=models.ManyToManyField(
blank=True,
default=None,
help_text="Certificates used for client authentication.",
to="authentik_crypto.certificatekeypair",
),
),
migrations.AlterField(
model_name="brand",
name="web_certificate",
field=models.ForeignKey(
default=None,
help_text="Web Certificate used by the authentik Core webserver.",
null=True,
on_delete=django.db.models.deletion.SET_DEFAULT,
related_name="+",
to="authentik_crypto.certificatekeypair",
),
),
]

View File

@ -73,13 +73,6 @@ class Brand(SerializerModel):
default=None,
on_delete=models.SET_DEFAULT,
help_text=_("Web Certificate used by the authentik Core webserver."),
related_name="+",
)
client_certificates = models.ManyToManyField(
CertificateKeyPair,
default=None,
blank=True,
help_text=_("Certificates used for client authentication."),
)
attributes = models.JSONField(default=dict, blank=True)

View File

@ -5,10 +5,10 @@ from typing import Any
from django.db.models import F, Q
from django.db.models import Value as V
from django.http.request import HttpRequest
from sentry_sdk import get_current_span
from authentik import get_full_version
from authentik.brands.models import Brand
from authentik.lib.sentry import get_http_meta
from authentik.tenants.models import Tenant
_q_default = Q(default=True)
@ -32,9 +32,13 @@ def context_processor(request: HttpRequest) -> dict[str, Any]:
"""Context Processor that injects brand object into every template"""
brand = getattr(request, "brand", DEFAULT_BRAND)
tenant = getattr(request, "tenant", Tenant())
trace = ""
span = get_current_span()
if span:
trace = span.to_traceparent()
return {
"brand": brand,
"footer_links": tenant.footer_links,
"html_meta": {**get_http_meta()},
"sentry_trace": trace,
"version": get_full_version(),
}

View File

@ -99,17 +99,18 @@ class GroupSerializer(ModelSerializer):
if superuser
else "authentik_core.disable_group_superuser"
)
if self.instance or superuser:
has_perm = user.has_perm(perm) or user.has_perm(perm, self.instance)
if not has_perm:
raise ValidationError(
_(
(
"User does not have permission to set "
"superuser status to {superuser_status}."
).format_map({"superuser_status": superuser})
)
has_perm = user.has_perm(perm)
if self.instance and not has_perm:
has_perm = user.has_perm(perm, self.instance)
if not has_perm:
raise ValidationError(
_(
(
"User does not have permission to set "
"superuser status to {superuser_status}."
).format_map({"superuser_status": superuser})
)
)
return superuser
class Meta:

View File

@ -2,7 +2,6 @@
from django.apps import apps
from django.contrib.auth.management import create_permissions
from django.core.management import call_command
from django.core.management.base import BaseCommand, no_translations
from guardian.management import create_anonymous_user
@ -17,10 +16,6 @@ class Command(BaseCommand):
"""Check permissions for all apps"""
for tenant in Tenant.objects.filter(ready=True):
with tenant:
# See https://code.djangoproject.com/ticket/28417
# Remove potential lingering old permissions
call_command("remove_stale_contenttypes", "--no-input")
for app in apps.get_app_configs():
self.stdout.write(f"Checking app {app.name} ({app.label})\n")
create_permissions(app, verbosity=0)

View File

@ -31,10 +31,7 @@ class PickleSerializer:
def loads(self, data):
"""Unpickle data to be loaded from redis"""
try:
return pickle.loads(data) # nosec
except Exception:
return {}
return pickle.loads(data) # nosec
def _migrate_session(

View File

@ -1,27 +0,0 @@
# Generated by Django 5.1.9 on 2025-05-14 11:15
from django.apps.registry import Apps
from django.db import migrations
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
def remove_old_authenticated_session_content_type(
apps: Apps, schema_editor: BaseDatabaseSchemaEditor
):
db_alias = schema_editor.connection.alias
ContentType = apps.get_model("contenttypes", "ContentType")
ContentType.objects.using(db_alias).filter(model="oldauthenticatedsession").delete()
class Migration(migrations.Migration):
dependencies = [
("authentik_core", "0047_delete_oldauthenticatedsession"),
]
operations = [
migrations.RunPython(
code=remove_old_authenticated_session_content_type,
),
]

View File

@ -21,9 +21,7 @@
<script src="{% versioned_script 'dist/standalone/loading/index-%v.js' %}" type="module"></script>
{% block head %}
{% endblock %}
{% for key, value in html_meta.items %}
<meta name="{{key}}" content="{{ value }}" />
{% endfor %}
<meta name="sentry-trace" content="{{ sentry_trace }}" />
</head>
<body>
{% block body %}

View File

@ -124,16 +124,6 @@ class TestGroupsAPI(APITestCase):
{"is_superuser": ["User does not have permission to set superuser status to True."]},
)
def test_superuser_no_perm_no_superuser(self):
"""Test creating a group without permission and without superuser flag"""
assign_perm("authentik_core.add_group", self.login_user)
self.client.force_login(self.login_user)
res = self.client.post(
reverse("authentik_api:group-list"),
data={"name": generate_id(), "is_superuser": False},
)
self.assertEqual(res.status_code, 201)
def test_superuser_update_no_perm(self):
"""Test updating a superuser group without permission"""
group = Group.objects.create(name=generate_id(), is_superuser=True)

View File

@ -13,10 +13,7 @@ from authentik.core.models import (
TokenIntents,
User,
)
from authentik.core.tasks import (
clean_expired_models,
clean_temporary_users,
)
from authentik.core.tasks import clean_expired_models, clean_temporary_users
from authentik.core.tests.utils import create_test_admin_user
from authentik.lib.generators import generate_id

View File

@ -30,7 +30,6 @@ from structlog.stdlib import get_logger
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.core.models import UserTypes
from authentik.crypto.apps import MANAGED_KEY
from authentik.crypto.builder import CertificateBuilder, PrivateKeyAlg
from authentik.crypto.models import CertificateKeyPair
@ -273,12 +272,11 @@ class CertificateKeyPairViewSet(UsedByMixin, ModelViewSet):
def view_certificate(self, request: Request, pk: str) -> Response:
"""Return certificate-key pairs certificate and log access"""
certificate: CertificateKeyPair = self.get_object()
if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
Event.new( # noqa # nosec
EventAction.SECRET_VIEW,
secret=certificate,
type="certificate",
).from_http(request)
Event.new( # noqa # nosec
EventAction.SECRET_VIEW,
secret=certificate,
type="certificate",
).from_http(request)
if "download" in request.query_params:
# Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
response = HttpResponse(
@ -304,12 +302,11 @@ class CertificateKeyPairViewSet(UsedByMixin, ModelViewSet):
def view_private_key(self, request: Request, pk: str) -> Response:
"""Return certificate-key pairs private key and log access"""
certificate: CertificateKeyPair = self.get_object()
if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
Event.new( # noqa # nosec
EventAction.SECRET_VIEW,
secret=certificate,
type="private_key",
).from_http(request)
Event.new( # noqa # nosec
EventAction.SECRET_VIEW,
secret=certificate,
type="private_key",
).from_http(request)
if "download" in request.query_params:
# Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
response = HttpResponse(certificate.key_data, content_type="application/x-pem-file")

View File

@ -132,14 +132,13 @@ class LicenseKey:
"""Get a summarized version of all (not expired) licenses"""
total = LicenseKey(get_license_aud(), 0, "Summarized license", 0, 0)
for lic in License.objects.all():
if lic.is_valid:
total.internal_users += lic.internal_users
total.external_users += lic.external_users
total.license_flags.extend(lic.status.license_flags)
total.internal_users += lic.internal_users
total.external_users += lic.external_users
exp_ts = int(mktime(lic.expiry.timetuple()))
if total.exp == 0:
total.exp = exp_ts
total.exp = max(total.exp, exp_ts)
total.license_flags.extend(lic.status.license_flags)
return total
@staticmethod

View File

@ -39,10 +39,6 @@ class License(SerializerModel):
internal_users = models.BigIntegerField()
external_users = models.BigIntegerField()
@property
def is_valid(self) -> bool:
return self.expiry >= now()
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.enterprise.api import LicenseSerializer

View File

@ -1,27 +0,0 @@
from rest_framework.viewsets import ModelViewSet
from authentik.core.api.used_by import UsedByMixin
from authentik.enterprise.api import EnterpriseRequiredMixin
from authentik.enterprise.policies.unique_password.models import UniquePasswordPolicy
from authentik.policies.api.policies import PolicySerializer
class UniquePasswordPolicySerializer(EnterpriseRequiredMixin, PolicySerializer):
"""Password Uniqueness Policy Serializer"""
class Meta:
model = UniquePasswordPolicy
fields = PolicySerializer.Meta.fields + [
"password_field",
"num_historical_passwords",
]
class UniquePasswordPolicyViewSet(UsedByMixin, ModelViewSet):
"""Password Uniqueness Policy Viewset"""
queryset = UniquePasswordPolicy.objects.all()
serializer_class = UniquePasswordPolicySerializer
filterset_fields = "__all__"
ordering = ["name"]
search_fields = ["name"]

View File

@ -1,10 +0,0 @@
"""authentik Unique Password policy app config"""
from authentik.enterprise.apps import EnterpriseConfig
class AuthentikEnterprisePoliciesUniquePasswordConfig(EnterpriseConfig):
name = "authentik.enterprise.policies.unique_password"
label = "authentik_policies_unique_password"
verbose_name = "authentik Enterprise.Policies.Unique Password"
default = True

View File

@ -1,81 +0,0 @@
# Generated by Django 5.0.13 on 2025-03-26 23:02
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("authentik_policies", "0011_policybinding_failure_result_and_more"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="UniquePasswordPolicy",
fields=[
(
"policy_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="authentik_policies.policy",
),
),
(
"password_field",
models.TextField(
default="password",
help_text="Field key to check, field keys defined in Prompt stages are available.",
),
),
(
"num_historical_passwords",
models.PositiveIntegerField(
default=1, help_text="Number of passwords to check against."
),
),
],
options={
"verbose_name": "Password Uniqueness Policy",
"verbose_name_plural": "Password Uniqueness Policies",
"indexes": [
models.Index(fields=["policy_ptr_id"], name="authentik_p_policy__f559aa_idx")
],
},
bases=("authentik_policies.policy",),
),
migrations.CreateModel(
name="UserPasswordHistory",
fields=[
(
"id",
models.AutoField(
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
),
),
("old_password", models.CharField(max_length=128)),
("created_at", models.DateTimeField(auto_now_add=True)),
("hibp_prefix_sha1", models.CharField(max_length=5)),
("hibp_pw_hash", models.TextField()),
(
"user",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="old_passwords",
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"verbose_name": "User Password History",
},
),
]

View File

@ -1,151 +0,0 @@
from hashlib import sha1
from django.contrib.auth.hashers import identify_hasher, make_password
from django.db import models
from django.utils.translation import gettext as _
from rest_framework.serializers import BaseSerializer
from structlog.stdlib import get_logger
from authentik.core.models import User
from authentik.policies.models import Policy
from authentik.policies.types import PolicyRequest, PolicyResult
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
LOGGER = get_logger()
class UniquePasswordPolicy(Policy):
"""This policy prevents users from reusing old passwords."""
password_field = models.TextField(
default="password",
help_text=_("Field key to check, field keys defined in Prompt stages are available."),
)
# Limit on the number of previous passwords the policy evaluates
# Also controls number of old passwords the system stores.
num_historical_passwords = models.PositiveIntegerField(
default=1,
help_text=_("Number of passwords to check against."),
)
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.enterprise.policies.unique_password.api import UniquePasswordPolicySerializer
return UniquePasswordPolicySerializer
@property
def component(self) -> str:
return "ak-policy-password-uniqueness-form"
def passes(self, request: PolicyRequest) -> PolicyResult:
from authentik.enterprise.policies.unique_password.models import UserPasswordHistory
password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get(
self.password_field, request.context.get(self.password_field)
)
if not password:
LOGGER.warning(
"Password field not found in request when checking UniquePasswordPolicy",
field=self.password_field,
fields=request.context.keys(),
)
return PolicyResult(False, _("Password not set in context"))
password = str(password)
if not self.num_historical_passwords:
# Policy not configured to check against any passwords
return PolicyResult(True)
num_to_check = self.num_historical_passwords
password_history = UserPasswordHistory.objects.filter(user=request.user).order_by(
"-created_at"
)[:num_to_check]
if not password_history:
return PolicyResult(True)
for record in password_history:
if not record.old_password:
continue
if self._passwords_match(new_password=password, old_password=record.old_password):
# Return on first match. Authentik does not consider timing attacks
# on old passwords to be an attack surface.
return PolicyResult(
False,
_("This password has been used previously. Please choose a different one."),
)
return PolicyResult(True)
def _passwords_match(self, *, new_password: str, old_password: str) -> bool:
try:
hasher = identify_hasher(old_password)
except ValueError:
LOGGER.warning(
"Skipping password; could not load hash algorithm",
)
return False
return hasher.verify(new_password, old_password)
@classmethod
def is_in_use(cls):
"""Check if any UniquePasswordPolicy is in use, either through policy bindings
or direct attachment to a PromptStage.
Returns:
bool: True if any policy is in use, False otherwise
"""
from authentik.policies.models import PolicyBinding
# Check if any policy is in use through bindings
if PolicyBinding.in_use.for_policy(cls).exists():
return True
# Check if any policy is attached to a PromptStage
if cls.objects.filter(promptstage__isnull=False).exists():
return True
return False
class Meta(Policy.PolicyMeta):
verbose_name = _("Password Uniqueness Policy")
verbose_name_plural = _("Password Uniqueness Policies")
class UserPasswordHistory(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="old_passwords")
# Mimic's column type of AbstractBaseUser.password
old_password = models.CharField(max_length=128)
created_at = models.DateTimeField(auto_now_add=True)
hibp_prefix_sha1 = models.CharField(max_length=5)
hibp_pw_hash = models.TextField()
class Meta:
verbose_name = _("User Password History")
def __str__(self) -> str:
timestamp = f"{self.created_at:%Y/%m/%d %X}" if self.created_at else "N/A"
return f"Previous Password (user: {self.user_id}, recorded: {timestamp})"
@classmethod
def create_for_user(cls, user: User, password: str):
# To check users' passwords against Have I been Pwned, we need the first 5 chars
# of the password hashed with SHA1 without a salt...
pw_hash_sha1 = sha1(password.encode("utf-8")).hexdigest() # nosec
# ...however that'll give us a list of hashes from HIBP, and to compare that we still
# need a full unsalted SHA1 of the password. We don't want to save that directly in
# the database, so we hash that SHA1 again with a modern hashing alg,
# and then when we check users' passwords against HIBP we can use `check_password`
# which will take care of this.
hibp_hash_hash = make_password(pw_hash_sha1)
return cls.objects.create(
user=user,
old_password=password,
hibp_prefix_sha1=pw_hash_sha1[:5],
hibp_pw_hash=hibp_hash_hash,
)

View File

@ -1,20 +0,0 @@
"""Unique Password Policy settings"""
from celery.schedules import crontab
from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = {
"policies_unique_password_trim_history": {
"task": "authentik.enterprise.policies.unique_password.tasks.trim_password_histories",
"schedule": crontab(minute=fqdn_rand("policies_unique_password_trim"), hour="*/12"),
"options": {"queue": "authentik_scheduled"},
},
"policies_unique_password_check_purge": {
"task": (
"authentik.enterprise.policies.unique_password.tasks.check_and_purge_password_history"
),
"schedule": crontab(minute=fqdn_rand("policies_unique_password_purge"), hour="*/24"),
"options": {"queue": "authentik_scheduled"},
},
}

View File

@ -1,23 +0,0 @@
"""authentik policy signals"""
from django.dispatch import receiver
from authentik.core.models import User
from authentik.core.signals import password_changed
from authentik.enterprise.policies.unique_password.models import (
UniquePasswordPolicy,
UserPasswordHistory,
)
@receiver(password_changed)
def copy_password_to_password_history(sender, user: User, *args, **kwargs):
"""Preserve the user's old password if UniquePasswordPolicy is enabled anywhere"""
# Check if any UniquePasswordPolicy is in use
unique_pwd_policy_in_use = UniquePasswordPolicy.is_in_use()
if unique_pwd_policy_in_use:
"""NOTE: Because we run this in a signal after saving the user,
we are not atomically guaranteed to save password history.
"""
UserPasswordHistory.create_for_user(user, user.password)

View File

@ -1,66 +0,0 @@
from django.db.models.aggregates import Count
from structlog import get_logger
from authentik.enterprise.policies.unique_password.models import (
UniquePasswordPolicy,
UserPasswordHistory,
)
from authentik.events.system_tasks import SystemTask, TaskStatus, prefill_task
from authentik.root.celery import CELERY_APP
LOGGER = get_logger()
@CELERY_APP.task(bind=True, base=SystemTask)
@prefill_task
def check_and_purge_password_history(self: SystemTask):
"""Check if any UniquePasswordPolicy exists, and if not, purge the password history table.
This is run on a schedule instead of being triggered by policy binding deletion.
"""
if not UniquePasswordPolicy.objects.exists():
UserPasswordHistory.objects.all().delete()
LOGGER.debug("Purged UserPasswordHistory table as no policies are in use")
self.set_status(TaskStatus.SUCCESSFUL, "Successfully purged UserPasswordHistory")
return
self.set_status(
TaskStatus.SUCCESSFUL, "Not purging password histories, a unique password policy exists"
)
@CELERY_APP.task(bind=True, base=SystemTask)
def trim_password_histories(self: SystemTask):
"""Removes rows from UserPasswordHistory older than
the `n` most recent entries.
The `n` is defined by the largest configured value for all bound
UniquePasswordPolicy policies.
"""
# No policy, we'll let the cleanup above do its thing
if not UniquePasswordPolicy.objects.exists():
return
num_rows_to_preserve = 0
for policy in UniquePasswordPolicy.objects.all():
num_rows_to_preserve = max(num_rows_to_preserve, policy.num_historical_passwords)
all_pks_to_keep = []
# Get all users who have password history entries
users_with_history = (
UserPasswordHistory.objects.values("user")
.annotate(count=Count("user"))
.filter(count__gt=0)
.values_list("user", flat=True)
)
for user_pk in users_with_history:
entries = UserPasswordHistory.objects.filter(user__pk=user_pk)
pks_to_keep = entries.order_by("-created_at")[:num_rows_to_preserve].values_list(
"pk", flat=True
)
all_pks_to_keep.extend(pks_to_keep)
num_deleted, _ = UserPasswordHistory.objects.exclude(pk__in=all_pks_to_keep).delete()
LOGGER.debug("Deleted stale password history records", count=num_deleted)
self.set_status(TaskStatus.SUCCESSFUL, f"Delete {num_deleted} stale password history records")

View File

@ -1,108 +0,0 @@
"""Unique Password Policy flow tests"""
from django.contrib.auth.hashers import make_password
from django.urls.base import reverse
from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.enterprise.policies.unique_password.models import (
UniquePasswordPolicy,
UserPasswordHistory,
)
from authentik.flows.models import FlowDesignation, FlowStageBinding
from authentik.flows.tests import FlowTestCase
from authentik.lib.generators import generate_id
from authentik.stages.prompt.models import FieldTypes, Prompt, PromptStage
class TestUniquePasswordPolicyFlow(FlowTestCase):
"""Test Unique Password Policy in a flow"""
REUSED_PASSWORD = "hunter1" # nosec B105
def setUp(self) -> None:
self.user = create_test_user()
self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
password_prompt = Prompt.objects.create(
name=generate_id(),
field_key="password",
label="PASSWORD_LABEL",
type=FieldTypes.PASSWORD,
required=True,
placeholder="PASSWORD_PLACEHOLDER",
)
self.policy = UniquePasswordPolicy.objects.create(
name="password_must_unique",
password_field=password_prompt.field_key,
num_historical_passwords=1,
)
stage = PromptStage.objects.create(name="prompt-stage")
stage.validation_policies.set([self.policy])
stage.fields.set(
[
password_prompt,
]
)
FlowStageBinding.objects.create(target=self.flow, stage=stage, order=2)
# Seed the user's password history
UserPasswordHistory.create_for_user(self.user, make_password(self.REUSED_PASSWORD))
def test_prompt_data(self):
"""Test policy attached to a prompt stage"""
# Test the policy directly
from authentik.policies.types import PolicyRequest
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
# Create a policy request with the reused password
request = PolicyRequest(user=self.user)
request.context[PLAN_CONTEXT_PROMPT] = {"password": self.REUSED_PASSWORD}
# Test the policy directly
result = self.policy.passes(request)
# Verify that the policy fails (returns False) with the expected error message
self.assertFalse(result.passing, "Policy should fail for reused password")
self.assertEqual(
result.messages[0],
"This password has been used previously. Please choose a different one.",
"Incorrect error message",
)
# API-based testing approach:
self.client.force_login(self.user)
# Send a POST request to the flow executor with the reused password
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
{"password": self.REUSED_PASSWORD},
)
self.assertStageResponse(
response,
self.flow,
component="ak-stage-prompt",
fields=[
{
"choices": None,
"field_key": "password",
"label": "PASSWORD_LABEL",
"order": 0,
"placeholder": "PASSWORD_PLACEHOLDER",
"initial_value": "",
"required": True,
"type": "password",
"sub_text": "",
}
],
response_errors={
"non_field_errors": [
{
"code": "invalid",
"string": "This password has been used previously. "
"Please choose a different one.",
}
]
},
)

View File

@ -1,77 +0,0 @@
"""Unique Password Policy tests"""
from django.contrib.auth.hashers import make_password
from django.test import TestCase
from guardian.shortcuts import get_anonymous_user
from authentik.core.models import User
from authentik.enterprise.policies.unique_password.models import (
UniquePasswordPolicy,
UserPasswordHistory,
)
from authentik.policies.types import PolicyRequest, PolicyResult
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
class TestUniquePasswordPolicy(TestCase):
"""Test Password Uniqueness Policy"""
def setUp(self) -> None:
self.policy = UniquePasswordPolicy.objects.create(
name="test_unique_password", num_historical_passwords=1
)
self.user = User.objects.create(username="test-user")
def test_invalid(self):
"""Test without password present in request"""
request = PolicyRequest(get_anonymous_user())
result: PolicyResult = self.policy.passes(request)
self.assertFalse(result.passing)
self.assertEqual(result.messages[0], "Password not set in context")
def test_passes_no_previous_passwords(self):
request = PolicyRequest(get_anonymous_user())
request.context = {PLAN_CONTEXT_PROMPT: {"password": "hunter2"}}
result: PolicyResult = self.policy.passes(request)
self.assertTrue(result.passing)
def test_passes_passwords_are_different(self):
# Seed database with an old password
UserPasswordHistory.create_for_user(self.user, make_password("hunter1"))
request = PolicyRequest(self.user)
request.context = {PLAN_CONTEXT_PROMPT: {"password": "hunter2"}}
result: PolicyResult = self.policy.passes(request)
self.assertTrue(result.passing)
def test_passes_multiple_old_passwords(self):
# Seed with multiple old passwords
UserPasswordHistory.objects.bulk_create(
[
UserPasswordHistory(user=self.user, old_password=make_password("hunter1")),
UserPasswordHistory(user=self.user, old_password=make_password("hunter2")),
]
)
request = PolicyRequest(self.user)
request.context = {PLAN_CONTEXT_PROMPT: {"password": "hunter3"}}
result: PolicyResult = self.policy.passes(request)
self.assertTrue(result.passing)
def test_fails_password_matches_old_password(self):
# Seed database with an old password
UserPasswordHistory.create_for_user(self.user, make_password("hunter1"))
request = PolicyRequest(self.user)
request.context = {PLAN_CONTEXT_PROMPT: {"password": "hunter1"}}
result: PolicyResult = self.policy.passes(request)
self.assertFalse(result.passing)
def test_fails_if_identical_password_with_different_hash_algos(self):
UserPasswordHistory.create_for_user(
self.user, make_password("hunter2", "somesalt", "scrypt")
)
request = PolicyRequest(self.user)
request.context = {PLAN_CONTEXT_PROMPT: {"password": "hunter2"}}
result: PolicyResult = self.policy.passes(request)
self.assertFalse(result.passing)

View File

@ -1,90 +0,0 @@
from django.urls import reverse
from authentik.core.models import Group, Source, User
from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.enterprise.policies.unique_password.models import (
UniquePasswordPolicy,
UserPasswordHistory,
)
from authentik.flows.markers import StageMarker
from authentik.flows.models import FlowStageBinding
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_key
from authentik.policies.models import PolicyBinding, PolicyBindingModel
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
from authentik.stages.user_write.models import UserWriteStage
class TestUserWriteStage(FlowTestCase):
"""Write tests"""
def setUp(self):
super().setUp()
self.flow = create_test_flow()
self.group = Group.objects.create(name="test-group")
self.other_group = Group.objects.create(name="other-group")
self.stage: UserWriteStage = UserWriteStage.objects.create(
name="write", create_users_as_inactive=True, create_users_group=self.group
)
self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
self.source = Source.objects.create(name="fake_source")
def test_save_password_history_if_policy_binding_enforced(self):
"""Test user's new password is recorded when ANY enabled UniquePasswordPolicy exists"""
unique_password_policy = UniquePasswordPolicy.objects.create(num_historical_passwords=5)
pbm = PolicyBindingModel.objects.create()
PolicyBinding.objects.create(
target=pbm, policy=unique_password_policy, order=0, enabled=True
)
test_user = create_test_user()
# Store original password for verification
original_password = test_user.password
# We're changing our own password
self.client.force_login(test_user)
new_password = generate_key()
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = test_user
plan.context[PLAN_CONTEXT_PROMPT] = {
"username": test_user.username,
"password": new_password,
}
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
# Password history should be recorded
user_password_history_qs = UserPasswordHistory.objects.filter(user=test_user)
self.assertTrue(user_password_history_qs.exists(), "Password history should be recorded")
self.assertEqual(len(user_password_history_qs), 1, "expected 1 recorded password")
# Create a password history entry manually to simulate the signal behavior
# This is what would happen if the signal worked correctly
UserPasswordHistory.objects.create(user=test_user, old_password=original_password)
user_password_history_qs = UserPasswordHistory.objects.filter(user=test_user)
self.assertTrue(user_password_history_qs.exists(), "Password history should be recorded")
self.assertEqual(len(user_password_history_qs), 2, "expected 2 recorded password")
# Execute the flow by sending a POST request to the flow executor endpoint
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
)
# Verify that the request was successful
self.assertEqual(response.status_code, 200)
user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
self.assertTrue(user_qs.exists())
# Verify the password history entry exists
user_password_history_qs = UserPasswordHistory.objects.filter(user=test_user)
self.assertTrue(user_password_history_qs.exists(), "Password history should be recorded")
self.assertEqual(len(user_password_history_qs), 3, "expected 3 recorded password")
# Verify that one of the entries contains the original password
self.assertTrue(
any(entry.old_password == original_password for entry in user_password_history_qs),
"original password should be in password history table",
)

View File

@ -1,178 +0,0 @@
from datetime import datetime, timedelta
from django.test import TestCase
from authentik.core.tests.utils import create_test_user
from authentik.enterprise.policies.unique_password.models import (
UniquePasswordPolicy,
UserPasswordHistory,
)
from authentik.enterprise.policies.unique_password.tasks import (
check_and_purge_password_history,
trim_password_histories,
)
from authentik.policies.models import PolicyBinding, PolicyBindingModel
class TestUniquePasswordPolicyModel(TestCase):
"""Test the UniquePasswordPolicy model methods"""
def test_is_in_use_with_binding(self):
"""Test is_in_use returns True when a policy binding exists"""
# Create a UniquePasswordPolicy and a PolicyBinding for it
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=5)
pbm = PolicyBindingModel.objects.create()
PolicyBinding.objects.create(target=pbm, policy=policy, order=0, enabled=True)
# Verify is_in_use returns True
self.assertTrue(UniquePasswordPolicy.is_in_use())
def test_is_in_use_with_promptstage(self):
"""Test is_in_use returns True when attached to a PromptStage"""
from authentik.stages.prompt.models import PromptStage
# Create a UniquePasswordPolicy and attach it to a PromptStage
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=5)
prompt_stage = PromptStage.objects.create(
name="Test Prompt Stage",
)
# Use the set() method for many-to-many relationships
prompt_stage.validation_policies.set([policy])
# Verify is_in_use returns True
self.assertTrue(UniquePasswordPolicy.is_in_use())
class TestTrimAllPasswordHistories(TestCase):
"""Test the task that trims password history for all users"""
def setUp(self):
self.user1 = create_test_user("test-user1")
self.user2 = create_test_user("test-user2")
self.pbm = PolicyBindingModel.objects.create()
# Create a policy with a limit of 1 password
self.policy = UniquePasswordPolicy.objects.create(num_historical_passwords=1)
PolicyBinding.objects.create(
target=self.pbm,
policy=self.policy,
enabled=True,
order=0,
)
class TestCheckAndPurgePasswordHistory(TestCase):
"""Test the scheduled task that checks if any policy is in use and purges if not"""
def setUp(self):
self.user = create_test_user("test-user")
self.pbm = PolicyBindingModel.objects.create()
def test_purge_when_no_policy_in_use(self):
"""Test that the task purges the table when no policy is in use"""
# Create some password history entries
UserPasswordHistory.create_for_user(self.user, "hunter2")
# Verify we have entries
self.assertTrue(UserPasswordHistory.objects.exists())
# Run the task - should purge since no policy is in use
check_and_purge_password_history()
# Verify the table is empty
self.assertFalse(UserPasswordHistory.objects.exists())
def test_no_purge_when_policy_in_use(self):
"""Test that the task doesn't purge when a policy is in use"""
# Create a policy and binding
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=5)
PolicyBinding.objects.create(
target=self.pbm,
policy=policy,
enabled=True,
order=0,
)
# Create some password history entries
UserPasswordHistory.create_for_user(self.user, "hunter2")
# Verify we have entries
self.assertTrue(UserPasswordHistory.objects.exists())
# Run the task - should NOT purge since a policy is in use
check_and_purge_password_history()
# Verify the entries still exist
self.assertTrue(UserPasswordHistory.objects.exists())
class TestTrimPasswordHistory(TestCase):
"""Test password history cleanup task"""
def setUp(self):
self.user = create_test_user("test-user")
self.pbm = PolicyBindingModel.objects.create()
def test_trim_password_history_ok(self):
"""Test passwords over the define limit are deleted"""
_now = datetime.now()
UserPasswordHistory.objects.bulk_create(
[
UserPasswordHistory(
user=self.user,
old_password="hunter1", # nosec B106
created_at=_now - timedelta(days=3),
),
UserPasswordHistory(
user=self.user,
old_password="hunter2", # nosec B106
created_at=_now - timedelta(days=2),
),
UserPasswordHistory(
user=self.user,
old_password="hunter3", # nosec B106
created_at=_now,
),
]
)
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=1)
PolicyBinding.objects.create(
target=self.pbm,
policy=policy,
enabled=True,
order=0,
)
trim_password_histories.delay()
user_pwd_history_qs = UserPasswordHistory.objects.filter(user=self.user)
self.assertEqual(len(user_pwd_history_qs), 1)
def test_trim_password_history_policy_diabled_no_op(self):
"""Test no passwords removed if policy binding is disabled"""
# Insert a record to ensure it's not deleted after executing task
UserPasswordHistory.create_for_user(self.user, "hunter2")
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=1)
PolicyBinding.objects.create(
target=self.pbm,
policy=policy,
enabled=False,
order=0,
)
trim_password_histories.delay()
self.assertTrue(UserPasswordHistory.objects.filter(user=self.user).exists())
def test_trim_password_history_fewer_records_than_maximum_is_no_op(self):
"""Test no passwords deleted if fewer passwords exist than limit"""
UserPasswordHistory.create_for_user(self.user, "hunter2")
policy = UniquePasswordPolicy.objects.create(num_historical_passwords=2)
PolicyBinding.objects.create(
target=self.pbm,
policy=policy,
enabled=True,
order=0,
)
trim_password_histories.delay()
self.assertTrue(UserPasswordHistory.objects.filter(user=self.user).exists())

View File

@ -1,7 +0,0 @@
"""API URLs"""
from authentik.enterprise.policies.unique_password.api import UniquePasswordPolicyViewSet
api_urlpatterns = [
("policies/unique_password", UniquePasswordPolicyViewSet),
]

View File

@ -25,7 +25,7 @@ class GoogleWorkspaceGroupClient(
"""Google client for groups"""
connection_type = GoogleWorkspaceProviderGroup
connection_attr = "googleworkspaceprovidergroup_set"
connection_type_query = "group"
can_discover = True
def __init__(self, provider: GoogleWorkspaceProvider) -> None:

View File

@ -20,7 +20,7 @@ class GoogleWorkspaceUserClient(GoogleWorkspaceSyncClient[User, GoogleWorkspaceP
"""Sync authentik users into google workspace"""
connection_type = GoogleWorkspaceProviderUser
connection_attr = "googleworkspaceprovideruser_set"
connection_type_query = "user"
can_discover = True
def __init__(self, provider: GoogleWorkspaceProvider) -> None:

View File

@ -132,11 +132,7 @@ class GoogleWorkspaceProvider(OutgoingSyncProvider, BackchannelProvider):
if type == User:
# Get queryset of all users with consistent ordering
# according to the provider's settings
base = (
User.objects.prefetch_related("googleworkspaceprovideruser_set")
.all()
.exclude_anonymous()
)
base = User.objects.all().exclude_anonymous()
if self.exclude_users_service_account:
base = base.exclude(type=UserTypes.SERVICE_ACCOUNT).exclude(
type=UserTypes.INTERNAL_SERVICE_ACCOUNT
@ -146,11 +142,7 @@ class GoogleWorkspaceProvider(OutgoingSyncProvider, BackchannelProvider):
return base.order_by("pk")
if type == Group:
# Get queryset of all groups with consistent ordering
return (
Group.objects.prefetch_related("googleworkspaceprovidergroup_set")
.all()
.order_by("pk")
)
return Group.objects.all().order_by("pk")
raise ValueError(f"Invalid type {type}")
def google_credentials(self):

View File

@ -29,7 +29,7 @@ class MicrosoftEntraGroupClient(
"""Microsoft client for groups"""
connection_type = MicrosoftEntraProviderGroup
connection_attr = "microsoftentraprovidergroup_set"
connection_type_query = "group"
can_discover = True
def __init__(self, provider: MicrosoftEntraProvider) -> None:

View File

@ -24,7 +24,7 @@ class MicrosoftEntraUserClient(MicrosoftEntraSyncClient[User, MicrosoftEntraProv
"""Sync authentik users into microsoft entra"""
connection_type = MicrosoftEntraProviderUser
connection_attr = "microsoftentraprovideruser_set"
connection_type_query = "user"
can_discover = True
def __init__(self, provider: MicrosoftEntraProvider) -> None:

View File

@ -121,11 +121,7 @@ class MicrosoftEntraProvider(OutgoingSyncProvider, BackchannelProvider):
if type == User:
# Get queryset of all users with consistent ordering
# according to the provider's settings
base = (
User.objects.prefetch_related("microsoftentraprovideruser_set")
.all()
.exclude_anonymous()
)
base = User.objects.all().exclude_anonymous()
if self.exclude_users_service_account:
base = base.exclude(type=UserTypes.SERVICE_ACCOUNT).exclude(
type=UserTypes.INTERNAL_SERVICE_ACCOUNT
@ -135,11 +131,7 @@ class MicrosoftEntraProvider(OutgoingSyncProvider, BackchannelProvider):
return base.order_by("pk")
if type == Group:
# Get queryset of all groups with consistent ordering
return (
Group.objects.prefetch_related("microsoftentraprovidergroup_set")
.all()
.order_by("pk")
)
return Group.objects.all().order_by("pk")
raise ValueError(f"Invalid type {type}")
def microsoft_credentials(self):

View File

@ -14,12 +14,10 @@ CELERY_BEAT_SCHEDULE = {
TENANT_APPS = [
"authentik.enterprise.audit",
"authentik.enterprise.policies.unique_password",
"authentik.enterprise.providers.google_workspace",
"authentik.enterprise.providers.microsoft_entra",
"authentik.enterprise.providers.ssf",
"authentik.enterprise.stages.authenticator_endpoint_gdtc",
"authentik.enterprise.stages.mtls",
"authentik.enterprise.stages.source",
]

View File

@ -1,31 +0,0 @@
"""Mutual TLS Stage API Views"""
from rest_framework.viewsets import ModelViewSet
from authentik.core.api.used_by import UsedByMixin
from authentik.enterprise.api import EnterpriseRequiredMixin
from authentik.enterprise.stages.mtls.models import MutualTLSStage
from authentik.flows.api.stages import StageSerializer
class MutualTLSStageSerializer(EnterpriseRequiredMixin, StageSerializer):
"""MutualTLSStage Serializer"""
class Meta:
model = MutualTLSStage
fields = StageSerializer.Meta.fields + [
"mode",
"certificate_authorities",
"cert_attribute",
"user_attribute",
]
class MutualTLSStageViewSet(UsedByMixin, ModelViewSet):
"""MutualTLSStage Viewset"""
queryset = MutualTLSStage.objects.all()
serializer_class = MutualTLSStageSerializer
filterset_fields = "__all__"
ordering = ["name"]
search_fields = ["name"]

View File

@ -1,12 +0,0 @@
"""authentik stage app config"""
from authentik.enterprise.apps import EnterpriseConfig
class AuthentikEnterpriseStageMTLSConfig(EnterpriseConfig):
"""authentik MTLS stage config"""
name = "authentik.enterprise.stages.mtls"
label = "authentik_stages_mtls"
verbose_name = "authentik Enterprise.Stages.MTLS"
default = True

View File

@ -1,68 +0,0 @@
# Generated by Django 5.1.9 on 2025-05-19 18:29
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("authentik_crypto", "0004_alter_certificatekeypair_name"),
("authentik_flows", "0027_auto_20231028_1424"),
]
operations = [
migrations.CreateModel(
name="MutualTLSStage",
fields=[
(
"stage_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="authentik_flows.stage",
),
),
(
"mode",
models.TextField(choices=[("optional", "Optional"), ("required", "Required")]),
),
(
"cert_attribute",
models.TextField(
choices=[
("subject", "Subject"),
("common_name", "Common Name"),
("email", "Email"),
]
),
),
(
"user_attribute",
models.TextField(choices=[("username", "Username"), ("email", "Email")]),
),
(
"certificate_authorities",
models.ManyToManyField(
blank=True,
default=None,
help_text="Configure certificate authorities to validate the certificate against. This option has a higher priority than the `client_certificate` option on `Brand`.",
to="authentik_crypto.certificatekeypair",
),
),
],
options={
"verbose_name": "Mutual TLS Stage",
"verbose_name_plural": "Mutual TLS Stages",
"permissions": [
("pass_outpost_certificate", "Permissions to pass Certificates for outposts.")
],
},
bases=("authentik_flows.stage",),
),
]

View File

@ -1,71 +0,0 @@
from django.db import models
from django.utils.translation import gettext_lazy as _
from rest_framework.serializers import Serializer
from authentik.crypto.models import CertificateKeyPair
from authentik.flows.models import Stage
from authentik.flows.stage import StageView
class TLSMode(models.TextChoices):
"""Modes the TLS Stage can operate in"""
OPTIONAL = "optional"
REQUIRED = "required"
class CertAttributes(models.TextChoices):
"""Certificate attribute used for user matching"""
SUBJECT = "subject"
COMMON_NAME = "common_name"
EMAIL = "email"
class UserAttributes(models.TextChoices):
"""User attribute for user matching"""
USERNAME = "username"
EMAIL = "email"
class MutualTLSStage(Stage):
"""Authenticate/enroll users using a client-certificate."""
mode = models.TextField(choices=TLSMode.choices)
certificate_authorities = models.ManyToManyField(
CertificateKeyPair,
default=None,
blank=True,
help_text=_(
"Configure certificate authorities to validate the certificate against. "
"This option has a higher priority than the `client_certificate` option on `Brand`."
),
)
cert_attribute = models.TextField(choices=CertAttributes.choices)
user_attribute = models.TextField(choices=UserAttributes.choices)
@property
def view(self) -> type[StageView]:
from authentik.enterprise.stages.mtls.stage import MTLSStageView
return MTLSStageView
@property
def serializer(self) -> type[Serializer]:
from authentik.enterprise.stages.mtls.api import MutualTLSStageSerializer
return MutualTLSStageSerializer
@property
def component(self) -> str:
return "ak-stage-mtls-form"
class Meta:
verbose_name = _("Mutual TLS Stage")
verbose_name_plural = _("Mutual TLS Stages")
permissions = [
("pass_outpost_certificate", _("Permissions to pass Certificates for outposts.")),
]

View File

@ -1,215 +0,0 @@
from binascii import hexlify
from urllib.parse import unquote_plus
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.x509 import Certificate, NameOID, ObjectIdentifier, load_pem_x509_certificate
from django.utils.translation import gettext_lazy as _
from authentik.brands.models import Brand
from authentik.core.models import User
from authentik.crypto.models import CertificateKeyPair
from authentik.enterprise.stages.mtls.models import (
CertAttributes,
MutualTLSStage,
TLSMode,
UserAttributes,
)
from authentik.flows.challenge import AccessDeniedChallenge
from authentik.flows.models import FlowDesignation
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
from authentik.flows.stage import ChallengeStageView
from authentik.root.middleware import ClientIPMiddleware
from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
# All of these headers must only be accepted from "trusted" reverse proxies
# See internal/web/proxy.go:39
HEADER_PROXY_FORWARDED = "X-Forwarded-Client-Cert"
HEADER_NGINX_FORWARDED = "SSL-Client-Cert"
HEADER_TRAEFIK_FORWARDED = "X-Forwarded-TLS-Client-Cert"
HEADER_OUTPOST_FORWARDED = "X-Authentik-Outpost-Certificate"
PLAN_CONTEXT_CERTIFICATE = "certificate"
class MTLSStageView(ChallengeStageView):
def __parse_single_cert(self, raw: str | None) -> list[Certificate]:
"""Helper to parse a single certificate"""
if not raw:
return []
try:
cert = load_pem_x509_certificate(unquote_plus(raw).encode())
return [cert]
except ValueError as exc:
self.logger.info("Failed to parse certificate", exc=exc)
return []
def _parse_cert_xfcc(self) -> list[Certificate]:
"""Parse certificates in the format given to us in
the format of the authentik router/envoy"""
xfcc_raw = self.request.headers.get(HEADER_PROXY_FORWARDED)
if not xfcc_raw:
return []
certs = []
for r_cert in xfcc_raw.split(","):
el = r_cert.split(";")
raw_cert = {k.split("=")[0]: k.split("=")[1] for k in el}
if "Cert" not in raw_cert:
continue
certs.extend(self.__parse_single_cert(raw_cert["Cert"]))
return certs
def _parse_cert_nginx(self) -> list[Certificate]:
"""Parse certificates in the format nginx-ingress gives to us"""
sslcc_raw = self.request.headers.get(HEADER_NGINX_FORWARDED)
return self.__parse_single_cert(sslcc_raw)
def _parse_cert_traefik(self) -> list[Certificate]:
"""Parse certificates in the format traefik gives to us"""
ftcc_raw = self.request.headers.get(HEADER_TRAEFIK_FORWARDED)
return self.__parse_single_cert(ftcc_raw)
def _parse_cert_outpost(self) -> list[Certificate]:
"""Parse certificates in the format outposts give to us. Also authenticates
the outpost to ensure it has the permission to do so"""
user = ClientIPMiddleware.get_outpost_user(self.request)
if not user:
return []
if not user.has_perm(
"pass_outpost_certificate", self.executor.current_stage
) and not user.has_perm("authentik_stages_mtls.pass_outpost_certificate"):
return []
outpost_raw = self.request.headers.get(HEADER_OUTPOST_FORWARDED)
return self.__parse_single_cert(outpost_raw)
def get_authorities(self) -> list[CertificateKeyPair] | None:
# We can't access `certificate_authorities` on `self.executor.current_stage`, as that would
# load the certificate into the directly referenced foreign key, which we have to pickle
# as part of the flow plan, and cryptography certs can't be pickled
stage: MutualTLSStage = (
MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk)
.prefetch_related("certificate_authorities")
.first()
)
if stage.certificate_authorities.exists():
return stage.certificate_authorities.order_by("name")
brand: Brand = self.request.brand
if brand.client_certificates.exists():
return brand.client_certificates.order_by("name")
return None
def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]):
for _cert in certs:
for ca in authorities:
try:
_cert.verify_directly_issued_by(ca.certificate)
return _cert
except (InvalidSignature, TypeError, ValueError) as exc:
self.logger.warning(
"Discarding cert not issued by authority", cert=_cert, authority=ca, exc=exc
)
continue
return None
def check_if_user(self, cert: Certificate):
stage: MutualTLSStage = self.executor.current_stage
cert_attr = None
user_attr = None
match stage.cert_attribute:
case CertAttributes.SUBJECT:
cert_attr = cert.subject.rfc4514_string()
case CertAttributes.COMMON_NAME:
cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME)
case CertAttributes.EMAIL:
cert_attr = self.get_cert_attribute(cert, NameOID.EMAIL_ADDRESS)
match stage.user_attribute:
case UserAttributes.USERNAME:
user_attr = "username"
case UserAttributes.EMAIL:
user_attr = "email"
if not user_attr or not cert_attr:
return None
return User.objects.filter(**{user_attr: cert_attr}).first()
def _cert_to_dict(self, cert: Certificate) -> dict:
"""Represent a certificate in a dictionary, as certificate objects cannot be pickled"""
return {
"serial_number": str(cert.serial_number),
"subject": cert.subject.rfc4514_string(),
"issuer": cert.issuer.rfc4514_string(),
"fingerprint_sha256": hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8"),
"fingerprint_sha1": hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8"),
}
def auth_user(self, user: User, cert: Certificate):
self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls")
self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
{"certificate": self._cert_to_dict(cert)}
)
def enroll_prepare_user(self, cert: Certificate):
self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {})
self.executor.plan.context[PLAN_CONTEXT_PROMPT].update(
{
"email": self.get_cert_attribute(cert, NameOID.EMAIL_ADDRESS),
"name": self.get_cert_attribute(cert, NameOID.COMMON_NAME),
}
)
self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None:
attr = cert.subject.get_attributes_for_oid(oid)
if len(attr) < 1:
return None
return str(attr[0].value)
def dispatch(self, request, *args, **kwargs):
stage: MutualTLSStage = self.executor.current_stage
certs = [
*self._parse_cert_xfcc(),
*self._parse_cert_nginx(),
*self._parse_cert_traefik(),
*self._parse_cert_outpost(),
]
authorities = self.get_authorities()
if not authorities:
self.logger.warning("No Certificate authority found")
if stage.mode == TLSMode.OPTIONAL:
return self.executor.stage_ok()
if stage.mode == TLSMode.REQUIRED:
return super().dispatch(request, *args, **kwargs)
cert = self.validate_cert(authorities, certs)
if not cert and stage.mode == TLSMode.REQUIRED:
self.logger.warning("Client certificate required but no certificates given")
return super().dispatch(
request,
*args,
error_message=_("Certificate required but no certificate was given."),
**kwargs,
)
if not cert and stage.mode == TLSMode.OPTIONAL:
self.logger.info("No certificate given, continuing")
return self.executor.stage_ok()
existing_user = self.check_if_user(cert)
if self.executor.flow.designation == FlowDesignation.ENROLLMENT:
self.enroll_prepare_user(cert)
elif existing_user:
self.auth_user(existing_user, cert)
else:
return super().dispatch(
request, *args, error_message=_("No user found for certificate."), **kwargs
)
return self.executor.stage_ok()
def get_challenge(self, *args, error_message: str | None = None, **kwargs):
return AccessDeniedChallenge(
data={
"component": "ak-stage-access-denied",
"error_message": str(error_message or "Unknown error"),
}
)

View File

@ -1,31 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIFXDCCA0SgAwIBAgIUBmV7zREyC1SPr72/75/L9zpwV18wDQYJKoZIhvcNAQEL
BQAwRjEaMBgGA1UEAwwRYXV0aGVudGlrIFRlc3QgQ0ExEjAQBgNVBAoMCWF1dGhl
bnRpazEUMBIGA1UECwwLU2VsZi1zaWduZWQwHhcNMjUwNDI3MTgzMDUwWhcNMzUw
MzA3MTgzMDUwWjBGMRowGAYDVQQDDBFhdXRoZW50aWsgVGVzdCBDQTESMBAGA1UE
CgwJYXV0aGVudGlrMRQwEgYDVQQLDAtTZWxmLXNpZ25lZDCCAiIwDQYJKoZIhvcN
AQEBBQADggIPADCCAgoCggIBAMc0NxZj7j1mPu0aRToo8oMPdC3T99xgxnqdr18x
LV4pWyi/YLghgZHqNQY2xNP6JIlSeUZD6KFUYT2sPL4Av/zSg5zO8bl+/lf7ckje
O1/Bt5A8xtL0CpmpMDGiI6ibdDElaywM6AohisbxrV29pygSKGq2wugF/urqGtE+
5z4y5Kt6qMdKkd0iXT+WagbQTIUlykFKgB0+qqTLzDl01lVDa/DoLl8Hqp45mVx2
pqrGsSa3TCErLIv9hUlZklF7A8UV4ZB4JL20UKcP8dKzQClviNie17tpsUpOuy3A
SQ6+guWTHTLJNCSdLn1xIqc5q+f5wd2dIDf8zXCTHj+Xp0bJE3Vgaq5R31K9+b+1
2dDWz1KcNJaLEnw2+b0O8M64wTMLxhqOv7QfLUr6Pmg1ZymghjLcZ6bnU9e31Vza
hlPKhxjqYQUC4Kq+oaYF6qdUeJy+dsYf0iDv5tTC+eReZDWIjxTPrNpwA773ZwT7
WVmL7ULGpuP2g9rNvFBcZiN+i6d7CUoN+jd/iRdo79lrI0dfXiyy4bYgW/2HeZfF
HaOsc1xsoqnJdWbWkX/ooyaCjAfm07kS3HiOzz4q3QW4wgGrwV8lEraLPxYYeOQu
YcGMOM8NfnVkjc8gmyXUxedCje5Vz/Tu5fKrQEInnCmXxVsWbwr/LzEjMKAM/ivY
0TXxAgMBAAGjQjBAMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgGGMB0G
A1UdDgQWBBTa+Ns6QzqlNvnTGszkouQQtZnVJDANBgkqhkiG9w0BAQsFAAOCAgEA
NpJEDMXjuEIzSzafkxSshvjnt5sMYmzmvjNoRlkxgN2YcWvPoxbalGAYzcpyggT2
6xZY8R4tvB1oNTCArqwf860kkofUoJCr88D/pU3Cv4JhjCWs4pmXTsvSqlBSlJbo
+jPBZwbn6it/6jcit6Be3rW2PtHe8tASd9Lf8/2r1ZvupXwPzcR84R4Z10ve2lqV
xxcWlMmBh51CaYI0b1/WTe9Ua+wgkCVkxbf9zNcDQXjxw2ICWK+nR/4ld4nmqVm2
C7nhvXwU8FAHl7ZgR2Z3PLrwPuhd+kd6NXQqNkS9A+n+1vSRLbRjmV8pwIPpdPEq
nslUAGJJBHDUBArxC3gOJSB+WtmaCfzDu2gepMf9Ng1H2ZhwSF/FH3v3fsJqZkzz
NBstT9KuNGQRYiCmAPJaoVAc9BoLa+BFML1govtWtpdmbFk8PZEcuUsP7iAZqFF1
uuldPyZ8huGpQSR6Oq2bILRHowfGY0npTZAyxg0Vs8UMy1HTwNOp9OuRtArMZmsJ
jFIx1QzRf9S1i6bYpOzOudoXj4ARkS1KmVExGjJFcIT0xlFSSERie2fEKSeEYOyG
G+PA2qRt/F51FGOMm1ZscjPXqk2kt3C4BFbz6Vvxsq7D3lmhvFLn4jVA8+OidsM0
YUrVMtWET/RkjEIbADbgRXxNUNo+jtQZDU9C1IiAdfk=
-----END CERTIFICATE-----

View File

@ -1,30 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIFOzCCAyOgAwIBAgIUbnIMy+Ewi5RvK7OBDxWMCk7wi08wDQYJKoZIhvcNAQEL
BQAwRjEaMBgGA1UEAwwRYXV0aGVudGlrIFRlc3QgQ0ExEjAQBgNVBAoMCWF1dGhl
bnRpazEUMBIGA1UECwwLU2VsZi1zaWduZWQwHhcNMjUwNDI3MTgzMTE3WhcNMjYw
NDIzMTgzMTE3WjARMQ8wDQYDVQQDDAZjbGllbnQwggIiMA0GCSqGSIb3DQEBAQUA
A4ICDwAwggIKAoICAQCdV+GEa7+7ito1i/z637OZW+0azv1kuF2aDwSzv+FJd+4L
6hCroRbVYTUFS3I3YwanOOZfau64xH0+pFM5Js8aREG68eqKBayx8vT27hyAOFhd
giEVmSQJfla4ogvPie1rJ0HVOL7CiR72HDPQvz+9k1iDX3xQ/4sdAb3XurN13e+M
Gtavhjiyqxmoo/H4WRd8BhD/BZQFWtaxWODDY8aKk5R7omw6Xf7aRv1BlHdE4Ucy
Wozvpsj2Kz0l61rRUhiMlE0D9dpijgaRYFB+M7R2casH3CdhGQbBHTRiqBkZa6iq
SDkTiTwNJQQJov8yPTsR+9P8OOuV6QN+DGm/FXJJFaPnsHw/HDy7EAbA1PcdbSyK
XvJ8nVjdNhCEGbLGVSwAQLO+78hChVIN5YH+QSrP84YBSxKZYArnf4z2e9drqAN3
KmC26TkaUzkXnndnxOXBEIOSmyCdD4Dutg1XPE/bs8rA6rVGIR3pKXbCr29Z8hZn
Cn9jbxwDwTX865ljR1Oc3dnIeCWa9AS/uHaSMdGlbGbDrt4Bj/nyyfu8xc034K/0
uPh3hF3FLWNAomRVZCvtuh/v7IEIQEgUbvQMWBhZJ8hu3HdtV8V9TIAryVKzEzGy
Q72UHuQyK0njRDTmA/T+jn7P8GWOuf9eNdzd0gH0gcEuhCZFxPPRvUAeDuC7DQID
AQABo1YwVDAdBgNVHSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwFAYDVR0RAQH/
BAowCIIGY2xpZW50MB0GA1UdDgQWBBQ5KZwTD8+4CqLnbM/cBSXg8XeLXTANBgkq
hkiG9w0BAQsFAAOCAgEABDkb3iyEOl1xKq1wxyRzf2L8qfPXAQw71FxMbgHArM+a
e44wJGO3mZgPH0trOaJ+tuN6erB5YbZfsoX+xFacwskj9pKyb4QJLr/ENmJZRgyL
wp5P6PB6IUJhvryvy/GxrG938YGmFtYQ+ikeJw5PWhB6218C1aZ9hsi94wZ1Zzrc
Ry0q0D4QvIEZ0X2HL1Harc7gerE3VqhgQ7EWyImM+lCRtNDduwDQnZauwhr2r6cW
XG4VTe1RCNsDA0xinXQE2Xf9voCd0Zf6wOOXJseQtrXpf+tG4N13cy5heF5ihed1
hDxSeki0KjTM+18kVVfVm4fzxf1Zg0gm54UlzWceIWh9EtnWMUV08H0D1M9YNmW8
hWTupk7M+jAw8Y+suHOe6/RLi0+fb9NSJpIpq4GqJ5UF2kerXHX0SvuAavoXyB0j
CQrUXkRScEKOO2KAbVExSG56Ff7Ee8cRUAQ6rLC5pQRACq/R0sa6RcUsFPXul3Yv
vbO2rTuArAUPkNVFknwkndheN4lOslRd1If02HunZETmsnal6p+nmuMWt2pQ2fDA
vIguG54FyQ1T1IbF/QhfTEY62CQAebcgutnqqJHt9qe7Jr6ev57hMrJDEjotSzkY
OhOVrcYqgLldr1nBqNVlIK/4VrDaWH8H5dNJ72gA9aMNVH4/bSTJhuO7cJkLnHw=
-----END CERTIFICATE-----

View File

@ -1,213 +0,0 @@
from unittest.mock import MagicMock, patch
from urllib.parse import quote_plus
from django.urls import reverse
from guardian.shortcuts import assign_perm
from authentik.core.models import User
from authentik.core.tests.utils import create_test_brand, create_test_flow, create_test_user
from authentik.crypto.models import CertificateKeyPair
from authentik.enterprise.stages.mtls.models import (
CertAttributes,
MutualTLSStage,
TLSMode,
UserAttributes,
)
from authentik.enterprise.stages.mtls.stage import PLAN_CONTEXT_CERTIFICATE
from authentik.flows.models import FlowDesignation, FlowStageBinding
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
from authentik.flows.tests import FlowTestCase
from authentik.lib.generators import generate_id
from authentik.lib.tests.utils import load_fixture
from authentik.outposts.models import Outpost, OutpostType
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
class MTLSStageTests(FlowTestCase):
def setUp(self):
super().setUp()
self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
self.ca = CertificateKeyPair.objects.create(
name=generate_id(),
certificate_data=load_fixture("fixtures/ca.pem"),
)
self.stage = MutualTLSStage.objects.create(
name=generate_id(),
mode=TLSMode.REQUIRED,
cert_attribute=CertAttributes.COMMON_NAME,
user_attribute=UserAttributes.USERNAME,
)
self.stage.certificate_authorities.add(self.ca)
self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
self.client_cert = load_fixture("fixtures/cert_client.pem")
# User matching the certificate
User.objects.filter(username="client").delete()
self.cert_user = create_test_user(username="client")
def test_parse_xfcc(self):
"""Test authentik Proxy/Envoy's XFCC format"""
with self.assertFlowFinishes() as plan:
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"},
)
self.assertEqual(res.status_code, 200)
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
def test_parse_nginx(self):
"""Test nginx's format"""
with self.assertFlowFinishes() as plan:
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"SSL-Client-Cert": quote_plus(self.client_cert)},
)
self.assertEqual(res.status_code, 200)
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
def test_parse_traefik(self):
"""Test traefik's format"""
with self.assertFlowFinishes() as plan:
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"X-Forwarded-TLS-Client-Cert": quote_plus(self.client_cert)},
)
self.assertEqual(res.status_code, 200)
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
def test_parse_outpost_object(self):
"""Test outposts's format"""
outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
assign_perm("pass_outpost_certificate", outpost.user, self.stage)
with patch(
"authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
MagicMock(return_value=outpost.user),
):
with self.assertFlowFinishes() as plan:
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
)
self.assertEqual(res.status_code, 200)
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
def test_parse_outpost_global(self):
"""Test outposts's format"""
outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
assign_perm("authentik_stages_mtls.pass_outpost_certificate", outpost.user)
with patch(
"authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
MagicMock(return_value=outpost.user),
):
with self.assertFlowFinishes() as plan:
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
)
self.assertEqual(res.status_code, 200)
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
def test_parse_outpost_no_perm(self):
"""Test outposts's format"""
outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
with patch(
"authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
MagicMock(return_value=outpost.user),
):
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
)
self.assertEqual(res.status_code, 200)
self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
def test_auth_no_user(self):
"""Test auth with no user"""
User.objects.filter(username="client").delete()
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"X-Forwarded-TLS-Client-Cert": quote_plus(self.client_cert)},
)
self.assertEqual(res.status_code, 200)
self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
def test_brand_ca(self):
"""Test using a CA from the brand"""
self.stage.certificate_authorities.clear()
brand = create_test_brand()
brand.client_certificates.add(self.ca)
with self.assertFlowFinishes() as plan:
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"X-Forwarded-TLS-Client-Cert": quote_plus(self.client_cert)},
)
self.assertEqual(res.status_code, 200)
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
def test_no_ca_optional(self):
"""Test using no CA Set"""
self.stage.mode = TLSMode.OPTIONAL
self.stage.certificate_authorities.clear()
self.stage.save()
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"X-Forwarded-TLS-Client-Cert": quote_plus(self.client_cert)},
)
self.assertEqual(res.status_code, 200)
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
def test_no_ca_required(self):
"""Test using no CA Set"""
self.stage.certificate_authorities.clear()
self.stage.save()
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"X-Forwarded-TLS-Client-Cert": quote_plus(self.client_cert)},
)
self.assertEqual(res.status_code, 200)
self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
def test_no_cert_optional(self):
"""Test using no cert Set"""
self.stage.mode = TLSMode.OPTIONAL
self.stage.save()
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
)
self.assertEqual(res.status_code, 200)
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
def test_enroll(self):
"""Test Enrollment flow"""
self.flow.designation = FlowDesignation.ENROLLMENT
self.flow.save()
with self.assertFlowFinishes() as plan:
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
headers={"X-Forwarded-TLS-Client-Cert": quote_plus(self.client_cert)},
)
self.assertEqual(res.status_code, 200)
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"})
self.assertEqual(
plan().context[PLAN_CONTEXT_CERTIFICATE],
{
"fingerprint_sha1": (
"08:d4:a4:79:25:ca:c3:51:28:88:bb:30:c2:96:c3:44:5a:eb:18:07:84:ca:b4:75:27:74:61:19:8a:6a:af:fc"
),
"fingerprint_sha256": (
"08:d4:a4:79:25:ca:c3:51:28:88:bb:30:c2:96:c3:44:5a:eb:18:07:84:ca:b4:75:27:74:61:19:8a:6a:af:fc"
),
"issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA",
"serial_number": "630532384467334865093173111400266136879266564943",
"subject": "CN=client",
},
)

View File

@ -1,5 +0,0 @@
"""API URLs"""
from authentik.enterprise.stages.mtls.api import MutualTLSStageViewSet
api_urlpatterns = [("stages/mtls", MutualTLSStageViewSet)]

View File

@ -8,7 +8,6 @@ from django.test import TestCase
from django.utils.timezone import now
from rest_framework.exceptions import ValidationError
from authentik.core.models import User
from authentik.enterprise.license import LicenseKey
from authentik.enterprise.models import (
THRESHOLD_READ_ONLY_WEEKS,
@ -72,9 +71,9 @@ class TestEnterpriseLicense(TestCase):
)
def test_valid_multiple(self):
"""Check license verification"""
lic = License.objects.create(key=generate_id(), expiry=expiry_valid)
lic = License.objects.create(key=generate_id())
self.assertTrue(lic.status.status().is_valid)
lic2 = License.objects.create(key=generate_id(), expiry=expiry_valid)
lic2 = License.objects.create(key=generate_id())
self.assertTrue(lic2.status.status().is_valid)
total = LicenseKey.get_total()
self.assertEqual(total.internal_users, 200)
@ -233,9 +232,7 @@ class TestEnterpriseLicense(TestCase):
)
def test_expiry_expired(self):
"""Check license verification"""
User.objects.all().delete()
License.objects.all().delete()
License.objects.create(key=generate_id(), expiry=expiry_expired)
License.objects.create(key=generate_id())
self.assertEqual(LicenseKey.get_total().summary().status, LicenseUsageStatus.EXPIRED)
@patch(

View File

@ -57,7 +57,7 @@ class LogEventSerializer(PassiveSerializer):
@contextmanager
def capture_logs(log_default_output=True) -> Generator[list[LogEvent]]:
def capture_logs(log_default_output=True) -> Generator[list[LogEvent], None, None]:
"""Capture log entries created"""
logs = []
cap = LogCapture()

View File

@ -15,7 +15,6 @@
{% endblock %}
<link rel="stylesheet" type="text/css" href="{% static 'dist/sfe/bootstrap.min.css' %}">
<meta name="sentry-trace" content="{{ sentry_trace }}" />
<link rel="prefetch" href="{{ flow_background_url }}" />
{% include "base/header_js.html" %}
<style>
html,
@ -23,7 +22,7 @@
height: 100%;
}
body {
background-image: url("{{ flow_background_url }}");
background-image: url("{{ flow.background_url }}");
background-repeat: no-repeat;
background-size: cover;
}

View File

@ -5,9 +5,9 @@
{% block head_before %}
{{ block.super }}
<link rel="prefetch" href="{{ flow_background_url }}" />
<link rel="prefetch" href="{{ flow.background_url }}" />
{% if flow.compatibility_mode and not inspector %}
<script>ShadyDOM = { force: true };</script>
<script>ShadyDOM = { force: !navigator.webdriver };</script>
{% endif %}
{% include "base/header_js.html" %}
<script>
@ -21,7 +21,7 @@ window.authentik.flow = {
<script src="{% versioned_script 'dist/flow/FlowInterface-%v.js' %}" type="module"></script>
<style>
:root {
--ak-flow-background: url("{{ flow_background_url }}");
--ak-flow-background: url("{{ flow.background_url }}");
}
</style>
{% endblock %}

View File

@ -1,10 +1,7 @@
"""Test helpers"""
from collections.abc import Callable, Generator
from contextlib import contextmanager
from json import loads
from typing import Any
from unittest.mock import MagicMock, patch
from django.http.response import HttpResponse
from django.urls.base import reverse
@ -12,8 +9,6 @@ from rest_framework.test import APITestCase
from authentik.core.models import User
from authentik.flows.models import Flow
from authentik.flows.planner import FlowPlan
from authentik.flows.views.executor import SESSION_KEY_PLAN
class FlowTestCase(APITestCase):
@ -49,12 +44,3 @@ class FlowTestCase(APITestCase):
def assertStageRedirects(self, response: HttpResponse, to: str) -> dict[str, Any]:
"""Wrapper around assertStageResponse that checks for a redirect"""
return self.assertStageResponse(response, component="xak-flow-redirect", to=to)
@contextmanager
def assertFlowFinishes(self) -> Generator[Callable[[], FlowPlan]]:
"""Capture the flow plan before the flow finishes and return it"""
try:
with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()):
yield lambda: self.client.session.get(SESSION_KEY_PLAN)
finally:
pass

View File

@ -69,6 +69,7 @@ SESSION_KEY_APPLICATION_PRE = "authentik/flows/application_pre"
SESSION_KEY_GET = "authentik/flows/get"
SESSION_KEY_POST = "authentik/flows/post"
SESSION_KEY_HISTORY = "authentik/flows/history"
SESSION_KEY_AUTH_STARTED = "authentik/flows/auth_started"
QS_KEY_TOKEN = "flow_token" # nosec
QS_QUERY = "query"
@ -453,6 +454,7 @@ class FlowExecutorView(APIView):
SESSION_KEY_APPLICATION_PRE,
SESSION_KEY_PLAN,
SESSION_KEY_GET,
SESSION_KEY_AUTH_STARTED,
# We might need the initial POST payloads for later requests
# SESSION_KEY_POST,
# We don't delete the history on purpose, as a user might

View File

@ -6,7 +6,8 @@ from django.shortcuts import get_object_or_404
from ua_parser.user_agent_parser import Parse
from authentik.core.views.interface import InterfaceView
from authentik.flows.models import Flow
from authentik.flows.models import Flow, FlowDesignation
from authentik.flows.views.executor import SESSION_KEY_AUTH_STARTED
class FlowInterfaceView(InterfaceView):
@ -15,7 +16,12 @@ class FlowInterfaceView(InterfaceView):
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
flow = get_object_or_404(Flow, slug=self.kwargs.get("flow_slug"))
kwargs["flow"] = flow
kwargs["flow_background_url"] = flow.background_url(self.request)
if (
not self.request.user.is_authenticated
and flow.designation == FlowDesignation.AUTHENTICATION
):
self.request.session[SESSION_KEY_AUTH_STARTED] = True
self.request.session.save()
kwargs["inspector"] = "inspector" in self.request.GET
return super().get_context_data(**kwargs)

View File

@ -363,9 +363,6 @@ def django_db_config(config: ConfigLoader | None = None) -> dict:
pool_options = config.get_dict_from_b64_json("postgresql.pool_options", True)
if not pool_options:
pool_options = True
# FIXME: Temporarily force pool to be deactivated.
# See https://github.com/goauthentik/authentik/issues/14320
pool_options = False
db = {
"default": {

View File

@ -17,7 +17,7 @@ from ldap3.core.exceptions import LDAPException
from redis.exceptions import ConnectionError as RedisConnectionError
from redis.exceptions import RedisError, ResponseError
from rest_framework.exceptions import APIException
from sentry_sdk import HttpTransport, get_current_scope
from sentry_sdk import HttpTransport
from sentry_sdk import init as sentry_sdk_init
from sentry_sdk.api import set_tag
from sentry_sdk.integrations.argv import ArgvIntegration
@ -27,7 +27,6 @@ from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.integrations.socket import SocketIntegration
from sentry_sdk.integrations.stdlib import StdlibIntegration
from sentry_sdk.integrations.threading import ThreadingIntegration
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, SENTRY_TRACE_HEADER_NAME
from structlog.stdlib import get_logger
from websockets.exceptions import WebSocketException
@ -96,8 +95,6 @@ def traces_sampler(sampling_context: dict) -> float:
return 0
if _type == "websocket":
return 0
if CONFIG.get_bool("debug"):
return 1
return float(CONFIG.get("error_reporting.sample_rate", 0.1))
@ -170,14 +167,3 @@ def before_send(event: dict, hint: dict) -> dict | None:
if settings.DEBUG:
return None
return event
def get_http_meta():
"""Get sentry-related meta key-values"""
scope = get_current_scope()
meta = {
SENTRY_TRACE_HEADER_NAME: scope.get_traceparent() or "",
}
if bag := scope.get_baggage():
meta[BAGGAGE_HEADER_NAME] = bag.serialize()
return meta

View File

@ -59,7 +59,7 @@ class PropertyMappingManager:
request: HttpRequest | None,
return_mapping: bool = False,
**kwargs,
) -> Generator[tuple[dict, PropertyMapping]]:
) -> Generator[tuple[dict, PropertyMapping], None]:
"""Iterate over all mappings that were pre-compiled and
execute all of them with the given context"""
if not self.__has_compiled:

View File

@ -23,6 +23,7 @@ if TYPE_CHECKING:
class Direction(StrEnum):
add = "add"
remove = "remove"
@ -36,16 +37,13 @@ SAFE_METHODS = [
class BaseOutgoingSyncClient[
TModel: "Model",
TConnection: "Model",
TSchema: dict,
TProvider: "OutgoingSyncProvider",
TModel: "Model", TConnection: "Model", TSchema: dict, TProvider: "OutgoingSyncProvider"
]:
"""Basic Outgoing sync client Client"""
provider: TProvider
connection_type: type[TConnection]
connection_attr: str
connection_type_query: str
mapper: PropertyMappingManager
can_discover = False
@ -65,7 +63,9 @@ class BaseOutgoingSyncClient[
def write(self, obj: TModel) -> tuple[TConnection, bool]:
"""Write object to destination. Uses self.create and self.update, but
can be overwritten for further logic"""
connection = getattr(obj, self.connection_attr).filter(provider=self.provider).first()
connection = self.connection_type.objects.filter(
provider=self.provider, **{self.connection_type_query: obj}
).first()
try:
if not connection:
connection = self.create(obj)

View File

@ -494,88 +494,86 @@ class TestConfig(TestCase):
},
)
# FIXME: Temporarily force pool to be deactivated.
# See https://github.com/goauthentik/authentik/issues/14320
# def test_db_pool(self):
# """Test DB Config with pool"""
# config = ConfigLoader()
# config.set("postgresql.host", "foo")
# config.set("postgresql.name", "foo")
# config.set("postgresql.user", "foo")
# config.set("postgresql.password", "foo")
# config.set("postgresql.port", "foo")
# config.set("postgresql.test.name", "foo")
# config.set("postgresql.use_pool", True)
# conf = django_db_config(config)
# self.assertEqual(
# conf,
# {
# "default": {
# "ENGINE": "authentik.root.db",
# "HOST": "foo",
# "NAME": "foo",
# "OPTIONS": {
# "pool": True,
# "sslcert": None,
# "sslkey": None,
# "sslmode": None,
# "sslrootcert": None,
# },
# "PASSWORD": "foo",
# "PORT": "foo",
# "TEST": {"NAME": "foo"},
# "USER": "foo",
# "CONN_MAX_AGE": 0,
# "CONN_HEALTH_CHECKS": False,
# "DISABLE_SERVER_SIDE_CURSORS": False,
# }
# },
# )
def test_db_pool(self):
"""Test DB Config with pool"""
config = ConfigLoader()
config.set("postgresql.host", "foo")
config.set("postgresql.name", "foo")
config.set("postgresql.user", "foo")
config.set("postgresql.password", "foo")
config.set("postgresql.port", "foo")
config.set("postgresql.test.name", "foo")
config.set("postgresql.use_pool", True)
conf = django_db_config(config)
self.assertEqual(
conf,
{
"default": {
"ENGINE": "authentik.root.db",
"HOST": "foo",
"NAME": "foo",
"OPTIONS": {
"pool": True,
"sslcert": None,
"sslkey": None,
"sslmode": None,
"sslrootcert": None,
},
"PASSWORD": "foo",
"PORT": "foo",
"TEST": {"NAME": "foo"},
"USER": "foo",
"CONN_MAX_AGE": 0,
"CONN_HEALTH_CHECKS": False,
"DISABLE_SERVER_SIDE_CURSORS": False,
}
},
)
# def test_db_pool_options(self):
# """Test DB Config with pool"""
# config = ConfigLoader()
# config.set("postgresql.host", "foo")
# config.set("postgresql.name", "foo")
# config.set("postgresql.user", "foo")
# config.set("postgresql.password", "foo")
# config.set("postgresql.port", "foo")
# config.set("postgresql.test.name", "foo")
# config.set("postgresql.use_pool", True)
# config.set(
# "postgresql.pool_options",
# base64.b64encode(
# dumps(
# {
# "max_size": 15,
# }
# ).encode()
# ).decode(),
# )
# conf = django_db_config(config)
# self.assertEqual(
# conf,
# {
# "default": {
# "ENGINE": "authentik.root.db",
# "HOST": "foo",
# "NAME": "foo",
# "OPTIONS": {
# "pool": {
# "max_size": 15,
# },
# "sslcert": None,
# "sslkey": None,
# "sslmode": None,
# "sslrootcert": None,
# },
# "PASSWORD": "foo",
# "PORT": "foo",
# "TEST": {"NAME": "foo"},
# "USER": "foo",
# "CONN_MAX_AGE": 0,
# "CONN_HEALTH_CHECKS": False,
# "DISABLE_SERVER_SIDE_CURSORS": False,
# }
# },
# )
def test_db_pool_options(self):
"""Test DB Config with pool"""
config = ConfigLoader()
config.set("postgresql.host", "foo")
config.set("postgresql.name", "foo")
config.set("postgresql.user", "foo")
config.set("postgresql.password", "foo")
config.set("postgresql.port", "foo")
config.set("postgresql.test.name", "foo")
config.set("postgresql.use_pool", True)
config.set(
"postgresql.pool_options",
base64.b64encode(
dumps(
{
"max_size": 15,
}
).encode()
).decode(),
)
conf = django_db_config(config)
self.assertEqual(
conf,
{
"default": {
"ENGINE": "authentik.root.db",
"HOST": "foo",
"NAME": "foo",
"OPTIONS": {
"pool": {
"max_size": 15,
},
"sslcert": None,
"sslkey": None,
"sslmode": None,
"sslrootcert": None,
},
"PASSWORD": "foo",
"PORT": "foo",
"TEST": {"NAME": "foo"},
"USER": "foo",
"CONN_MAX_AGE": 0,
"CONN_HEALTH_CHECKS": False,
"DISABLE_SERVER_SIDE_CURSORS": False,
}
},
)

View File

@ -74,8 +74,6 @@ class OutpostConfig:
kubernetes_ingress_annotations: dict[str, str] = field(default_factory=dict)
kubernetes_ingress_secret_name: str = field(default="authentik-outpost-tls")
kubernetes_ingress_class_name: str | None = field(default=None)
kubernetes_httproute_annotations: dict[str, str] = field(default_factory=dict)
kubernetes_httproute_parent_refs: list[dict[str, str]] = field(default_factory=list)
kubernetes_service_type: str = field(default="ClusterIP")
kubernetes_disabled_components: list[str] = field(default_factory=list)
kubernetes_image_pull_secrets: list[str] = field(default_factory=list)

View File

@ -78,6 +78,7 @@ class PolicyBindingSerializer(ModelSerializer):
"negate",
"enabled",
"order",
"honor_order",
"timeout",
"failure_result",
]
@ -110,7 +111,16 @@ class PolicyBindingFilter(FilterSet):
class Meta:
model = PolicyBinding
fields = ["policy", "policy__isnull", "target", "target_in", "enabled", "order", "timeout"]
fields = [
"policy",
"policy__isnull",
"target",
"target_in",
"enabled",
"order",
"honor_order",
"timeout",
]
class PolicyBindingViewSet(UsedByMixin, ModelViewSet):

View File

@ -1,8 +1,4 @@
"""Authentik policies app config
Every system policy should be its own Django app under the `policies` app.
For example: The 'dummy' policy is available at `authentik.policies.dummy`.
"""
"""authentik policies app config"""
from prometheus_client import Gauge, Histogram
@ -39,3 +35,4 @@ class AuthentikPoliciesConfig(ManagedAppConfig):
label = "authentik_policies"
verbose_name = "authentik Policies"
default = True
mountpoint = "policy/"

View File

@ -0,0 +1,40 @@
# Generated by Django 5.1.8 on 2025-04-17 15:13
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_core", "0047_delete_oldauthenticatedsession"),
("authentik_policies", "0011_policybinding_failure_result_and_more"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AddConstraint(
model_name="policybinding",
constraint=models.CheckConstraint(
condition=models.Q(
models.Q(
("policy_id__isnull", False),
("group_id__isnull", True),
("user_id__isnull", True),
),
models.Q(
("group_id__isnull", False),
("policy_id__isnull", True),
("user_id__isnull", True),
),
models.Q(
("user_id__isnull", False),
("policy_id__isnull", True),
("group_id__isnull", True),
),
_connector="OR",
),
name="authentik_policies_policybinding_only_one_type",
),
),
]

View File

@ -0,0 +1,20 @@
# Generated by Django 5.1.8 on 2025-04-17 15:16
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_policies", "0012_policybinding_authentik_policies_policybinding_only_one_type"),
]
operations = [
migrations.AddField(
model_name="policybinding",
name="honor_order",
field=models.BooleanField(
default=False, help_text="Honor order when evaluating policies."
),
),
]

View File

@ -3,6 +3,7 @@
from uuid import uuid4
from django.db import models
from django.db.models import Q
from django.utils.translation import gettext_lazy as _
from model_utils.managers import InheritanceManager
from rest_framework.serializers import BaseSerializer
@ -52,13 +53,6 @@ class PolicyBindingModel(models.Model):
return ["policy", "user", "group"]
class BoundPolicyQuerySet(models.QuerySet):
"""QuerySet for filtering enabled bindings for a Policy type"""
def for_policy(self, policy: "Policy"):
return self.filter(policy__in=policy._default_manager.all()).filter(enabled=True)
class PolicyBinding(SerializerModel):
"""Relationship between a Policy and a PolicyBindingModel."""
@ -107,6 +101,10 @@ class PolicyBinding(SerializerModel):
)
order = models.IntegerField()
honor_order = models.BooleanField(
default=False,
help_text=_("Honor order when evaluating policies."),
)
def passes(self, request: PolicyRequest) -> PolicyResult:
"""Check if request passes this PolicyBinding, check policy, group or user"""
@ -155,9 +153,6 @@ class PolicyBinding(SerializerModel):
return f"Binding - #{self.order} to {suffix}"
return ""
objects = models.Manager()
in_use = BoundPolicyQuerySet.as_manager()
class Meta:
verbose_name = _("Policy Binding")
verbose_name_plural = _("Policy Bindings")
@ -168,6 +163,28 @@ class PolicyBinding(SerializerModel):
models.Index(fields=["user"]),
models.Index(fields=["target"]),
]
constraints = (
models.CheckConstraint(
condition=(
(
Q(policy_id__isnull=False)
& Q(group_id__isnull=True)
& Q(user_id__isnull=True)
)
| (
Q(group_id__isnull=False)
& Q(policy_id__isnull=True)
& Q(user_id__isnull=True)
)
| (
Q(user_id__isnull=False)
& Q(policy_id__isnull=True)
& Q(group_id__isnull=True)
)
),
name="%(app_label)s_%(class)s_only_one_type",
),
)
class Policy(SerializerModel, CreatedUpdatedModel):

View File

@ -2,6 +2,4 @@
from authentik.policies.password.api import PasswordPolicyViewSet
api_urlpatterns = [
("policies/password", PasswordPolicyViewSet),
]
api_urlpatterns = [("policies/password", PasswordPolicyViewSet)]

View File

@ -0,0 +1,89 @@
{% extends 'login/base_full.html' %}
{% load static %}
{% load i18n %}
{% block head %}
{{ block.super }}
<script>
let redirecting = false;
const checkAuth = async () => {
if (redirecting) return true;
const url = "{{ check_auth_url }}";
console.debug("authentik/policies/buffer: Checking authentication...");
try {
const result = await fetch(url, {
method: "HEAD",
});
if (result.status >= 400) {
return false
}
console.debug("authentik/policies/buffer: Continuing");
redirecting = true;
if ("{{ auth_req_method }}" === "post") {
document.querySelector("form").submit();
} else {
window.location.assign("{{ continue_url|escapejs }}");
}
} catch {
return false;
}
};
let timeout = 100;
let offset = 20;
let attempt = 0;
const main = async () => {
attempt += 1;
await checkAuth();
console.debug(`authentik/policies/buffer: Waiting ${timeout}ms...`);
setTimeout(main, timeout);
timeout += (offset * attempt);
if (timeout >= 2000) {
timeout = 2000;
}
}
document.addEventListener("visibilitychange", async () => {
if (document.hidden) return;
console.debug("authentik/policies/buffer: Checking authentication on tab activate...");
await checkAuth();
});
main();
</script>
{% endblock %}
{% block title %}
{% trans 'Waiting for authentication...' %} - {{ brand.branding_title }}
{% endblock %}
{% block card_title %}
{% trans 'Waiting for authentication...' %}
{% endblock %}
{% block card %}
<form class="pf-c-form" method="{{ auth_req_method }}" action="{{ continue_url }}">
{% if auth_req_method == "post" %}
{% for key, value in auth_req_body.items %}
<input type="hidden" name="{{ key }}" value="{{ value }}" />
{% endfor %}
{% endif %}
<div class="pf-c-empty-state">
<div class="pf-c-empty-state__content">
<div class="pf-c-empty-state__icon">
<span class="pf-c-spinner pf-m-xl" role="progressbar">
<span class="pf-c-spinner__clipper"></span>
<span class="pf-c-spinner__lead-ball"></span>
<span class="pf-c-spinner__tail-ball"></span>
</span>
</div>
<h1 class="pf-c-title pf-m-lg">
{% trans "You're already authenticating in another tab. This page will refresh once authentication is completed." %}
</h1>
</div>
</div>
<div class="pf-c-form__group pf-m-action">
<a href="{{ auth_req_url }}" class="pf-c-button pf-m-primary pf-m-block">
{% trans "Authenticate in this tab" %}
</a>
</div>
</form>
{% endblock %}

View File

@ -0,0 +1,121 @@
from django.contrib.auth.models import AnonymousUser
from django.contrib.sessions.middleware import SessionMiddleware
from django.http import HttpResponse
from django.test import RequestFactory, TestCase
from django.urls import reverse
from authentik.core.models import Application, Provider
from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.flows.models import FlowDesignation
from authentik.flows.planner import FlowPlan
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.lib.tests.utils import dummy_get_response
from authentik.policies.views import (
QS_BUFFER_ID,
SESSION_KEY_BUFFER,
BufferedPolicyAccessView,
BufferView,
PolicyAccessView,
)
class TestPolicyViews(TestCase):
"""Test PolicyAccessView"""
def setUp(self):
super().setUp()
self.factory = RequestFactory()
self.user = create_test_user()
def test_pav(self):
"""Test simple policy access view"""
provider = Provider.objects.create(
name=generate_id(),
)
app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
class TestView(PolicyAccessView):
def resolve_provider_application(self):
self.provider = provider
self.application = app
def get(self, *args, **kwargs):
return HttpResponse("foo")
req = self.factory.get("/")
req.user = self.user
res = TestView.as_view()(req)
self.assertEqual(res.status_code, 200)
self.assertEqual(res.content, b"foo")
def test_pav_buffer(self):
"""Test simple policy access view"""
provider = Provider.objects.create(
name=generate_id(),
)
app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
flow = create_test_flow(FlowDesignation.AUTHENTICATION)
class TestView(BufferedPolicyAccessView):
def resolve_provider_application(self):
self.provider = provider
self.application = app
def get(self, *args, **kwargs):
return HttpResponse("foo")
req = self.factory.get("/")
req.user = AnonymousUser()
middleware = SessionMiddleware(dummy_get_response)
middleware.process_request(req)
req.session[SESSION_KEY_PLAN] = FlowPlan(flow.pk)
req.session.save()
res = TestView.as_view()(req)
self.assertEqual(res.status_code, 302)
self.assertTrue(res.url.startswith(reverse("authentik_policies:buffer")))
def test_pav_buffer_skip(self):
"""Test simple policy access view (skip buffer)"""
provider = Provider.objects.create(
name=generate_id(),
)
app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
flow = create_test_flow(FlowDesignation.AUTHENTICATION)
class TestView(BufferedPolicyAccessView):
def resolve_provider_application(self):
self.provider = provider
self.application = app
def get(self, *args, **kwargs):
return HttpResponse("foo")
req = self.factory.get("/?skip_buffer=true")
req.user = AnonymousUser()
middleware = SessionMiddleware(dummy_get_response)
middleware.process_request(req)
req.session[SESSION_KEY_PLAN] = FlowPlan(flow.pk)
req.session.save()
res = TestView.as_view()(req)
self.assertEqual(res.status_code, 302)
self.assertTrue(res.url.startswith(reverse("authentik_flows:default-authentication")))
def test_buffer(self):
"""Test buffer view"""
uid = generate_id()
req = self.factory.get(f"/?{QS_BUFFER_ID}={uid}")
req.user = AnonymousUser()
middleware = SessionMiddleware(dummy_get_response)
middleware.process_request(req)
ts = generate_id()
req.session[SESSION_KEY_BUFFER % uid] = {
"method": "get",
"body": {},
"url": f"/{ts}",
}
req.session.save()
res = BufferView.as_view()(req)
self.assertEqual(res.status_code, 200)
self.assertIn(ts, res.render().content.decode())

View File

@ -1,7 +1,14 @@
"""API URLs"""
from django.urls import path
from authentik.policies.api.bindings import PolicyBindingViewSet
from authentik.policies.api.policies import PolicyViewSet
from authentik.policies.views import BufferView
urlpatterns = [
path("buffer", BufferView.as_view(), name="buffer"),
]
api_urlpatterns = [
("policies/all", PolicyViewSet),

View File

@ -1,23 +1,37 @@
"""authentik access helper classes"""
from typing import Any
from uuid import uuid4
from django.contrib import messages
from django.contrib.auth.mixins import AccessMixin
from django.contrib.auth.views import redirect_to_login
from django.http import HttpRequest, HttpResponse
from django.http import HttpRequest, HttpResponse, QueryDict
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.http import urlencode
from django.utils.translation import gettext as _
from django.views.generic.base import View
from django.views.generic.base import TemplateView, View
from structlog.stdlib import get_logger
from authentik.core.models import Application, Provider, User
from authentik.flows.views.executor import SESSION_KEY_APPLICATION_PRE, SESSION_KEY_POST
from authentik.flows.models import Flow, FlowDesignation
from authentik.flows.planner import FlowPlan
from authentik.flows.views.executor import (
SESSION_KEY_APPLICATION_PRE,
SESSION_KEY_AUTH_STARTED,
SESSION_KEY_PLAN,
SESSION_KEY_POST,
)
from authentik.lib.sentry import SentryIgnoredException
from authentik.policies.denied import AccessDeniedResponse
from authentik.policies.engine import PolicyEngine
from authentik.policies.types import PolicyRequest, PolicyResult
LOGGER = get_logger()
QS_BUFFER_ID = "af_bf_id"
QS_SKIP_BUFFER = "skip_buffer"
SESSION_KEY_BUFFER = "authentik/policies/pav_buffer/%s"
class RequestValidationError(SentryIgnoredException):
@ -125,3 +139,65 @@ class PolicyAccessView(AccessMixin, View):
for message in result.messages:
messages.error(self.request, _(message))
return result
def url_with_qs(url: str, **kwargs):
"""Update/set querystring of `url` with the parameters in `kwargs`. Original query string
parameters are retained"""
if "?" not in url:
return url + f"?{urlencode(kwargs)}"
url, _, qs = url.partition("?")
qs = QueryDict(qs, mutable=True)
qs.update(kwargs)
return url + f"?{urlencode(qs.items())}"
class BufferView(TemplateView):
"""Buffer view"""
template_name = "policies/buffer.html"
def get_context_data(self, **kwargs):
buf_id = self.request.GET.get(QS_BUFFER_ID)
buffer: dict = self.request.session.get(SESSION_KEY_BUFFER % buf_id)
kwargs["auth_req_method"] = buffer["method"]
kwargs["auth_req_body"] = buffer["body"]
kwargs["auth_req_url"] = url_with_qs(buffer["url"], **{QS_SKIP_BUFFER: True})
kwargs["check_auth_url"] = reverse("authentik_api:user-me")
kwargs["continue_url"] = url_with_qs(buffer["url"], **{QS_BUFFER_ID: buf_id})
return super().get_context_data(**kwargs)
class BufferedPolicyAccessView(PolicyAccessView):
"""PolicyAccessView which buffers access requests in case the user is not logged in"""
def handle_no_permission(self):
plan: FlowPlan | None = self.request.session.get(SESSION_KEY_PLAN)
authenticating = self.request.session.get(SESSION_KEY_AUTH_STARTED)
if plan:
flow = Flow.objects.filter(pk=plan.flow_pk).first()
if not flow or flow.designation != FlowDesignation.AUTHENTICATION:
LOGGER.debug("Not buffering request, no flow or flow not for authentication")
return super().handle_no_permission()
if not plan and authenticating is None:
LOGGER.debug("Not buffering request, no flow plan active")
return super().handle_no_permission()
if self.request.GET.get(QS_SKIP_BUFFER):
LOGGER.debug("Not buffering request, explicit skip")
return super().handle_no_permission()
buffer_id = str(uuid4())
LOGGER.debug("Buffering access request", bf_id=buffer_id)
self.request.session[SESSION_KEY_BUFFER % buffer_id] = {
"body": self.request.POST,
"url": self.request.build_absolute_uri(self.request.get_full_path()),
"method": self.request.method.lower(),
}
return redirect(
url_with_qs(reverse("authentik_policies:buffer"), **{QS_BUFFER_ID: buffer_id})
)
def dispatch(self, request, *args, **kwargs):
response = super().dispatch(request, *args, **kwargs)
if QS_BUFFER_ID in self.request.GET:
self.request.session.pop(SESSION_KEY_BUFFER % self.request.GET[QS_BUFFER_ID], None)
return response

View File

@ -30,7 +30,7 @@ from authentik.flows.stage import StageView
from authentik.lib.utils.time import timedelta_from_string
from authentik.lib.views import bad_request_message
from authentik.policies.types import PolicyRequest
from authentik.policies.views import PolicyAccessView, RequestValidationError
from authentik.policies.views import BufferedPolicyAccessView, RequestValidationError
from authentik.providers.oauth2.constants import (
PKCE_METHOD_PLAIN,
PKCE_METHOD_S256,
@ -326,7 +326,7 @@ class OAuthAuthorizationParams:
return code
class AuthorizationFlowInitView(PolicyAccessView):
class AuthorizationFlowInitView(BufferedPolicyAccessView):
"""OAuth2 Flow initializer, checks access to application and starts flow"""
params: OAuthAuthorizationParams

View File

@ -1,234 +0,0 @@
from dataclasses import asdict, dataclass, field
from typing import TYPE_CHECKING
from urllib.parse import urlparse
from dacite.core import from_dict
from kubernetes.client import ApiextensionsV1Api, CustomObjectsApi, V1ObjectMeta
from authentik.outposts.controllers.base import FIELD_MANAGER
from authentik.outposts.controllers.k8s.base import KubernetesObjectReconciler
from authentik.outposts.controllers.k8s.triggers import NeedsUpdate
from authentik.outposts.controllers.kubernetes import KubernetesController
from authentik.providers.proxy.models import ProxyMode, ProxyProvider
if TYPE_CHECKING:
from authentik.outposts.controllers.kubernetes import KubernetesController
@dataclass(slots=True)
class RouteBackendRef:
name: str
port: int
@dataclass(slots=True)
class RouteSpecParentRefs:
name: str
sectionName: str | None = None
port: int | None = None
namespace: str | None = None
kind: str = "Gateway"
group: str = "gateway.networking.k8s.io"
@dataclass(slots=True)
class HTTPRouteSpecRuleMatchPath:
type: str
value: str
@dataclass(slots=True)
class HTTPRouteSpecRuleMatchHeader:
name: str
value: str
type: str = "Exact"
@dataclass(slots=True)
class HTTPRouteSpecRuleMatch:
path: HTTPRouteSpecRuleMatchPath
headers: list[HTTPRouteSpecRuleMatchHeader]
@dataclass(slots=True)
class HTTPRouteSpecRule:
backendRefs: list[RouteBackendRef]
matches: list[HTTPRouteSpecRuleMatch]
@dataclass(slots=True)
class HTTPRouteSpec:
parentRefs: list[RouteSpecParentRefs]
hostnames: list[str]
rules: list[HTTPRouteSpecRule]
@dataclass(slots=True)
class HTTPRouteMetadata:
name: str
namespace: str
annotations: dict = field(default_factory=dict)
labels: dict = field(default_factory=dict)
@dataclass(slots=True)
class HTTPRoute:
apiVersion: str
kind: str
metadata: HTTPRouteMetadata
spec: HTTPRouteSpec
class HTTPRouteReconciler(KubernetesObjectReconciler):
"""Kubernetes Gateway API HTTPRoute Reconciler"""
def __init__(self, controller: "KubernetesController") -> None:
super().__init__(controller)
self.api_ex = ApiextensionsV1Api(controller.client)
self.api = CustomObjectsApi(controller.client)
self.crd_group = "gateway.networking.k8s.io"
self.crd_version = "v1"
self.crd_plural = "httproutes"
@staticmethod
def reconciler_name() -> str:
return "httproute"
@property
def noop(self) -> bool:
if not self.crd_exists():
self.logger.debug("CRD doesn't exist")
return True
if not self.controller.outpost.config.kubernetes_httproute_parent_refs:
self.logger.debug("HTTPRoute parentRefs not set.")
return True
return False
def crd_exists(self) -> bool:
"""Check if the Gateway API resources exists"""
return bool(
len(
self.api_ex.list_custom_resource_definition(
field_selector=f"metadata.name={self.crd_plural}.{self.crd_group}"
).items
)
)
def reconcile(self, current: HTTPRoute, reference: HTTPRoute):
super().reconcile(current, reference)
if current.metadata.annotations != reference.metadata.annotations:
raise NeedsUpdate()
if current.spec.parentRefs != reference.spec.parentRefs:
raise NeedsUpdate()
if current.spec.hostnames != reference.spec.hostnames:
raise NeedsUpdate()
if current.spec.rules != reference.spec.rules:
raise NeedsUpdate()
def get_object_meta(self, **kwargs) -> V1ObjectMeta:
return super().get_object_meta(
**kwargs,
)
def get_reference_object(self) -> HTTPRoute:
hostnames = []
rules = []
for proxy_provider in ProxyProvider.objects.filter(outpost__in=[self.controller.outpost]):
proxy_provider: ProxyProvider
external_host_name = urlparse(proxy_provider.external_host)
if proxy_provider.mode in [ProxyMode.FORWARD_SINGLE, ProxyMode.FORWARD_DOMAIN]:
rule = HTTPRouteSpecRule(
backendRefs=[RouteBackendRef(name=self.name, port=9000)],
matches=[
HTTPRouteSpecRuleMatch(
headers=[
HTTPRouteSpecRuleMatchHeader(
name="Host",
value=external_host_name.hostname,
)
],
path=HTTPRouteSpecRuleMatchPath(
type="PathPrefix", value="/outpost.goauthentik.io"
),
)
],
)
else:
rule = HTTPRouteSpecRule(
backendRefs=[RouteBackendRef(name=self.name, port=9000)],
matches=[
HTTPRouteSpecRuleMatch(
headers=[
HTTPRouteSpecRuleMatchHeader(
name="Host",
value=external_host_name.hostname,
)
],
path=HTTPRouteSpecRuleMatchPath(type="PathPrefix", value="/"),
)
],
)
hostnames.append(external_host_name.hostname)
rules.append(rule)
return HTTPRoute(
apiVersion=f"{self.crd_group}/{self.crd_version}",
kind="HTTPRoute",
metadata=HTTPRouteMetadata(
name=self.name,
namespace=self.namespace,
annotations=self.controller.outpost.config.kubernetes_httproute_annotations,
labels=self.get_object_meta().labels,
),
spec=HTTPRouteSpec(
parentRefs=[
from_dict(RouteSpecParentRefs, spec)
for spec in self.controller.outpost.config.kubernetes_httproute_parent_refs
],
hostnames=hostnames,
rules=rules,
),
)
def create(self, reference: HTTPRoute):
return self.api.create_namespaced_custom_object(
group=self.crd_group,
version=self.crd_version,
plural=self.crd_plural,
namespace=self.namespace,
body=asdict(reference),
field_manager=FIELD_MANAGER,
)
def delete(self, reference: HTTPRoute):
return self.api.delete_namespaced_custom_object(
group=self.crd_group,
version=self.crd_version,
plural=self.crd_plural,
namespace=self.namespace,
name=self.name,
)
def retrieve(self) -> HTTPRoute:
return from_dict(
HTTPRoute,
self.api.get_namespaced_custom_object(
group=self.crd_group,
version=self.crd_version,
plural=self.crd_plural,
namespace=self.namespace,
name=self.name,
),
)
def update(self, current: HTTPRoute, reference: HTTPRoute):
return self.api.patch_namespaced_custom_object(
group=self.crd_group,
version=self.crd_version,
plural=self.crd_plural,
namespace=self.namespace,
name=self.name,
body=asdict(reference),
field_manager=FIELD_MANAGER,
)

View File

@ -3,7 +3,6 @@
from authentik.outposts.controllers.base import DeploymentPort
from authentik.outposts.controllers.kubernetes import KubernetesController
from authentik.outposts.models import KubernetesServiceConnection, Outpost
from authentik.providers.proxy.controllers.k8s.httproute import HTTPRouteReconciler
from authentik.providers.proxy.controllers.k8s.ingress import IngressReconciler
from authentik.providers.proxy.controllers.k8s.traefik import TraefikMiddlewareReconciler
@ -19,10 +18,8 @@ class ProxyKubernetesController(KubernetesController):
DeploymentPort(9443, "https", "tcp"),
]
self.reconcilers[IngressReconciler.reconciler_name()] = IngressReconciler
self.reconcilers[HTTPRouteReconciler.reconciler_name()] = HTTPRouteReconciler
self.reconcilers[TraefikMiddlewareReconciler.reconciler_name()] = (
TraefikMiddlewareReconciler
)
self.reconcile_order.append(IngressReconciler.reconciler_name())
self.reconcile_order.append(HTTPRouteReconciler.reconciler_name())
self.reconcile_order.append(TraefikMiddlewareReconciler.reconciler_name())

View File

@ -18,11 +18,11 @@ from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, FlowPlanner
from authentik.flows.stage import RedirectStage
from authentik.lib.utils.time import timedelta_from_string
from authentik.policies.engine import PolicyEngine
from authentik.policies.views import PolicyAccessView
from authentik.policies.views import BufferedPolicyAccessView
from authentik.providers.rac.models import ConnectionToken, Endpoint, RACProvider
class RACStartView(PolicyAccessView):
class RACStartView(BufferedPolicyAccessView):
"""Start a RAC connection by checking access and creating a connection token"""
endpoint: Endpoint

View File

@ -15,7 +15,7 @@ from authentik.flows.models import in_memory_stage
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner
from authentik.flows.views.executor import SESSION_KEY_POST
from authentik.lib.views import bad_request_message
from authentik.policies.views import PolicyAccessView
from authentik.policies.views import BufferedPolicyAccessView
from authentik.providers.saml.exceptions import CannotHandleAssertion
from authentik.providers.saml.models import SAMLBindings, SAMLProvider
from authentik.providers.saml.processors.authn_request_parser import AuthNRequestParser
@ -35,7 +35,7 @@ from authentik.stages.consent.stage import (
LOGGER = get_logger()
class SAMLSSOView(PolicyAccessView):
class SAMLSSOView(BufferedPolicyAccessView):
"""SAML SSO Base View, which plans a flow and injects our final stage.
Calls get/post handler."""
@ -83,7 +83,7 @@ class SAMLSSOView(PolicyAccessView):
def post(self, request: HttpRequest, application_slug: str) -> HttpResponse:
"""GET and POST use the same handler, but we can't
override .dispatch easily because PolicyAccessView's dispatch"""
override .dispatch easily because BufferedPolicyAccessView's dispatch"""
return self.get(request, application_slug)

View File

@ -34,7 +34,7 @@ class SCIMGroupClient(SCIMClient[Group, SCIMProviderGroup, SCIMGroupSchema]):
"""SCIM client for groups"""
connection_type = SCIMProviderGroup
connection_attr = "scimprovidergroup_set"
connection_type_query = "group"
mapper: PropertyMappingManager
def __init__(self, provider: SCIMProvider):
@ -199,7 +199,7 @@ class SCIMGroupClient(SCIMClient[Group, SCIMProviderGroup, SCIMGroupSchema]):
chunk_size = len(ops)
if len(ops) < 1:
return
for chunk in batched(ops, chunk_size, strict=False):
for chunk in batched(ops, chunk_size):
req = PatchRequest(Operations=list(chunk))
self._request(
"PATCH",

View File

@ -18,7 +18,7 @@ class SCIMUserClient(SCIMClient[User, SCIMProviderUser, SCIMUserSchema]):
"""SCIM client for users"""
connection_type = SCIMProviderUser
connection_attr = "scimprovideruser_set"
connection_type_query = "user"
mapper: PropertyMappingManager
def __init__(self, provider: SCIMProvider):

View File

@ -116,7 +116,7 @@ class SCIMProvider(OutgoingSyncProvider, BackchannelProvider):
if type == User:
# Get queryset of all users with consistent ordering
# according to the provider's settings
base = User.objects.prefetch_related("scimprovideruser_set").all().exclude_anonymous()
base = User.objects.all().exclude_anonymous()
if self.exclude_users_service_account:
base = base.exclude(type=UserTypes.SERVICE_ACCOUNT).exclude(
type=UserTypes.INTERNAL_SERVICE_ACCOUNT
@ -126,7 +126,7 @@ class SCIMProvider(OutgoingSyncProvider, BackchannelProvider):
return base.order_by("pk")
if type == Group:
# Get queryset of all groups with consistent ordering
return Group.objects.prefetch_related("scimprovidergroup_set").all().order_by("pk")
return Group.objects.all().order_by("pk")
raise ValueError(f"Invalid type {type}")
@property

View File

@ -99,7 +99,6 @@ class RBACPermissionViewSet(ReadOnlyModelViewSet):
filterset_class = PermissionFilter
permission_classes = [IsAuthenticated]
search_fields = [
"name",
"codename",
"content_type__model",
"content_type__app_label",

View File

@ -317,7 +317,7 @@ class KerberosSource(Source):
usage="accept", name=name, store=self.get_gssapi_store()
)
except gssapi.exceptions.GSSError as exc:
LOGGER.warning("GSSAPI credentials failure", exc=exc)
LOGGER.warn("GSSAPI credentials failure", exc=exc)
return None

Some files were not shown because too many files have changed in this diff Show More