Compare commits

..

1 Commits

Author SHA1 Message Date
e86a5cf5a7 retry
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
2025-01-21 16:27:54 +01:00
436 changed files with 12072 additions and 32455 deletions

View File

@ -1,16 +1,16 @@
[bumpversion]
current_version = 2025.2.0-rc1
current_version = 2024.12.2
tag = True
commit = True
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?:-(?P<rc_t>[a-zA-Z-]+)(?P<rc_n>[1-9]\\d*))?
serialize =
serialize =
{major}.{minor}.{patch}-{rc_t}{rc_n}
{major}.{minor}.{patch}
message = release: {new_version}
tag_name = version/{new_version}
[bumpversion:part:rc_t]
values =
values =
rc
final
optional_value = final

View File

@ -9,9 +9,6 @@ inputs:
image-arch:
required: false
description: "Docker image arch"
release:
required: true
description: "True if this is a release build, false if this is a dev/PR build"
outputs:
shouldPush:
@ -47,9 +44,6 @@ outputs:
imageMainName:
description: "Docker image main name"
value: ${{ steps.ev.outputs.imageMainName }}
imageBuildArgs:
description: "Docker image build args"
value: ${{ steps.ev.outputs.imageBuildArgs }}
runs:
using: "composite"
@ -60,8 +54,6 @@ runs:
env:
IMAGE_NAME: ${{ inputs.image-name }}
IMAGE_ARCH: ${{ inputs.image-arch }}
RELEASE: ${{ inputs.release }}
PR_HEAD_SHA: ${{ github.event.pull_request.head.sha }}
REF: ${{ github.ref }}
run: |
python3 ${{ github.action_path }}/push_vars.py

View File

@ -80,13 +80,6 @@ if should_push:
cache_to = f"type=registry,ref={get_attest_image_names(image_tags)}:{_cache_tag},mode=max"
image_build_args = []
if os.getenv("RELEASE", "false").lower() == "true":
image_build_args = [f"VERSION={os.getenv('REF')}"]
else:
image_build_args = [f"GIT_BUILD_HASH={sha}"]
image_build_args = "\n".join(image_build_args)
with open(os.environ["GITHUB_OUTPUT"], "a+", encoding="utf-8") as _output:
print(f"shouldPush={str(should_push).lower()}", file=_output)
print(f"sha={sha}", file=_output)
@ -98,4 +91,3 @@ with open(os.environ["GITHUB_OUTPUT"], "a+", encoding="utf-8") as _output:
print(f"imageMainTag={image_main_tag}", file=_output)
print(f"imageMainName={image_tags[0]}", file=_output)
print(f"cacheTo={cache_to}", file=_output)
print(f"imageBuildArgs={image_build_args}", file=_output)

View File

@ -40,7 +40,7 @@ jobs:
attestations: write
steps:
- uses: actions/checkout@v4
- uses: docker/setup-qemu-action@v3.4.0
- uses: docker/setup-qemu-action@v3.3.0
- uses: docker/setup-buildx-action@v3
- name: prepare variables
uses: ./.github/actions/docker-push-variables
@ -50,7 +50,6 @@ jobs:
with:
image-name: ${{ inputs.image_name }}
image-arch: ${{ inputs.image_arch }}
release: ${{ inputs.release }}
- name: Login to Docker Hub
if: ${{ inputs.registry_dockerhub }}
uses: docker/login-action@v3
@ -77,19 +76,18 @@ jobs:
id: push
with:
context: .
push: ${{ steps.ev.outputs.shouldPush == 'true' }}
push: true
secrets: |
GEOIPUPDATE_ACCOUNT_ID=${{ secrets.GEOIPUPDATE_ACCOUNT_ID }}
GEOIPUPDATE_LICENSE_KEY=${{ secrets.GEOIPUPDATE_LICENSE_KEY }}
build-args: |
${{ steps.ev.outputs.imageBuildArgs }}
VERSION=${{ github.ref }}
tags: ${{ steps.ev.outputs.imageTags }}
platforms: linux/${{ inputs.image_arch }}
cache-from: type=registry,ref=${{ steps.ev.outputs.attestImageNames }}:buildcache-${{ inputs.image_arch }}
cache-to: ${{ steps.ev.outputs.cacheTo }}
- uses: actions/attest-build-provenance@v2
id: attest
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
with:
subject-name: ${{ steps.ev.outputs.attestImageNames }}
subject-digest: ${{ steps.push.outputs.digest }}

View File

@ -46,7 +46,6 @@ jobs:
- build-server-arm64
outputs:
tags: ${{ steps.ev.outputs.imageTagsJSON }}
shouldPush: ${{ steps.ev.outputs.shouldPush }}
steps:
- uses: actions/checkout@v4
- name: prepare variables
@ -58,7 +57,6 @@ jobs:
image-name: ${{ inputs.image_name }}
merge-server:
runs-on: ubuntu-latest
if: ${{ needs.get-tags.outputs.shouldPush == 'true' }}
needs:
- get-tags
- build-server-amd64

View File

@ -1,28 +0,0 @@
---
name: authentik-ci-main-daily
on:
workflow_dispatch:
schedule:
# Every night at 3am
- cron: "0 3 * * *"
jobs:
test-container:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
version:
- docs
- version-2024-12
- version-2024-10
steps:
- uses: actions/checkout@v4
- run: |
current="$(pwd)"
dir="/tmp/authentik/${{ matrix.version }}"
mkdir -p $dir
cd $dir
wget https://${{ matrix.version }}.goauthentik.io/docker-compose.yml
${current}/scripts/test_docker.sh

View File

@ -43,26 +43,15 @@ jobs:
uses: ./.github/actions/setup
- name: run migrations
run: poetry run python -m lifecycle.migrate
test-make-seed:
runs-on: ubuntu-latest
steps:
- id: seed
run: |
echo "seed=$(printf "%d\n" "0x$(openssl rand -hex 4)")" >> "$GITHUB_OUTPUT"
outputs:
seed: ${{ steps.seed.outputs.seed }}
test-migrations-from-stable:
name: test-migrations-from-stable - PostgreSQL ${{ matrix.psql }} - Run ${{ matrix.run_id }}/5
name: test-migrations-from-stable - PostgreSQL ${{ matrix.psql }}
runs-on: ubuntu-latest
timeout-minutes: 20
needs: test-make-seed
strategy:
fail-fast: false
matrix:
psql:
- 15-alpine
- 16-alpine
run_id: [1, 2, 3, 4, 5]
steps:
- uses: actions/checkout@v4
with:
@ -104,23 +93,18 @@ jobs:
env:
# Test in the main database that we just migrated from the previous stable version
AUTHENTIK_POSTGRESQL__TEST__NAME: authentik
CI_TEST_SEED: ${{ needs.test-make-seed.outputs.seed }}
CI_RUN_ID: ${{ matrix.run_id }}
CI_TOTAL_RUNS: "5"
run: |
poetry run make ci-test
poetry run make test
test-unittest:
name: test-unittest - PostgreSQL ${{ matrix.psql }} - Run ${{ matrix.run_id }}/5
name: test-unittest - PostgreSQL ${{ matrix.psql }}
runs-on: ubuntu-latest
timeout-minutes: 20
needs: test-make-seed
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
psql:
- 15-alpine
- 16-alpine
run_id: [1, 2, 3, 4, 5]
steps:
- uses: actions/checkout@v4
- name: Setup authentik env
@ -128,12 +112,9 @@ jobs:
with:
postgresql_version: ${{ matrix.psql }}
- name: run unittest
env:
CI_TEST_SEED: ${{ needs.test-make-seed.outputs.seed }}
CI_RUN_ID: ${{ matrix.run_id }}
CI_TOTAL_RUNS: "5"
run: |
poetry run make ci-test
poetry run make test
poetry run coverage xml
- if: ${{ always() }}
uses: codecov/codecov-action@v5
with:

View File

@ -82,7 +82,7 @@ jobs:
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Set up QEMU
uses: docker/setup-qemu-action@v3.4.0
uses: docker/setup-qemu-action@v3.3.0
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: prepare variables

View File

@ -9,17 +9,9 @@ jobs:
build-server:
uses: ./.github/workflows/_reusable-docker-build.yaml
secrets: inherit
permissions:
# Needed to upload container images to ghcr.io
packages: write
# Needed for attestation
id-token: write
attestations: write
with:
image_name: ghcr.io/goauthentik/server,beryju/authentik
release: true
registry_dockerhub: true
registry_ghcr: true
build-outpost:
runs-on: ubuntu-latest
permissions:
@ -42,7 +34,7 @@ jobs:
with:
go-version-file: "go.mod"
- name: Set up QEMU
uses: docker/setup-qemu-action@v3.4.0
uses: docker/setup-qemu-action@v3.3.0
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: prepare variables

View File

@ -14,7 +14,16 @@ jobs:
- uses: actions/checkout@v4
- name: Pre-release test
run: |
make test-docker
echo "PG_PASS=$(openssl rand 32 | base64 -w 0)" >> .env
echo "AUTHENTIK_SECRET_KEY=$(openssl rand 32 | base64 -w 0)" >> .env
docker buildx install
mkdir -p ./gen-ts-api
docker build -t testing:latest .
echo "AUTHENTIK_IMAGE=testing" >> .env
echo "AUTHENTIK_TAG=latest" >> .env
docker compose up --no-start
docker compose start postgresql redis
docker compose run -u root server test-all
- id: generate_token
uses: tibdex/github-app-token@v2
with:

View File

@ -1,8 +1,8 @@
name: "authentik-repo-stale"
name: 'authentik-repo-stale'
on:
schedule:
- cron: "30 1 * * *"
- cron: '30 1 * * *'
workflow_dispatch:
permissions:
@ -25,7 +25,7 @@ jobs:
days-before-stale: 60
days-before-close: 7
exempt-issue-labels: pinned,security,pr_wanted,enhancement,bug/confirmed,enhancement/confirmed,question,status/reviewing
stale-issue-label: status/stale
stale-issue-label: wontfix
stale-issue-message: >
This issue has been automatically marked as stale because it has not had
recent activity. It will be closed if no further activity occurs. Thank you

3
.gitignore vendored
View File

@ -209,6 +209,3 @@ source_docs/
### Golang ###
/vendor/
### Docker ###
docker-compose.override.yml

View File

@ -2,7 +2,6 @@
"recommendations": [
"bashmish.es6-string-css",
"bpruitt-goddard.mermaid-markdown-syntax-highlighting",
"charliermarsh.ruff",
"dbaeumer.vscode-eslint",
"EditorConfig.EditorConfig",
"esbenp.prettier-vscode",
@ -11,12 +10,12 @@
"Gruntfuggly.todo-tree",
"mechatroner.rainbow-csv",
"ms-python.black-formatter",
"ms-python.black-formatter",
"ms-python.debugpy",
"charliermarsh.ruff",
"ms-python.python",
"ms-python.vscode-pylance",
"ms-python.black-formatter",
"redhat.vscode-yaml",
"Tobermory.es6-string-html",
"unifiedjs.vscode-mdx",
"unifiedjs.vscode-mdx"
]
}

66
.vscode/launch.json vendored
View File

@ -2,76 +2,26 @@
"version": "0.2.0",
"configurations": [
{
"name": "Debug: Attach Server Core",
"type": "debugpy",
"name": "Python: PDB attach Server",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 9901
"port": 6800
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
],
"justMyCode": true,
"django": true
},
{
"name": "Debug: Attach Worker",
"type": "debugpy",
"name": "Python: PDB attach Worker",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 9901
"port": 6900
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
],
"justMyCode": true,
"django": true
},
{
"name": "Debug: Start Server Router",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/server",
"cwd": "${workspaceFolder}"
},
{
"name": "Debug: Start LDAP Outpost",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/ldap",
"cwd": "${workspaceFolder}"
},
{
"name": "Debug: Start Proxy Outpost",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/proxy",
"cwd": "${workspaceFolder}"
},
{
"name": "Debug: Start RAC Outpost",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/rac",
"cwd": "${workspaceFolder}"
},
{
"name": "Debug: Start Radius Outpost",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/radius",
"cwd": "${workspaceFolder}"
}
]
}

View File

@ -94,7 +94,7 @@ RUN --mount=type=secret,id=GEOIPUPDATE_ACCOUNT_ID \
/bin/sh -c "/usr/bin/entry.sh || echo 'Failed to get GeoIP database, disabling'; exit 0"
# Stage 5: Python dependencies
FROM ghcr.io/goauthentik/fips-python:3.12.8-slim-bookworm-fips AS python-deps
FROM ghcr.io/goauthentik/fips-python:3.12.7-slim-bookworm-fips AS python-deps
ARG TARGETARCH
ARG TARGETVARIANT
@ -132,14 +132,13 @@ RUN --mount=type=bind,target=./pyproject.toml,src=./pyproject.toml \
. "$HOME/.cargo/env" && \
python -m venv /ak-root/venv/ && \
bash -c "source ${VENV_PATH}/bin/activate && \
pip3 install --upgrade pip poetry && \
pip3 install --upgrade pip && \
pip3 install poetry && \
poetry config --local installer.no-binary cryptography,xmlsec,lxml,python-kadmin-rs && \
poetry install --only=main --no-ansi --no-interaction --no-root && \
pip uninstall cryptography -y && \
poetry install --only=main --no-ansi --no-interaction --no-root"
# Stage 6: Run
FROM ghcr.io/goauthentik/fips-python:3.12.8-slim-bookworm-fips AS final-image
FROM ghcr.io/goauthentik/fips-python:3.12.7-slim-bookworm-fips AS final-image
ARG VERSION
ARG GIT_BUILD_HASH
@ -151,39 +150,34 @@ LABEL org.opencontainers.image.source=https://github.com/goauthentik/authentik
LABEL org.opencontainers.image.version=${VERSION}
LABEL org.opencontainers.image.revision=${GIT_BUILD_HASH}
WORKDIR /
# We cannot cache this layer otherwise we'll end up with a bigger image
RUN apt-get update && \
apt-get upgrade -y && \
# Required for runtime
apt-get install -y --no-install-recommends libpq5 libmaxminddb0 ca-certificates libkrb5-3 libkadm5clnt-mit12 libkdb5-10 libltdl7 libxslt1.1 && \
# Required for bootstrap & healtcheck
apt-get install -y --no-install-recommends runit && \
pip3 install --no-cache-dir --upgrade pip && \
apt-get clean && \
rm -rf /tmp/* /var/lib/apt/lists/* /var/tmp/ && \
adduser --system --no-create-home --uid 1000 --group --home /authentik authentik && \
adduser --system --no-create-home --uid 1000 --group --home /ak-root authentik && \
mkdir -p /certs /media /blueprints && \
mkdir -p /authentik/.ssh && \
mkdir -p /ak-root && \
chown authentik:authentik /certs /media /authentik/.ssh /ak-root
mkdir -p /ak-root/authentik/.ssh && \
chown authentik:authentik /certs /media /ak-root/authentik/.ssh /ak-root
COPY ./authentik/ /authentik
COPY ./pyproject.toml /
COPY ./poetry.lock /
COPY ./schemas /schemas
COPY ./locale /locale
COPY ./tests /tests
COPY ./manage.py /
COPY ./authentik/ /ak-root/authentik
COPY ./pyproject.toml /ak-root
COPY ./poetry.lock /ak-root
COPY ./schemas /ak-root/schemas
COPY ./locale /ak-root/locale
COPY ./tests /ak-root/tests
COPY ./manage.py /ak-root
COPY ./blueprints /blueprints
COPY ./lifecycle/ /lifecycle
COPY ./lifecycle/ /ak-root/lifecycle
COPY ./authentik/sources/kerberos/krb5.conf /etc/krb5.conf
COPY --from=go-builder /go/authentik /bin/authentik
COPY --from=python-deps /ak-root/venv /ak-root/venv
COPY --from=web-builder /work/web/dist/ /web/dist/
COPY --from=web-builder /work/web/authentik/ /web/authentik/
COPY --from=website-builder /work/website/build/ /website/help/
COPY --from=web-builder /work/web/dist/ /ak-root/web/dist/
COPY --from=web-builder /work/web/authentik/ /ak-root/web/authentik/
COPY --from=website-builder /work/website/build/ /ak-root/website/help/
COPY --from=geoip /usr/share/GeoIP /geoip
USER 1000
@ -191,11 +185,13 @@ USER 1000
ENV TMPDIR=/dev/shm/ \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PATH="/ak-root/venv/bin:/lifecycle:$PATH" \
PATH="/ak-root/venv/bin:/ak-root/lifecycle:$PATH" \
VENV_PATH="/ak-root/venv" \
POETRY_VIRTUALENVS_CREATE=false \
GOFIPS=1
HEALTHCHECK --interval=30s --timeout=30s --start-period=60s --retries=3 CMD [ "ak", "healthcheck" ]
WORKDIR /ak-root
ENTRYPOINT [ "dumb-init", "--", "ak" ]

View File

@ -6,8 +6,6 @@ UID = $(shell id -u)
GID = $(shell id -g)
NPM_VERSION = $(shell python -m scripts.npm_version)
PY_SOURCES = authentik tests scripts lifecycle .github
GO_SOURCES = cmd internal
WEB_SOURCES = web/src web/packages
DOCKER_IMAGE ?= "authentik:test"
GEN_API_TS = "gen-ts-api"
@ -22,11 +20,10 @@ CODESPELL_ARGS = -D - -D .github/codespell-dictionary.txt \
-I .github/codespell-words.txt \
-S 'web/src/locales/**' \
-S 'website/docs/developer-docs/api/reference/**' \
-S '**/node_modules/**' \
-S '**/dist/**' \
$(PY_SOURCES) \
$(GO_SOURCES) \
$(WEB_SOURCES) \
authentik \
internal \
cmd \
web/src \
website/src \
website/blog \
website/docs \
@ -48,6 +45,15 @@ help: ## Show this help
go-test:
go test -timeout 0 -v -race -cover ./...
test-docker: ## Run all tests in a docker-compose
echo "PG_PASS=$(shell openssl rand 32 | base64 -w 0)" >> .env
echo "AUTHENTIK_SECRET_KEY=$(shell openssl rand 32 | base64 -w 0)" >> .env
docker compose pull -q
docker compose up --no-start
docker compose start postgresql redis
docker compose run -u root server test-all
rm -f .env
test: ## Run the server tests and produce a coverage report (locally)
coverage run manage.py test --keepdb authentik
coverage html
@ -146,7 +152,7 @@ gen-client-ts: gen-clean-ts ## Build and install the authentik API for Typescri
docker run \
--rm -v ${PWD}:/local \
--user ${UID}:${GID} \
docker.io/openapitools/openapi-generator-cli:v7.11.0 generate \
docker.io/openapitools/openapi-generator-cli:v6.5.0 generate \
-i /local/schema.yml \
-g typescript-fetch \
-o /local/${GEN_API_TS} \
@ -257,9 +263,6 @@ docker: ## Build a docker image of the current source tree
mkdir -p ${GEN_API_TS}
DOCKER_BUILDKIT=1 docker build . --progress plain --tag ${DOCKER_IMAGE}
test-docker:
BUILD=true ./scripts/test_docker.sh
#########################
## CI
#########################
@ -284,8 +287,3 @@ ci-bandit: ci--meta-debug
ci-pending-migrations: ci--meta-debug
ak makemigrations --check
ci-test: ci--meta-debug
coverage run manage.py test --keepdb --randomly-seed ${CI_TEST_SEED} authentik
coverage report
coverage xml

View File

@ -2,7 +2,7 @@
from os import environ
__version__ = "2025.2.0"
__version__ = "2024.12.2"
ENV_GIT_HASH_KEY = "GIT_BUILD_HASH"

View File

@ -51,7 +51,6 @@ from authentik.enterprise.providers.microsoft_entra.models import (
MicrosoftEntraProviderUser,
)
from authentik.enterprise.providers.rac.models import ConnectionToken
from authentik.enterprise.providers.ssf.models import StreamEvent
from authentik.enterprise.stages.authenticator_endpoint_gdtc.models import (
EndpointDevice,
EndpointDeviceConnection,
@ -132,7 +131,6 @@ def excluded_models() -> list[type[Model]]:
EndpointDevice,
EndpointDeviceConnection,
DeviceToken,
StreamEvent,
)

View File

@ -3,7 +3,6 @@
from django.utils.translation import gettext_lazy as _
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, extend_schema
from guardian.shortcuts import get_objects_for_user
from rest_framework.fields import (
BooleanField,
CharField,
@ -17,6 +16,7 @@ from rest_framework.viewsets import ViewSet
from authentik.core.api.utils import MetaNameSerializer
from authentik.enterprise.stages.authenticator_endpoint_gdtc.models import EndpointDevice
from authentik.rbac.decorators import permission_required
from authentik.stages.authenticator import device_classes, devices_for_user
from authentik.stages.authenticator.models import Device
from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
@ -73,9 +73,7 @@ class AdminDeviceViewSet(ViewSet):
def get_devices(self, **kwargs):
"""Get all devices in all child classes"""
for model in device_classes():
device_set = get_objects_for_user(
self.request.user, f"{model._meta.app_label}.view_{model._meta.model_name}", model
).filter(**kwargs)
device_set = model.objects.filter(**kwargs)
yield from device_set
@extend_schema(
@ -88,6 +86,10 @@ class AdminDeviceViewSet(ViewSet):
],
responses={200: DeviceSerializer(many=True)},
)
@permission_required(
None,
[f"{model._meta.app_label}.view_{model._meta.model_name}" for model in device_classes()],
)
def list(self, request: Request) -> Response:
"""Get all devices for current user"""
kwargs = {}

View File

@ -4,7 +4,6 @@ from json import loads
from django.db.models import Prefetch
from django.http import Http404
from django.utils.translation import gettext as _
from django_filters.filters import CharFilter, ModelMultipleChoiceFilter
from django_filters.filterset import FilterSet
from drf_spectacular.utils import (
@ -82,37 +81,9 @@ class GroupSerializer(ModelSerializer):
if not self.instance or not parent:
return parent
if str(parent.group_uuid) == str(self.instance.group_uuid):
raise ValidationError(_("Cannot set group as parent of itself."))
raise ValidationError("Cannot set group as parent of itself.")
return parent
def validate_is_superuser(self, superuser: bool):
"""Ensure that the user creating this group has permissions to set the superuser flag"""
request: Request = self.context.get("request", None)
if not request:
return superuser
# If we're updating an instance, and the state hasn't changed, we don't need to check perms
if self.instance and superuser == self.instance.is_superuser:
return superuser
user: User = request.user
perm = (
"authentik_core.enable_group_superuser"
if superuser
else "authentik_core.disable_group_superuser"
)
has_perm = user.has_perm(perm)
if self.instance and not has_perm:
has_perm = user.has_perm(perm, self.instance)
if not has_perm:
raise ValidationError(
_(
(
"User does not have permission to set "
"superuser status to {superuser_status}."
).format_map({"superuser_status": superuser})
)
)
return superuser
class Meta:
model = Group
fields = [

View File

@ -85,7 +85,7 @@ class SourceViewSet(
serializer_class = SourceSerializer
lookup_field = "slug"
search_fields = ["slug", "name"]
filterset_fields = ["slug", "name", "managed", "pbm_uuid"]
filterset_fields = ["slug", "name", "managed"]
def get_queryset(self): # pragma: no cover
return Source.objects.select_subclasses()

View File

@ -236,11 +236,9 @@ class UserSerializer(ModelSerializer):
"path",
"type",
"uuid",
"password_change_date",
]
extra_kwargs = {
"name": {"allow_blank": True},
"password_change_date": {"read_only": True},
}

View File

@ -5,7 +5,6 @@ from typing import TextIO
from daphne.management.commands.runserver import Command as RunServer
from daphne.server import Server
from authentik.lib.debug import start_debug_server
from authentik.root.signals import post_startup, pre_startup, startup
@ -14,7 +13,6 @@ class SignalServer(Server):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
start_debug_server()
def ready_callable():
pre_startup.send(sender=self)

View File

@ -9,7 +9,6 @@ from django.db import close_old_connections
from structlog.stdlib import get_logger
from authentik.lib.config import CONFIG
from authentik.lib.debug import start_debug_server
from authentik.root.celery import CELERY_APP
LOGGER = get_logger()
@ -29,7 +28,10 @@ class Command(BaseCommand):
def handle(self, **options):
LOGGER.debug("Celery options", **options)
close_old_connections()
start_debug_server()
if CONFIG.get_bool("remote_debug"):
import debugpy
debugpy.listen(("0.0.0.0", 6900)) # nosec
worker: Worker = CELERY_APP.Worker(
no_color=False,
quiet=True,

View File

@ -1,26 +0,0 @@
# Generated by Django 5.0.11 on 2025-01-30 23:55
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("authentik_core", "0042_authenticatedsession_authentik_c_expires_08251d_idx_and_more"),
]
operations = [
migrations.AlterModelOptions(
name="group",
options={
"permissions": [
("add_user_to_group", "Add user to group"),
("remove_user_from_group", "Remove user from group"),
("enable_group_superuser", "Enable superuser status"),
("disable_group_superuser", "Disable superuser status"),
],
"verbose_name": "Group",
"verbose_name_plural": "Groups",
},
),
]

View File

@ -204,8 +204,6 @@ class Group(SerializerModel, AttributesMixin):
permissions = [
("add_user_to_group", _("Add user to group")),
("remove_user_from_group", _("Remove user from group")),
("enable_group_superuser", _("Enable superuser status")),
("disable_group_superuser", _("Disable superuser status")),
]
def __str__(self):
@ -601,14 +599,6 @@ class Application(SerializerModel, PolicyBindingModel):
return None
return candidates[-1]
def backchannel_provider_for[T: Provider](self, provider_type: type[T], **kwargs) -> T | None:
"""Get Backchannel provider for a specific type"""
providers = self.backchannel_providers.filter(
**{f"{provider_type._meta.model_name}__isnull": False},
**kwargs,
)
return getattr(providers.first(), provider_type._meta.model_name)
def __str__(self):
return str(self.name)

View File

@ -67,8 +67,6 @@ def clean_expired_models(self: SystemTask):
raise ImproperlyConfigured(
"Invalid session_storage setting, allowed values are db and cache"
)
if CONFIG.get("session_storage", "cache") == "db":
DBSessionStore.clear_expired()
LOGGER.debug("Expired sessions", model=AuthenticatedSession, amount=amount)
messages.append(f"Expired {amount} {AuthenticatedSession._meta.verbose_name_plural}")

View File

@ -4,7 +4,7 @@ from django.urls.base import reverse
from guardian.shortcuts import assign_perm
from rest_framework.test import APITestCase
from authentik.core.models import Group
from authentik.core.models import Group, User
from authentik.core.tests.utils import create_test_admin_user, create_test_user
from authentik.lib.generators import generate_id
@ -14,7 +14,7 @@ class TestGroupsAPI(APITestCase):
def setUp(self) -> None:
self.login_user = create_test_user()
self.user = create_test_user()
self.user = User.objects.create(username="test-user")
def test_list_with_users(self):
"""Test listing with users"""
@ -109,57 +109,3 @@ class TestGroupsAPI(APITestCase):
},
)
self.assertEqual(res.status_code, 400)
def test_superuser_no_perm(self):
"""Test creating a superuser group without permission"""
assign_perm("authentik_core.add_group", self.login_user)
self.client.force_login(self.login_user)
res = self.client.post(
reverse("authentik_api:group-list"),
data={"name": generate_id(), "is_superuser": True},
)
self.assertEqual(res.status_code, 400)
self.assertJSONEqual(
res.content,
{"is_superuser": ["User does not have permission to set superuser status to True."]},
)
def test_superuser_update_no_perm(self):
"""Test updating a superuser group without permission"""
group = Group.objects.create(name=generate_id(), is_superuser=True)
assign_perm("view_group", self.login_user, group)
assign_perm("change_group", self.login_user, group)
self.client.force_login(self.login_user)
res = self.client.patch(
reverse("authentik_api:group-detail", kwargs={"pk": group.pk}),
data={"is_superuser": False},
)
self.assertEqual(res.status_code, 400)
self.assertJSONEqual(
res.content,
{"is_superuser": ["User does not have permission to set superuser status to False."]},
)
def test_superuser_update_no_change(self):
"""Test updating a superuser group without permission
and without changing the superuser status"""
group = Group.objects.create(name=generate_id(), is_superuser=True)
assign_perm("view_group", self.login_user, group)
assign_perm("change_group", self.login_user, group)
self.client.force_login(self.login_user)
res = self.client.patch(
reverse("authentik_api:group-detail", kwargs={"pk": group.pk}),
data={"name": generate_id(), "is_superuser": True},
)
self.assertEqual(res.status_code, 200)
def test_superuser_create(self):
"""Test creating a superuser group with permission"""
assign_perm("authentik_core.add_group", self.login_user)
assign_perm("authentik_core.enable_group_superuser", self.login_user)
self.client.force_login(self.login_user)
res = self.client.post(
reverse("authentik_api:group-list"),
data={"name": generate_id(), "is_superuser": True},
)
self.assertEqual(res.status_code, 201)

View File

@ -97,8 +97,6 @@ class EnterpriseAuditMiddleware(AuditMiddleware):
thread_kwargs: dict | None = None,
**_,
):
if not self.enabled:
return super().post_save_handler(request, sender, instance, created, thread_kwargs, **_)
if not should_log_model(instance):
return None
thread_kwargs = {}
@ -124,8 +122,6 @@ class EnterpriseAuditMiddleware(AuditMiddleware):
):
thread_kwargs = {}
m2m_field = None
if not self.enabled:
return super().m2m_changed_handler(request, sender, instance, action, thread_kwargs)
# For the audit log we don't care about `pre_` or `post_` so we trim that part off
_, _, action_direction = action.partition("_")
# resolve the "through" model to an actual field

View File

@ -1,64 +0,0 @@
"""SSF Provider API Views"""
from django.urls import reverse
from rest_framework.fields import SerializerMethodField
from rest_framework.request import Request
from rest_framework.viewsets import ModelViewSet
from authentik.core.api.providers import ProviderSerializer
from authentik.core.api.tokens import TokenSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.enterprise.api import EnterpriseRequiredMixin
from authentik.enterprise.providers.ssf.models import SSFProvider
class SSFProviderSerializer(EnterpriseRequiredMixin, ProviderSerializer):
"""SSFProvider Serializer"""
ssf_url = SerializerMethodField()
token_obj = TokenSerializer(source="token", required=False, read_only=True)
def get_ssf_url(self, instance: SSFProvider) -> str | None:
request: Request = self._context.get("request")
if not request:
return None
if not instance.backchannel_application:
return None
return request.build_absolute_uri(
reverse(
"authentik_providers_ssf:configuration",
kwargs={
"application_slug": instance.backchannel_application.slug,
},
)
)
class Meta:
model = SSFProvider
fields = [
"pk",
"name",
"component",
"verbose_name",
"verbose_name_plural",
"meta_model_name",
"signing_key",
"token_obj",
"oidc_auth_providers",
"ssf_url",
"event_retention",
]
extra_kwargs = {}
class SSFProviderViewSet(UsedByMixin, ModelViewSet):
"""SSFProvider Viewset"""
queryset = SSFProvider.objects.all()
serializer_class = SSFProviderSerializer
filterset_fields = {
"application": ["isnull"],
"name": ["iexact"],
}
search_fields = ["name"]
ordering = ["name"]

View File

@ -1,37 +0,0 @@
"""SSF Stream API Views"""
from rest_framework.viewsets import ReadOnlyModelViewSet
from authentik.core.api.utils import ModelSerializer
from authentik.enterprise.providers.ssf.api.providers import SSFProviderSerializer
from authentik.enterprise.providers.ssf.models import Stream
class SSFStreamSerializer(ModelSerializer):
"""SSFStream Serializer"""
provider_obj = SSFProviderSerializer(source="provider", read_only=True)
class Meta:
model = Stream
fields = [
"pk",
"provider",
"provider_obj",
"delivery_method",
"endpoint_url",
"events_requested",
"format",
"aud",
"iss",
]
class SSFStreamViewSet(ReadOnlyModelViewSet):
"""SSFStream Viewset"""
queryset = Stream.objects.all()
serializer_class = SSFStreamSerializer
filterset_fields = ["provider", "endpoint_url", "delivery_method"]
search_fields = ["provider__name", "endpoint_url"]
ordering = ["provider", "uuid"]

View File

@ -1,13 +0,0 @@
"""SSF app config"""
from authentik.enterprise.apps import EnterpriseConfig
class AuthentikEnterpriseProviderSSF(EnterpriseConfig):
"""authentik enterprise ssf app config"""
name = "authentik.enterprise.providers.ssf"
label = "authentik_providers_ssf"
verbose_name = "authentik Enterprise.Providers.SSF"
default = True
mountpoint = ""

View File

@ -1,201 +0,0 @@
# Generated by Django 5.0.11 on 2025-02-05 16:20
import authentik.lib.utils.time
import django.contrib.postgres.fields
import django.db.models.deletion
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("authentik_core", "0042_authenticatedsession_authentik_c_expires_08251d_idx_and_more"),
("authentik_crypto", "0004_alter_certificatekeypair_name"),
("authentik_providers_oauth2", "0027_accesstoken_authentik_p_expires_9f24a5_idx_and_more"),
]
operations = [
migrations.CreateModel(
name="SSFProvider",
fields=[
(
"provider_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="authentik_core.provider",
),
),
(
"event_retention",
models.TextField(
default="days=30",
validators=[authentik.lib.utils.time.timedelta_string_validator],
),
),
(
"oidc_auth_providers",
models.ManyToManyField(
blank=True, default=None, to="authentik_providers_oauth2.oauth2provider"
),
),
(
"signing_key",
models.ForeignKey(
help_text="Key used to sign the SSF Events.",
on_delete=django.db.models.deletion.CASCADE,
to="authentik_crypto.certificatekeypair",
verbose_name="Signing Key",
),
),
(
"token",
models.ForeignKey(
default=None,
null=True,
on_delete=django.db.models.deletion.CASCADE,
to="authentik_core.token",
),
),
],
options={
"verbose_name": "Shared Signals Framework Provider",
"verbose_name_plural": "Shared Signals Framework Providers",
"permissions": [("add_stream", "Add stream to SSF provider")],
},
bases=("authentik_core.provider",),
),
migrations.CreateModel(
name="Stream",
fields=[
(
"uuid",
models.UUIDField(
default=uuid.uuid4, editable=False, primary_key=True, serialize=False
),
),
(
"delivery_method",
models.TextField(
choices=[
(
"https://schemas.openid.net/secevent/risc/delivery-method/push",
"Risc Push",
),
(
"https://schemas.openid.net/secevent/risc/delivery-method/poll",
"Risc Poll",
),
]
),
),
("endpoint_url", models.TextField(null=True)),
(
"events_requested",
django.contrib.postgres.fields.ArrayField(
base_field=models.TextField(
choices=[
(
"https://schemas.openid.net/secevent/caep/event-type/session-revoked",
"Caep Session Revoked",
),
(
"https://schemas.openid.net/secevent/caep/event-type/credential-change",
"Caep Credential Change",
),
(
"https://schemas.openid.net/secevent/ssf/event-type/verification",
"Set Verification",
),
]
),
default=list,
size=None,
),
),
("format", models.TextField()),
(
"aud",
django.contrib.postgres.fields.ArrayField(
base_field=models.TextField(), default=list, size=None
),
),
("iss", models.TextField()),
(
"provider",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="authentik_providers_ssf.ssfprovider",
),
),
],
options={
"verbose_name": "SSF Stream",
"verbose_name_plural": "SSF Streams",
"default_permissions": ["change", "delete", "view"],
},
),
migrations.CreateModel(
name="StreamEvent",
fields=[
("created", models.DateTimeField(auto_now_add=True)),
("last_updated", models.DateTimeField(auto_now=True)),
("expires", models.DateTimeField(default=None, null=True)),
("expiring", models.BooleanField(default=True)),
(
"uuid",
models.UUIDField(
default=uuid.uuid4, editable=False, primary_key=True, serialize=False
),
),
(
"status",
models.TextField(
choices=[
("pending_new", "Pending New"),
("pending_failed", "Pending Failed"),
("sent", "Sent"),
]
),
),
(
"type",
models.TextField(
choices=[
(
"https://schemas.openid.net/secevent/caep/event-type/session-revoked",
"Caep Session Revoked",
),
(
"https://schemas.openid.net/secevent/caep/event-type/credential-change",
"Caep Credential Change",
),
(
"https://schemas.openid.net/secevent/ssf/event-type/verification",
"Set Verification",
),
]
),
),
("payload", models.JSONField(default=dict)),
(
"stream",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="authentik_providers_ssf.stream",
),
),
],
options={
"verbose_name": "SSF Stream Event",
"verbose_name_plural": "SSF Stream Events",
"ordering": ("-created",),
},
),
]

View File

@ -1,178 +0,0 @@
from datetime import datetime
from functools import cached_property
from uuid import uuid4
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
from django.contrib.postgres.fields import ArrayField
from django.db import models
from django.templatetags.static import static
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from jwt import encode
from authentik.core.models import BackchannelProvider, ExpiringModel, Token
from authentik.crypto.models import CertificateKeyPair
from authentik.lib.models import CreatedUpdatedModel
from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
from authentik.providers.oauth2.models import JWTAlgorithms, OAuth2Provider
class EventTypes(models.TextChoices):
"""SSF Event types supported by authentik"""
CAEP_SESSION_REVOKED = "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
CAEP_CREDENTIAL_CHANGE = "https://schemas.openid.net/secevent/caep/event-type/credential-change"
SET_VERIFICATION = "https://schemas.openid.net/secevent/ssf/event-type/verification"
class DeliveryMethods(models.TextChoices):
"""SSF Delivery methods"""
RISC_PUSH = "https://schemas.openid.net/secevent/risc/delivery-method/push"
RISC_POLL = "https://schemas.openid.net/secevent/risc/delivery-method/poll"
class SSFEventStatus(models.TextChoices):
"""SSF Event status"""
PENDING_NEW = "pending_new"
PENDING_FAILED = "pending_failed"
SENT = "sent"
class SSFProvider(BackchannelProvider):
"""Shared Signals Framework provider to allow applications to
receive user events from authentik."""
signing_key = models.ForeignKey(
CertificateKeyPair,
verbose_name=_("Signing Key"),
on_delete=models.CASCADE,
help_text=_("Key used to sign the SSF Events."),
)
oidc_auth_providers = models.ManyToManyField(OAuth2Provider, blank=True, default=None)
token = models.ForeignKey(Token, on_delete=models.CASCADE, null=True, default=None)
event_retention = models.TextField(
default="days=30",
validators=[timedelta_string_validator],
)
@cached_property
def jwt_key(self) -> tuple[PrivateKeyTypes, str]:
"""Get either the configured certificate or the client secret"""
key: CertificateKeyPair = self.signing_key
private_key = key.private_key
if isinstance(private_key, RSAPrivateKey):
return private_key, JWTAlgorithms.RS256
if isinstance(private_key, EllipticCurvePrivateKey):
return private_key, JWTAlgorithms.ES256
raise ValueError(f"Invalid private key type: {type(private_key)}")
@property
def service_account_identifier(self) -> str:
return f"ak-providers-ssf-{self.pk}"
@property
def serializer(self):
from authentik.enterprise.providers.ssf.api.providers import SSFProviderSerializer
return SSFProviderSerializer
@property
def icon_url(self) -> str | None:
return static("authentik/sources/ssf.svg")
@property
def component(self) -> str:
return "ak-provider-ssf-form"
class Meta:
verbose_name = _("Shared Signals Framework Provider")
verbose_name_plural = _("Shared Signals Framework Providers")
permissions = [
# This overrides the default "add_stream" permission of the Stream object,
# as the user requesting to add a stream must have the permission on the provider
("add_stream", _("Add stream to SSF provider")),
]
class Stream(models.Model):
"""SSF Stream"""
uuid = models.UUIDField(default=uuid4, primary_key=True, editable=False)
provider = models.ForeignKey(SSFProvider, on_delete=models.CASCADE)
delivery_method = models.TextField(choices=DeliveryMethods.choices)
endpoint_url = models.TextField(null=True)
events_requested = ArrayField(models.TextField(choices=EventTypes.choices), default=list)
format = models.TextField()
aud = ArrayField(models.TextField(), default=list)
iss = models.TextField()
class Meta:
verbose_name = _("SSF Stream")
verbose_name_plural = _("SSF Streams")
default_permissions = ["change", "delete", "view"]
def __str__(self) -> str:
return "SSF Stream"
def prepare_event_payload(self, type: EventTypes, event_data: dict, **kwargs) -> dict:
jti = uuid4()
_now = now()
return {
"uuid": jti,
"stream_id": str(self.pk),
"type": type,
"expiring": True,
"status": SSFEventStatus.PENDING_NEW,
"expires": _now + timedelta_from_string(self.provider.event_retention),
"payload": {
"jti": jti.hex,
"aud": self.aud,
"iat": int(datetime.now().timestamp()),
"iss": self.iss,
"events": {type: event_data},
**kwargs,
},
}
def encode(self, data: dict) -> str:
headers = {}
if self.provider.signing_key:
headers["kid"] = self.provider.signing_key.kid
key, alg = self.provider.jwt_key
return encode(data, key, algorithm=alg, headers=headers)
class StreamEvent(CreatedUpdatedModel, ExpiringModel):
"""Single stream event to be sent"""
uuid = models.UUIDField(default=uuid4, primary_key=True, editable=False)
stream = models.ForeignKey(Stream, on_delete=models.CASCADE)
status = models.TextField(choices=SSFEventStatus.choices)
type = models.TextField(choices=EventTypes.choices)
payload = models.JSONField(default=dict)
def expire_action(self, *args, **kwargs):
"""Only allow automatic cleanup of successfully sent event"""
if self.status != SSFEventStatus.SENT:
return
return super().expire_action(*args, **kwargs)
def __str__(self):
return f"Stream event {self.type}"
class Meta:
verbose_name = _("SSF Stream Event")
verbose_name_plural = _("SSF Stream Events")
ordering = ("-created",)

View File

@ -1,193 +0,0 @@
from hashlib import sha256
from django.contrib.auth.signals import user_logged_out
from django.db.models import Model
from django.db.models.signals import post_delete, post_save, pre_delete
from django.dispatch import receiver
from django.http.request import HttpRequest
from guardian.shortcuts import assign_perm
from authentik.core.models import (
USER_PATH_SYSTEM_PREFIX,
AuthenticatedSession,
Token,
TokenIntents,
User,
UserTypes,
)
from authentik.core.signals import password_changed
from authentik.enterprise.providers.ssf.models import (
EventTypes,
SSFProvider,
)
from authentik.enterprise.providers.ssf.tasks import send_ssf_event
from authentik.events.middleware import audit_ignore
from authentik.stages.authenticator.models import Device
from authentik.stages.authenticator_duo.models import DuoDevice
from authentik.stages.authenticator_static.models import StaticDevice
from authentik.stages.authenticator_totp.models import TOTPDevice
from authentik.stages.authenticator_webauthn.models import (
UNKNOWN_DEVICE_TYPE_AAGUID,
WebAuthnDevice,
)
USER_PATH_PROVIDERS_SSF = USER_PATH_SYSTEM_PREFIX + "/providers/ssf"
@receiver(post_save, sender=SSFProvider)
def ssf_providers_post_save(sender: type[Model], instance: SSFProvider, created: bool, **_):
"""Create service account before provider is saved"""
identifier = instance.service_account_identifier
user, _ = User.objects.update_or_create(
username=identifier,
defaults={
"name": f"SSF Provider {instance.name} Service-Account",
"type": UserTypes.INTERNAL_SERVICE_ACCOUNT,
"path": USER_PATH_PROVIDERS_SSF,
},
)
assign_perm("add_stream", user, instance)
token, token_created = Token.objects.update_or_create(
identifier=identifier,
defaults={
"user": user,
"intent": TokenIntents.INTENT_API,
"expiring": False,
"managed": f"goauthentik.io/providers/ssf/{instance.pk}",
},
)
if created or token_created:
with audit_ignore():
instance.token = token
instance.save()
@receiver(user_logged_out)
def ssf_user_logged_out_session_revoked(sender, request: HttpRequest, user: User, **_):
"""Session revoked trigger (user logged out)"""
if not request.session or not request.session.session_key or not user:
return
send_ssf_event(
EventTypes.CAEP_SESSION_REVOKED,
{
"initiating_entity": "user",
},
sub_id={
"format": "complex",
"session": {
"format": "opaque",
"id": sha256(request.session.session_key.encode("ascii")).hexdigest(),
},
"user": {
"format": "email",
"email": user.email,
},
},
request=request,
)
@receiver(pre_delete, sender=AuthenticatedSession)
def ssf_user_session_delete_session_revoked(sender, instance: AuthenticatedSession, **_):
"""Session revoked trigger (users' session has been deleted)
As this signal is also triggered with a regular logout, we can't be sure
if the session has been deleted by an admin or by the user themselves."""
send_ssf_event(
EventTypes.CAEP_SESSION_REVOKED,
{
"initiating_entity": "user",
},
sub_id={
"format": "complex",
"session": {
"format": "opaque",
"id": sha256(instance.session_key.encode("ascii")).hexdigest(),
},
"user": {
"format": "email",
"email": instance.user.email,
},
},
)
@receiver(password_changed)
def ssf_password_changed_cred_change(sender, user: User, password: str | None, **_):
"""Credential change trigger (password changed)"""
send_ssf_event(
EventTypes.CAEP_CREDENTIAL_CHANGE,
{
"credential_type": "password",
"change_type": "revoke" if password is None else "update",
},
sub_id={
"format": "complex",
"user": {
"format": "email",
"email": user.email,
},
},
)
device_type_map = {
StaticDevice: "pin",
TOTPDevice: "pin",
WebAuthnDevice: "fido-u2f",
DuoDevice: "app",
}
@receiver(post_save)
def ssf_device_post_save(sender: type[Model], instance: Device, created: bool, **_):
if not isinstance(instance, Device):
return
if not instance.confirmed:
return
device_type = device_type_map.get(instance.__class__)
data = {
"credential_type": device_type,
"change_type": "create" if created else "update",
"friendly_name": instance.name,
}
if isinstance(instance, WebAuthnDevice) and instance.aaguid != UNKNOWN_DEVICE_TYPE_AAGUID:
data["fido2_aaguid"] = instance.aaguid
send_ssf_event(
EventTypes.CAEP_CREDENTIAL_CHANGE,
data,
sub_id={
"format": "complex",
"user": {
"format": "email",
"email": instance.user.email,
},
},
)
@receiver(post_delete)
def ssf_device_post_delete(sender: type[Model], instance: Device, **_):
if not isinstance(instance, Device):
return
if not instance.confirmed:
return
device_type = device_type_map.get(instance.__class__)
data = {
"credential_type": device_type,
"change_type": "delete",
"friendly_name": instance.name,
}
if isinstance(instance, WebAuthnDevice) and instance.aaguid != UNKNOWN_DEVICE_TYPE_AAGUID:
data["fido2_aaguid"] = instance.aaguid
send_ssf_event(
EventTypes.CAEP_CREDENTIAL_CHANGE,
data,
sub_id={
"format": "complex",
"user": {
"format": "email",
"email": instance.user.email,
},
},
)

View File

@ -1,136 +0,0 @@
from celery import group
from django.http import HttpRequest
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from requests.exceptions import RequestException
from structlog.stdlib import get_logger
from authentik.core.models import User
from authentik.enterprise.providers.ssf.models import (
DeliveryMethods,
EventTypes,
SSFEventStatus,
Stream,
StreamEvent,
)
from authentik.events.logs import LogEvent
from authentik.events.models import TaskStatus
from authentik.events.system_tasks import SystemTask
from authentik.lib.utils.http import get_http_session
from authentik.lib.utils.time import timedelta_from_string
from authentik.policies.engine import PolicyEngine
from authentik.root.celery import CELERY_APP
session = get_http_session()
LOGGER = get_logger()
def send_ssf_event(
event_type: EventTypes,
data: dict,
stream_filter: dict | None = None,
request: HttpRequest | None = None,
**extra_data,
):
"""Wrapper to send an SSF event to multiple streams"""
payload = []
if not stream_filter:
stream_filter = {}
stream_filter["events_requested__contains"] = [event_type]
if request and hasattr(request, "request_id"):
extra_data.setdefault("txn", request.request_id)
for stream in Stream.objects.filter(**stream_filter):
event_data = stream.prepare_event_payload(event_type, data, **extra_data)
payload.append((str(stream.uuid), event_data))
return _send_ssf_event.delay(payload)
def _check_app_access(stream_uuid: str, event_data: dict) -> bool:
"""Check if event is related to user and if so, check
if the user has access to the application"""
stream = Stream.objects.filter(pk=stream_uuid).first()
if not stream:
return False
# `event_data` is a dict version of a StreamEvent
sub_id = event_data.get("payload", {}).get("sub_id", {})
email = sub_id.get("user", {}).get("email", None)
if not email:
return True
user = User.objects.filter(email=email).first()
if not user:
return True
engine = PolicyEngine(stream.provider.backchannel_application, user)
engine.use_cache = False
engine.build()
return engine.passing
@CELERY_APP.task()
def _send_ssf_event(event_data: list[tuple[str, dict]]):
tasks = []
for stream, data in event_data:
if not _check_app_access(stream, data):
continue
event = StreamEvent.objects.create(**data)
tasks.extend(send_single_ssf_event(stream, str(event.uuid)))
main_task = group(*tasks)
main_task()
def send_single_ssf_event(stream_id: str, evt_id: str):
stream = Stream.objects.filter(pk=stream_id).first()
if not stream:
return
event = StreamEvent.objects.filter(pk=evt_id).first()
if not event:
return
if event.status == SSFEventStatus.SENT:
return
if stream.delivery_method == DeliveryMethods.RISC_PUSH:
return [ssf_push_event.si(str(event.pk))]
return []
@CELERY_APP.task(bind=True, base=SystemTask)
def ssf_push_event(self: SystemTask, event_id: str):
self.save_on_success = False
event = StreamEvent.objects.filter(pk=event_id).first()
if not event:
return
self.set_uid(event_id)
if event.status == SSFEventStatus.SENT:
self.set_status(TaskStatus.SUCCESSFUL)
return
try:
response = session.post(
event.stream.endpoint_url,
data=event.stream.encode(event.payload),
headers={"Content-Type": "application/secevent+jwt", "Accept": "application/json"},
)
response.raise_for_status()
event.status = SSFEventStatus.SENT
event.save()
self.set_status(TaskStatus.SUCCESSFUL)
return
except RequestException as exc:
LOGGER.warning("Failed to send SSF event", exc=exc)
self.set_status(TaskStatus.ERROR)
attrs = {}
if exc.response:
attrs["response"] = {
"content": exc.response.text,
"status": exc.response.status_code,
}
self.set_error(
exc,
LogEvent(
_("Failed to send request"),
log_level="warning",
logger=self.__name__,
attributes=attrs,
),
)
# Re-up the expiry of the stream event
event.expires = now() + timedelta_from_string(event.stream.provider.event_retention)
event.status = SSFEventStatus.PENDING_FAILED
event.save()

View File

@ -1,46 +0,0 @@
import json
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import Application
from authentik.core.tests.utils import create_test_cert
from authentik.enterprise.providers.ssf.models import (
SSFProvider,
)
from authentik.lib.generators import generate_id
class TestConfiguration(APITestCase):
def setUp(self):
self.application = Application.objects.create(name=generate_id(), slug=generate_id())
self.provider = SSFProvider.objects.create(
name=generate_id(),
signing_key=create_test_cert(),
backchannel_application=self.application,
)
def test_config_fetch(self):
"""test SSF configuration (unauthenticated)"""
res = self.client.get(
reverse(
"authentik_providers_ssf:configuration",
kwargs={"application_slug": self.application.slug},
),
)
self.assertEqual(res.status_code, 200)
content = json.loads(res.content)
self.assertEqual(content["spec_version"], "1_0-ID2")
def test_config_fetch_authenticated(self):
"""test SSF configuration (authenticated)"""
res = self.client.get(
reverse(
"authentik_providers_ssf:configuration",
kwargs={"application_slug": self.application.slug},
),
HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
)
self.assertEqual(res.status_code, 200)
content = json.loads(res.content)
self.assertEqual(content["spec_version"], "1_0-ID2")

View File

@ -1,51 +0,0 @@
"""JWKS tests"""
import base64
import json
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_der_x509_certificate
from django.test import TestCase
from django.urls.base import reverse
from jwt import PyJWKSet
from authentik.core.models import Application
from authentik.core.tests.utils import create_test_cert
from authentik.enterprise.providers.ssf.models import SSFProvider
from authentik.lib.generators import generate_id
class TestJWKS(TestCase):
"""Test JWKS view"""
def test_rs256(self):
"""Test JWKS request with RS256"""
provider = SSFProvider.objects.create(
name=generate_id(),
signing_key=create_test_cert(),
)
app = Application.objects.create(name=generate_id(), slug=generate_id())
app.backchannel_providers.add(provider)
response = self.client.get(
reverse("authentik_providers_ssf:jwks", kwargs={"application_slug": app.slug})
)
body = json.loads(response.content.decode())
self.assertEqual(len(body["keys"]), 1)
PyJWKSet.from_dict(body)
key = body["keys"][0]
load_der_x509_certificate(base64.b64decode(key["x5c"][0]), default_backend()).public_key()
def test_es256(self):
"""Test JWKS request with ES256"""
provider = SSFProvider.objects.create(
name=generate_id(),
signing_key=create_test_cert(),
)
app = Application.objects.create(name=generate_id(), slug=generate_id())
app.backchannel_providers.add(provider)
response = self.client.get(
reverse("authentik_providers_ssf:jwks", kwargs={"application_slug": app.slug})
)
body = json.loads(response.content.decode())
self.assertEqual(len(body["keys"]), 1)
PyJWKSet.from_dict(body)

View File

@ -1,168 +0,0 @@
from uuid import uuid4
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import Application, Group
from authentik.core.tests.utils import (
create_test_cert,
create_test_user,
)
from authentik.enterprise.providers.ssf.models import (
EventTypes,
SSFEventStatus,
SSFProvider,
Stream,
StreamEvent,
)
from authentik.lib.generators import generate_id
from authentik.policies.models import PolicyBinding
from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
class TestSignals(APITestCase):
"""Test individual SSF Signals"""
def setUp(self):
self.application = Application.objects.create(name=generate_id(), slug=generate_id())
self.provider = SSFProvider.objects.create(
name=generate_id(),
signing_key=create_test_cert(),
backchannel_application=self.application,
)
res = self.client.post(
reverse(
"authentik_providers_ssf:stream",
kwargs={"application_slug": self.application.slug},
),
data={
"iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
"aud": ["https://app.authentik.company"],
"delivery": {
"method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
"endpoint_url": "https://app.authentik.company",
},
"events_requested": [
"https://schemas.openid.net/secevent/caep/event-type/credential-change",
"https://schemas.openid.net/secevent/caep/event-type/session-revoked",
],
"format": "iss_sub",
},
HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
)
self.assertEqual(res.status_code, 201, res.content)
def test_signal_logout(self):
"""Test user logout"""
user = create_test_user()
self.client.force_login(user)
self.client.logout()
stream = Stream.objects.filter(provider=self.provider).first()
self.assertIsNotNone(stream)
event = StreamEvent.objects.filter(stream=stream).first()
self.assertIsNotNone(event)
self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
event_payload = event.payload["events"][
"https://schemas.openid.net/secevent/caep/event-type/session-revoked"
]
self.assertEqual(event_payload["initiating_entity"], "user")
self.assertEqual(event.payload["sub_id"]["format"], "complex")
self.assertEqual(event.payload["sub_id"]["session"]["format"], "opaque")
self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
def test_signal_password_change(self):
"""Test user password change"""
user = create_test_user()
self.client.force_login(user)
user.set_password(generate_id())
user.save()
stream = Stream.objects.filter(provider=self.provider).first()
self.assertIsNotNone(stream)
event = StreamEvent.objects.filter(stream=stream).first()
self.assertIsNotNone(event)
self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
event_payload = event.payload["events"][
"https://schemas.openid.net/secevent/caep/event-type/credential-change"
]
self.assertEqual(event_payload["change_type"], "update")
self.assertEqual(event_payload["credential_type"], "password")
self.assertEqual(event.payload["sub_id"]["format"], "complex")
self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
def test_signal_authenticator_added(self):
"""Test authenticator creation signal"""
user = create_test_user()
self.client.force_login(user)
dev = WebAuthnDevice.objects.create(
user=user,
name=generate_id(),
credential_id=generate_id(),
public_key=generate_id(),
aaguid=str(uuid4()),
)
stream = Stream.objects.filter(provider=self.provider).first()
self.assertIsNotNone(stream)
event = StreamEvent.objects.filter(stream=stream).exclude().first()
self.assertIsNotNone(event)
self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
event_payload = event.payload["events"][
"https://schemas.openid.net/secevent/caep/event-type/credential-change"
]
self.assertEqual(event_payload["change_type"], "create")
self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
self.assertEqual(event_payload["friendly_name"], dev.name)
self.assertEqual(event_payload["credential_type"], "fido-u2f")
self.assertEqual(event.payload["sub_id"]["format"], "complex")
self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
def test_signal_authenticator_deleted(self):
"""Test authenticator deletion signal"""
user = create_test_user()
self.client.force_login(user)
dev = WebAuthnDevice.objects.create(
user=user,
name=generate_id(),
credential_id=generate_id(),
public_key=generate_id(),
aaguid=str(uuid4()),
)
dev.delete()
stream = Stream.objects.filter(provider=self.provider).first()
self.assertIsNotNone(stream)
event = StreamEvent.objects.filter(stream=stream).exclude().first()
self.assertIsNotNone(event)
self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
event_payload = event.payload["events"][
"https://schemas.openid.net/secevent/caep/event-type/credential-change"
]
self.assertEqual(event_payload["change_type"], "delete")
self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
self.assertEqual(event_payload["friendly_name"], dev.name)
self.assertEqual(event_payload["credential_type"], "fido-u2f")
self.assertEqual(event.payload["sub_id"]["format"], "complex")
self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
def test_signal_policy_ignore(self):
"""Test event not being created for user that doesn't have access to the application"""
PolicyBinding.objects.create(
target=self.application, group=Group.objects.create(name=generate_id()), order=0
)
user = create_test_user()
self.client.force_login(user)
user.set_password(generate_id())
user.save()
stream = Stream.objects.filter(provider=self.provider).first()
self.assertIsNotNone(stream)
event = StreamEvent.objects.filter(
stream=stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
).first()
self.assertIsNone(event)

View File

@ -1,154 +0,0 @@
import json
from dataclasses import asdict
from django.urls import reverse
from django.utils import timezone
from rest_framework.test import APITestCase
from authentik.core.models import Application
from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow
from authentik.enterprise.providers.ssf.models import (
SSFEventStatus,
SSFProvider,
Stream,
StreamEvent,
)
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.id_token import IDToken
from authentik.providers.oauth2.models import AccessToken, OAuth2Provider
class TestStream(APITestCase):
def setUp(self):
self.application = Application.objects.create(name=generate_id(), slug=generate_id())
self.provider = SSFProvider.objects.create(
name=generate_id(),
signing_key=create_test_cert(),
backchannel_application=self.application,
)
def test_stream_add_token(self):
"""test stream add (token auth)"""
res = self.client.post(
reverse(
"authentik_providers_ssf:stream",
kwargs={"application_slug": self.application.slug},
),
data={
"iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
"aud": ["https://app.authentik.company"],
"delivery": {
"method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
"endpoint_url": "https://app.authentik.company",
},
"events_requested": [
"https://schemas.openid.net/secevent/caep/event-type/credential-change",
"https://schemas.openid.net/secevent/caep/event-type/session-revoked",
],
"format": "iss_sub",
},
HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
)
self.assertEqual(res.status_code, 201)
stream = Stream.objects.filter(provider=self.provider).first()
self.assertIsNotNone(stream)
event = StreamEvent.objects.filter(stream=stream).first()
self.assertIsNotNone(event)
self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
self.assertEqual(
event.payload["events"],
{"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
)
def test_stream_add_poll(self):
"""test stream add - poll method"""
res = self.client.post(
reverse(
"authentik_providers_ssf:stream",
kwargs={"application_slug": self.application.slug},
),
data={
"iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
"aud": ["https://app.authentik.company"],
"delivery": {
"method": "https://schemas.openid.net/secevent/risc/delivery-method/poll",
},
"events_requested": [
"https://schemas.openid.net/secevent/caep/event-type/credential-change",
"https://schemas.openid.net/secevent/caep/event-type/session-revoked",
],
"format": "iss_sub",
},
HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
)
self.assertEqual(res.status_code, 400)
self.assertJSONEqual(
res.content,
{"delivery": {"method": ["Polling for SSF events is not currently supported."]}},
)
def test_stream_add_oidc(self):
"""test stream add (oidc auth)"""
provider = OAuth2Provider.objects.create(
name=generate_id(),
authorization_flow=create_test_flow(),
)
self.application.provider = provider
self.application.save()
user = create_test_admin_user()
token = AccessToken.objects.create(
provider=provider,
user=user,
token=generate_id(),
auth_time=timezone.now(),
_scope="openid user profile",
_id_token=json.dumps(
asdict(
IDToken("foo", "bar"),
)
),
)
res = self.client.post(
reverse(
"authentik_providers_ssf:stream",
kwargs={"application_slug": self.application.slug},
),
data={
"iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
"aud": ["https://app.authentik.company"],
"delivery": {
"method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
"endpoint_url": "https://app.authentik.company",
},
"events_requested": [
"https://schemas.openid.net/secevent/caep/event-type/credential-change",
"https://schemas.openid.net/secevent/caep/event-type/session-revoked",
],
"format": "iss_sub",
},
HTTP_AUTHORIZATION=f"Bearer {token.token}",
)
self.assertEqual(res.status_code, 201)
stream = Stream.objects.filter(provider=self.provider).first()
self.assertIsNotNone(stream)
event = StreamEvent.objects.filter(stream=stream).first()
self.assertIsNotNone(event)
self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
self.assertEqual(
event.payload["events"],
{"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
)
def test_stream_delete(self):
"""delete stream"""
stream = Stream.objects.create(provider=self.provider)
res = self.client.delete(
reverse(
"authentik_providers_ssf:stream",
kwargs={"application_slug": self.application.slug},
),
HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
)
self.assertEqual(res.status_code, 204)
self.assertFalse(Stream.objects.filter(pk=stream.pk).exists())

View File

@ -1,32 +0,0 @@
"""SSF provider URLs"""
from django.urls import path
from authentik.enterprise.providers.ssf.api.providers import SSFProviderViewSet
from authentik.enterprise.providers.ssf.api.streams import SSFStreamViewSet
from authentik.enterprise.providers.ssf.views.configuration import ConfigurationView
from authentik.enterprise.providers.ssf.views.jwks import JWKSview
from authentik.enterprise.providers.ssf.views.stream import StreamView
urlpatterns = [
path(
"application/ssf/<slug:application_slug>/ssf-jwks/",
JWKSview.as_view(),
name="jwks",
),
path(
".well-known/ssf-configuration/<slug:application_slug>",
ConfigurationView.as_view(),
name="configuration",
),
path(
"application/ssf/<slug:application_slug>/stream/",
StreamView.as_view(),
name="stream",
),
]
api_urlpatterns = [
("providers/ssf", SSFProviderViewSet),
("ssf/streams", SSFStreamViewSet),
]

View File

@ -1,66 +0,0 @@
"""SSF Token auth"""
from typing import TYPE_CHECKING, Any
from django.db.models import Q
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework.request import Request
from authentik.core.models import Token, TokenIntents, User
from authentik.enterprise.providers.ssf.models import SSFProvider
from authentik.providers.oauth2.models import AccessToken
if TYPE_CHECKING:
from authentik.enterprise.providers.ssf.views.base import SSFView
class SSFTokenAuth(BaseAuthentication):
"""SSF Token auth"""
view: "SSFView"
def __init__(self, view: "SSFView") -> None:
super().__init__()
self.view = view
def check_token(self, key: str) -> Token | None:
"""Check that a token exists, is not expired, and is assigned to the correct provider"""
token = Token.filter_not_expired(key=key, intent=TokenIntents.INTENT_API).first()
if not token:
return None
provider: SSFProvider = token.ssfprovider_set.first()
if not provider:
return None
self.view.application = provider.backchannel_application
self.view.provider = provider
return token
def check_jwt(self, jwt: str) -> AccessToken | None:
"""Check JWT-based authentication, this supports tokens issued either by providers
configured directly in the provider, and by providers assigned to the application
that the SSF provider is a backchannel provider of."""
token = AccessToken.filter_not_expired(token=jwt, revoked=False).first()
if not token:
return None
ssf_provider = SSFProvider.objects.filter(
Q(oidc_auth_providers__in=[token.provider])
| Q(backchannel_application__provider__in=[token.provider]),
).first()
if not ssf_provider:
return None
self.view.application = ssf_provider.backchannel_application
self.view.provider = ssf_provider
return token
def authenticate(self, request: Request) -> tuple[User, Any] | None:
auth = get_authorization_header(request).decode()
auth_type, _, key = auth.partition(" ")
if auth_type != "Bearer":
return None
token = self.check_token(key)
if token:
return (token.user, token)
jwt_token = self.check_jwt(key)
if jwt_token:
return (jwt_token.user, token)
return None

View File

@ -1,23 +0,0 @@
from django.http import HttpRequest
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView
from structlog.stdlib import BoundLogger, get_logger
from authentik.core.models import Application
from authentik.enterprise.providers.ssf.models import SSFProvider
from authentik.enterprise.providers.ssf.views.auth import SSFTokenAuth
class SSFView(APIView):
application: Application
provider: SSFProvider
logger: BoundLogger
permission_classes = [IsAuthenticated]
def setup(self, request: HttpRequest, *args, **kwargs) -> None:
self.logger = get_logger().bind()
super().setup(request, *args, **kwargs)
def get_authenticators(self):
return [SSFTokenAuth(self)]

View File

@ -1,55 +0,0 @@
from django.http import Http404, HttpRequest, HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404
from django.urls import reverse
from rest_framework.permissions import AllowAny
from authentik.core.models import Application
from authentik.enterprise.providers.ssf.models import DeliveryMethods, SSFProvider
from authentik.enterprise.providers.ssf.views.base import SSFView
class ConfigurationView(SSFView):
"""SSF configuration endpoint"""
permission_classes = [AllowAny]
def get_authenticators(self):
return []
def get(self, request: HttpRequest, application_slug: str, *args, **kwargs) -> HttpResponse:
application = get_object_or_404(Application, slug=application_slug)
provider = application.backchannel_provider_for(SSFProvider)
if not provider:
raise Http404
data = {
"spec_version": "1_0-ID2",
"issuer": self.request.build_absolute_uri(
reverse(
"authentik_providers_ssf:configuration",
kwargs={
"application_slug": application.slug,
},
)
),
"jwks_uri": self.request.build_absolute_uri(
reverse(
"authentik_providers_ssf:jwks",
kwargs={
"application_slug": application.slug,
},
)
),
"configuration_endpoint": self.request.build_absolute_uri(
reverse(
"authentik_providers_ssf:stream",
kwargs={
"application_slug": application.slug,
},
)
),
"delivery_methods_supported": [
DeliveryMethods.RISC_PUSH,
],
"authorization_schemes": [{"spec_urn": "urn:ietf:rfc:6749"}],
}
return JsonResponse(data)

View File

@ -1,31 +0,0 @@
from django.http import Http404, HttpRequest, HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404
from django.views import View
from authentik.core.models import Application
from authentik.crypto.models import CertificateKeyPair
from authentik.enterprise.providers.ssf.models import SSFProvider
from authentik.providers.oauth2.views.jwks import JWKSView as OAuthJWKSView
class JWKSview(View):
"""SSF JWKS endpoint, similar to the OAuth2 provider's endpoint"""
def get(self, request: HttpRequest, application_slug: str) -> HttpResponse:
"""Show JWK Key data for Provider"""
application = get_object_or_404(Application, slug=application_slug)
provider = application.backchannel_provider_for(SSFProvider)
if not provider:
raise Http404
signing_key: CertificateKeyPair = provider.signing_key
response_data = {}
jwk = OAuthJWKSView.get_jwk_for_key(signing_key, "sig")
if jwk:
response_data["keys"] = [jwk]
response = JsonResponse(response_data)
response["Access-Control-Allow-Origin"] = "*"
return response

View File

@ -1,130 +0,0 @@
from django.http import HttpRequest
from django.urls import reverse
from rest_framework.exceptions import PermissionDenied, ValidationError
from rest_framework.fields import CharField, ChoiceField, ListField, SerializerMethodField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import ModelSerializer
from structlog.stdlib import get_logger
from authentik.core.api.utils import PassiveSerializer
from authentik.enterprise.providers.ssf.models import (
DeliveryMethods,
EventTypes,
SSFProvider,
Stream,
)
from authentik.enterprise.providers.ssf.tasks import send_ssf_event
from authentik.enterprise.providers.ssf.views.base import SSFView
LOGGER = get_logger()
class StreamDeliverySerializer(PassiveSerializer):
method = ChoiceField(choices=[(x.value, x.value) for x in DeliveryMethods])
endpoint_url = CharField(required=False)
def validate_method(self, method: DeliveryMethods):
"""Currently only push is supported"""
if method == DeliveryMethods.RISC_POLL:
raise ValidationError("Polling for SSF events is not currently supported.")
return method
def validate(self, attrs: dict) -> dict:
if attrs["method"] == DeliveryMethods.RISC_PUSH:
if not attrs.get("endpoint_url"):
raise ValidationError("Endpoint URL is required when using push.")
return attrs
class StreamSerializer(ModelSerializer):
delivery = StreamDeliverySerializer()
events_requested = ListField(
child=ChoiceField(choices=[(x.value, x.value) for x in EventTypes])
)
format = CharField()
aud = ListField(child=CharField())
def create(self, validated_data):
provider: SSFProvider = validated_data["provider"]
request: HttpRequest = self.context["request"]
iss = request.build_absolute_uri(
reverse(
"authentik_providers_ssf:configuration",
kwargs={
"application_slug": provider.backchannel_application.slug,
},
)
)
# Ensure that streams always get SET verification events sent to them
validated_data["events_requested"].append(EventTypes.SET_VERIFICATION)
return super().create(
{
"delivery_method": validated_data["delivery"]["method"],
"endpoint_url": validated_data["delivery"].get("endpoint_url"),
"format": validated_data["format"],
"provider": validated_data["provider"],
"events_requested": validated_data["events_requested"],
"aud": validated_data["aud"],
"iss": iss,
}
)
class Meta:
model = Stream
fields = [
"delivery",
"events_requested",
"format",
"aud",
]
class StreamResponseSerializer(PassiveSerializer):
stream_id = CharField(source="pk")
iss = CharField()
aud = ListField(child=CharField())
delivery = SerializerMethodField()
format = CharField()
events_requested = ListField(child=CharField())
events_supported = SerializerMethodField()
events_delivered = ListField(child=CharField(), source="events_requested")
def get_delivery(self, instance: Stream) -> StreamDeliverySerializer:
return {
"method": instance.delivery_method,
"endpoint_url": instance.endpoint_url,
}
def get_events_supported(self, instance: Stream) -> list[str]:
return [x.value for x in EventTypes]
class StreamView(SSFView):
def post(self, request: Request, *args, **kwargs) -> Response:
stream = StreamSerializer(data=request.data, context={"request": request})
stream.is_valid(raise_exception=True)
if not request.user.has_perm("authentik_providers_ssf.add_stream", self.provider):
raise PermissionDenied(
"User does not have permission to create stream for this provider."
)
instance: Stream = stream.save(provider=self.provider)
send_ssf_event(
EventTypes.SET_VERIFICATION,
{
"state": None,
},
stream_filter={"pk": instance.uuid},
sub_id={"format": "opaque", "id": str(instance.uuid)},
)
response = StreamResponseSerializer(instance=instance, context={"request": request}).data
return Response(response, status=201)
def delete(self, request: Request, *args, **kwargs) -> Response:
streams = Stream.objects.filter(provider=self.provider)
# Technically this parameter is required by the spec...
if "stream_id" in request.query_params:
streams = streams.filter(stream_id=request.query_params["stream_id"])
streams.delete()
return Response(status=204)

View File

@ -17,7 +17,6 @@ TENANT_APPS = [
"authentik.enterprise.providers.google_workspace",
"authentik.enterprise.providers.microsoft_entra",
"authentik.enterprise.providers.rac",
"authentik.enterprise.providers.ssf",
"authentik.enterprise.stages.authenticator_endpoint_gdtc",
"authentik.enterprise.stages.source",
]

View File

@ -53,13 +53,12 @@ class SystemTask(TenantTask):
if not isinstance(msg, LogEvent):
self._messages[idx] = LogEvent(msg, logger=self.__name__, log_level="info")
def set_error(self, exception: Exception, *messages: LogEvent):
def set_error(self, exception: Exception):
"""Set result to error and save exception"""
self._status = TaskStatus.ERROR
self._messages = list(messages)
self._messages.extend(
[LogEvent(exception_to_string(exception), logger=self.__name__, log_level="error")]
)
self._messages = [
LogEvent(exception_to_string(exception), logger=self.__name__, log_level="error")
]
def before_start(self, task_id, args, kwargs):
self._start_precise = perf_counter()

View File

@ -3,7 +3,6 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING
from django.contrib.messages import INFO, add_message
from django.http.request import HttpRequest
from structlog.stdlib import get_logger
@ -62,8 +61,6 @@ class ReevaluateMarker(StageMarker):
engine.request.context.update(plan.context)
engine.build()
result = engine.result
for message in result.messages:
add_message(http_request, INFO, message)
if result.passing:
return binding
LOGGER.warning(

View File

@ -109,8 +109,6 @@ class FlowPlan:
def pop(self):
"""Pop next pending stage from bottom of list"""
if not self.markers and not self.bindings:
return
self.markers.pop(0)
self.bindings.pop(0)
@ -158,13 +156,8 @@ class FlowPlan:
final_stage: type[StageView] = self.bindings[-1].stage.view
temp_exec = FlowExecutorView(flow=flow, request=request, plan=self)
temp_exec.current_stage = self.bindings[-1].stage
temp_exec.current_stage_view = final_stage
temp_exec.setup(request, flow.slug)
stage = final_stage(request=request, executor=temp_exec)
response = stage.dispatch(request)
# Ensure we clean the flow state we have in the session before we redirect away
temp_exec.stage_ok()
return response
return stage.dispatch(request)
get_qs = request.GET.copy()
if request.user.is_authenticated and (

View File

@ -103,7 +103,7 @@ class FlowExecutorView(APIView):
permission_classes = [AllowAny]
flow: Flow = None
flow: Flow
plan: FlowPlan | None = None
current_binding: FlowStageBinding | None = None
@ -114,8 +114,7 @@ class FlowExecutorView(APIView):
def setup(self, request: HttpRequest, flow_slug: str):
super().setup(request, flow_slug=flow_slug)
if not self.flow:
self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
self._logger = get_logger().bind(flow_slug=flow_slug)
set_tag("authentik.flow", self.flow.slug)

View File

@ -283,15 +283,12 @@ class ConfigLoader:
def get_optional_int(self, path: str, default=None) -> int | None:
"""Wrapper for get that converts value into int or None if set"""
value = self.get(path, default)
if value is UNSET:
return default
try:
return int(value)
except (ValueError, TypeError) as exc:
if value is None or (isinstance(value, str) and value.lower() == "null"):
return default
if value is UNSET:
return default
return None
self.log("warning", "Failed to parse config as int", path=path, exc=str(exc))
return default
@ -424,4 +421,4 @@ if __name__ == "__main__":
if len(argv) < 2: # noqa: PLR2004
print(dumps(CONFIG.raw, indent=4, cls=AttrEncoder))
else:
print(CONFIG.get(argv[-1]))
print(CONFIG.get(argv[1]))

View File

@ -1,26 +0,0 @@
from structlog.stdlib import get_logger
from authentik.lib.config import CONFIG
LOGGER = get_logger()
def start_debug_server(**kwargs) -> bool:
"""Attempt to start a debugpy server in the current process.
Returns true if the server was started successfully, otherwise false"""
if not CONFIG.get_bool("debug") and not CONFIG.get_bool("debugger"):
return
try:
import debugpy
except ImportError:
LOGGER.warning(
"Failed to import debugpy. debugpy is not included "
"in the default release dependencies and must be installed manually"
)
return False
listen: str = CONFIG.get("listen.listen_debug_py", "127.0.0.1:9901")
host, _, port = listen.rpartition(":")
debugpy.listen((host, int(port)), **kwargs) # nosec
LOGGER.debug("Starting debug server", host=host, port=port)
return True

View File

@ -8,7 +8,6 @@ postgresql:
password: "env://POSTGRES_PASSWORD"
test:
name: test_authentik
default_schema: public
read_replicas: {}
# For example
# 0:
@ -22,7 +21,6 @@ listen:
listen_radius: 0.0.0.0:1812
listen_metrics: 0.0.0.0:9300
listen_debug: 0.0.0.0:9900
listen_debug_py: 0.0.0.0:9901
trusted_proxy_cidrs:
- 127.0.0.0/8
- 10.0.0.0/8
@ -59,7 +57,7 @@ cache:
# transport_options: ""
debug: false
debugger: false
remote_debug: false
log_level: info

View File

@ -22,9 +22,9 @@ class OutgoingSyncProvider(Model):
class Meta:
abstract = True
def client_for_model[T: User | Group](
self, model: type[T]
) -> BaseOutgoingSyncClient[T, Any, Any, Self]:
def client_for_model[
T: User | Group
](self, model: type[T]) -> BaseOutgoingSyncClient[T, Any, Any, Self]:
raise NotImplementedError
def get_object_qs[T: User | Group](self, type: type[T]) -> QuerySet[T]:

View File

@ -1,54 +0,0 @@
"""Email utility functions"""
def mask_email(email: str | None) -> str | None:
"""Mask email address for privacy
Args:
email: Email address to mask
Returns:
Masked email address or None if input is None
Example:
mask_email("myname@company.org")
'm*****@c******.org'
"""
if not email:
return None
# Basic email format validation
if email.count("@") != 1:
raise ValueError("Invalid email format: Must contain exactly one '@' symbol")
local, domain = email.split("@")
if not local or not domain:
raise ValueError("Invalid email format: Local and domain parts cannot be empty")
domain_parts = domain.split(".")
if len(domain_parts) < 2: # noqa: PLR2004
raise ValueError("Invalid email format: Domain must contain at least one dot")
limit = 2
# Mask local part (keep first char)
if len(local) <= limit:
masked_local = "*" * len(local)
else:
masked_local = local[0] + "*" * (len(local) - 1)
# Mask each domain part except the last one (TLD)
masked_domain_parts = []
for _i, part in enumerate(domain_parts[:-1]): # Process all parts except TLD
if not part: # Check for empty parts (consecutive dots)
raise ValueError("Invalid email format: Domain parts cannot be empty")
if len(part) <= limit:
masked_part = "*" * len(part)
else:
masked_part = part[0] + "*" * (len(part) - 1)
masked_domain_parts.append(masked_part)
# Add TLD unchanged
if not domain_parts[-1]: # Check if TLD is empty
raise ValueError("Invalid email format: TLD cannot be empty")
masked_domain_parts.append(domain_parts[-1])
return f"{masked_local}@{'.'.join(masked_domain_parts)}"

View File

@ -42,8 +42,6 @@ class DebugSession(Session):
def get_http_session() -> Session:
"""Get a requests session with common headers"""
session = Session()
if CONFIG.get_bool("debug") or CONFIG.get("log_level") == "trace":
session = DebugSession()
session = DebugSession() if CONFIG.get_bool("debug") else Session()
session.headers["User-Agent"] = authentik_user_agent()
return session

View File

@ -42,12 +42,6 @@ class GeoIPPolicySerializer(CountryFieldMixin, PolicySerializer):
"asns",
"countries",
"countries_obj",
"check_history_distance",
"history_max_distance_km",
"distance_tolerance_km",
"history_login_count",
"check_impossible_travel",
"impossible_tolerance_km",
]

View File

@ -1,43 +0,0 @@
# Generated by Django 5.0.10 on 2025-01-02 20:40
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_policies_geoip", "0001_initial"),
]
operations = [
migrations.AddField(
model_name="geoippolicy",
name="check_history_distance",
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name="geoippolicy",
name="check_impossible_travel",
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name="geoippolicy",
name="distance_tolerance_km",
field=models.PositiveIntegerField(default=50),
),
migrations.AddField(
model_name="geoippolicy",
name="history_login_count",
field=models.PositiveIntegerField(default=5),
),
migrations.AddField(
model_name="geoippolicy",
name="history_max_distance_km",
field=models.PositiveBigIntegerField(default=100),
),
migrations.AddField(
model_name="geoippolicy",
name="impossible_tolerance_km",
field=models.PositiveIntegerField(default=100),
),
]

View File

@ -4,21 +4,15 @@ from itertools import chain
from django.contrib.postgres.fields import ArrayField
from django.db import models
from django.utils.timezone import now
from django.utils.translation import gettext as _
from django_countries.fields import CountryField
from geopy import distance
from rest_framework.serializers import BaseSerializer
from authentik.events.context_processors.geoip import GeoIPDict
from authentik.events.models import Event, EventAction
from authentik.policies.exceptions import PolicyException
from authentik.policies.geoip.exceptions import GeoIPNotFoundException
from authentik.policies.models import Policy
from authentik.policies.types import PolicyRequest, PolicyResult
MAX_DISTANCE_HOUR_KM = 1000
class GeoIPPolicy(Policy):
"""Ensure the user satisfies requirements of geography or network topology, based on IP
@ -27,15 +21,6 @@ class GeoIPPolicy(Policy):
asns = ArrayField(models.IntegerField(), blank=True, default=list)
countries = CountryField(multiple=True, blank=True)
distance_tolerance_km = models.PositiveIntegerField(default=50)
check_history_distance = models.BooleanField(default=False)
history_max_distance_km = models.PositiveBigIntegerField(default=100)
history_login_count = models.PositiveIntegerField(default=5)
check_impossible_travel = models.BooleanField(default=False)
impossible_tolerance_km = models.PositiveIntegerField(default=100)
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.policies.geoip.api import GeoIPPolicySerializer
@ -52,27 +37,21 @@ class GeoIPPolicy(Policy):
- the client IP is advertised by an autonomous system with ASN in the `asns`
- the client IP is geolocated in a country of `countries`
"""
static_results: list[PolicyResult] = []
dynamic_results: list[PolicyResult] = []
results: list[PolicyResult] = []
if self.asns:
static_results.append(self.passes_asn(request))
results.append(self.passes_asn(request))
if self.countries:
static_results.append(self.passes_country(request))
results.append(self.passes_country(request))
if self.check_history_distance or self.check_impossible_travel:
dynamic_results.append(self.passes_distance(request))
if not static_results and not dynamic_results:
if not results:
return PolicyResult(True)
passing = any(r.passing for r in static_results) and all(r.passing for r in dynamic_results)
messages = chain(
*[r.messages for r in static_results], *[r.messages for r in dynamic_results]
)
passing = any(r.passing for r in results)
messages = chain(*[r.messages for r in results])
result = PolicyResult(passing, *messages)
result.source_results = list(chain(static_results, dynamic_results))
result.source_results = results
return result
@ -94,7 +73,7 @@ class GeoIPPolicy(Policy):
def passes_country(self, request: PolicyRequest) -> PolicyResult:
# This is not a single get chain because `request.context` can contain `{ "geoip": None }`.
geoip_data: GeoIPDict | None = request.context.get("geoip")
geoip_data = request.context.get("geoip")
country = geoip_data.get("country") if geoip_data else None
if not country:
@ -108,42 +87,6 @@ class GeoIPPolicy(Policy):
return PolicyResult(True)
def passes_distance(self, request: PolicyRequest) -> PolicyResult:
"""Check if current policy execution is out of distance range compared
to previous authentication requests"""
# Get previous login event and GeoIP data
previous_logins = Event.objects.filter(
action=EventAction.LOGIN, user__pk=request.user.pk, context__geo__isnull=False
).order_by("-created")[: self.history_login_count]
_now = now()
geoip_data: GeoIPDict | None = request.context.get("geoip")
if not geoip_data:
return PolicyResult(False)
for previous_login in previous_logins:
previous_login_geoip: GeoIPDict = previous_login.context["geo"]
# Figure out distance
dist = distance.geodesic(
(previous_login_geoip["lat"], previous_login_geoip["long"]),
(geoip_data["lat"], geoip_data["long"]),
)
if self.check_history_distance and dist.km >= (
self.history_max_distance_km - self.distance_tolerance_km
):
return PolicyResult(
False, _("Distance from previous authentication is larger than threshold.")
)
# Check if distance between `previous_login` and now is more
# than max distance per hour times the amount of hours since the previous login
# (round down to the lowest closest time of hours)
# clamped to be at least 1 hour
rel_time_hours = max(int((_now - previous_login.created).total_seconds() / 3600), 1)
if self.check_impossible_travel and dist.km >= (
(MAX_DISTANCE_HOUR_KM * rel_time_hours) - self.distance_tolerance_km
):
return PolicyResult(False, _("Distance is further than possible."))
return PolicyResult(True)
class Meta(Policy.PolicyMeta):
verbose_name = _("GeoIP Policy")
verbose_name_plural = _("GeoIP Policies")

View File

@ -1,10 +1,8 @@
"""geoip policy tests"""
from django.test import TestCase
from guardian.shortcuts import get_anonymous_user
from authentik.core.tests.utils import create_test_user
from authentik.events.models import Event, EventAction
from authentik.events.utils import get_user
from authentik.policies.engine import PolicyRequest, PolicyResult
from authentik.policies.exceptions import PolicyException
from authentik.policies.geoip.exceptions import GeoIPNotFoundException
@ -16,8 +14,8 @@ class TestGeoIPPolicy(TestCase):
def setUp(self):
super().setUp()
self.user = create_test_user()
self.request = PolicyRequest(self.user)
self.request = PolicyRequest(get_anonymous_user())
self.context_disabled_geoip = {}
self.context_unknown_ip = {"asn": None, "geoip": None}
@ -128,70 +126,3 @@ class TestGeoIPPolicy(TestCase):
result: PolicyResult = policy.passes(self.request)
self.assertTrue(result.passing)
def test_history(self):
"""Test history checks"""
Event.objects.create(
action=EventAction.LOGIN,
user=get_user(self.user),
context={
# Random location in Canada
"geo": {"lat": 55.868351, "long": -104.441011},
},
)
# Random location in Poland
self.request.context["geoip"] = {"lat": 50.950613, "long": 20.363679}
policy = GeoIPPolicy.objects.create(check_history_distance=True)
result: PolicyResult = policy.passes(self.request)
self.assertFalse(result.passing)
def test_history_no_data(self):
"""Test history checks (with no geoip data in context)"""
Event.objects.create(
action=EventAction.LOGIN,
user=get_user(self.user),
context={
# Random location in Canada
"geo": {"lat": 55.868351, "long": -104.441011},
},
)
policy = GeoIPPolicy.objects.create(check_history_distance=True)
result: PolicyResult = policy.passes(self.request)
self.assertFalse(result.passing)
def test_history_impossible_travel(self):
"""Test history checks"""
Event.objects.create(
action=EventAction.LOGIN,
user=get_user(self.user),
context={
# Random location in Canada
"geo": {"lat": 55.868351, "long": -104.441011},
},
)
# Random location in Poland
self.request.context["geoip"] = {"lat": 50.950613, "long": 20.363679}
policy = GeoIPPolicy.objects.create(check_impossible_travel=True)
result: PolicyResult = policy.passes(self.request)
self.assertFalse(result.passing)
def test_history_no_geoip(self):
"""Test history checks (previous login with no geoip data)"""
Event.objects.create(
action=EventAction.LOGIN,
user=get_user(self.user),
context={},
)
# Random location in Poland
self.request.context["geoip"] = {"lat": 50.950613, "long": 20.363679}
policy = GeoIPPolicy.objects.create(check_history_distance=True)
result: PolicyResult = policy.passes(self.request)
self.assertFalse(result.passing)

View File

@ -281,6 +281,7 @@ class OAuth2Provider(WebfingerProvider, Provider):
},
)
return request.build_absolute_uri(url)
except Provider.application.RelatedObjectDoesNotExist:
return None

View File

@ -1,10 +1,9 @@
from django.contrib.auth.signals import user_logged_out
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.http import HttpRequest
from authentik.core.models import User
from authentik.providers.oauth2.models import AccessToken, DeviceToken, RefreshToken
from authentik.providers.oauth2.models import AccessToken
@receiver(user_logged_out)
@ -13,13 +12,3 @@ def user_logged_out_oauth_access_token(sender, request: HttpRequest, user: User,
if not request.session or not request.session.session_key:
return
AccessToken.objects.filter(user=user, session__session_key=request.session.session_key).delete()
@receiver(post_save, sender=User)
def user_deactivated(sender, instance: User, **_):
"""Remove user tokens when deactivated"""
if instance.is_active:
return
AccessToken.objects.filter(session__user=instance).delete()
RefreshToken.objects.filter(session__user=instance).delete()
DeviceToken.objects.filter(session__user=instance).delete()

View File

@ -150,7 +150,6 @@ class TestToken(OAuthTestCase):
"id_token": provider.encode(
access.id_token.to_dict(),
),
"scope": "",
},
)
self.validate_jwt(access, provider)
@ -243,7 +242,6 @@ class TestToken(OAuthTestCase):
"id_token": provider.encode(
access.id_token.to_dict(),
),
"scope": "offline_access",
},
)
self.validate_jwt(access, provider)
@ -303,7 +301,6 @@ class TestToken(OAuthTestCase):
"id_token": provider.encode(
access.id_token.to_dict(),
),
"scope": "offline_access",
},
)

View File

@ -499,11 +499,11 @@ class OAuthFulfillmentStage(StageView):
)
challenge.is_valid()
self.executor.stage_ok()
return HttpChallengeResponse(
challenge=challenge,
)
self.executor.stage_ok()
return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:

View File

@ -64,8 +64,7 @@ def to_base64url_uint(val: int, min_length: int = 0) -> bytes:
class JWKSView(View):
"""Show RSA Key data for Provider"""
@staticmethod
def get_jwk_for_key(key: CertificateKeyPair, use: str) -> dict | None:
def get_jwk_for_key(self, key: CertificateKeyPair, use: str) -> dict | None:
"""Convert a certificate-key pair into JWK"""
private_key = key.private_key
key_data = None
@ -124,12 +123,12 @@ class JWKSView(View):
response_data = {}
if signing_key := provider.signing_key:
jwk = JWKSView.get_jwk_for_key(signing_key, "sig")
jwk = self.get_jwk_for_key(signing_key, "sig")
if jwk:
response_data.setdefault("keys", [])
response_data["keys"].append(jwk)
if encryption_key := provider.encryption_key:
jwk = JWKSView.get_jwk_for_key(encryption_key, "enc")
jwk = self.get_jwk_for_key(encryption_key, "enc")
if jwk:
response_data.setdefault("keys", [])
response_data["keys"].append(jwk)

View File

@ -627,7 +627,6 @@ class TokenView(View):
response = {
"access_token": access_token.token,
"token_type": TOKEN_TYPE,
"scope": " ".join(access_token.scope),
"expires_in": int(
timedelta_from_string(self.provider.access_token_validity).total_seconds()
),
@ -711,7 +710,6 @@ class TokenView(View):
"access_token": access_token.token,
"refresh_token": refresh_token.token,
"token_type": TOKEN_TYPE,
"scope": " ".join(access_token.scope),
"expires_in": int(
timedelta_from_string(self.provider.access_token_validity).total_seconds()
),
@ -738,7 +736,6 @@ class TokenView(View):
return {
"access_token": access_token.token,
"token_type": TOKEN_TYPE,
"scope": " ".join(access_token.scope),
"expires_in": int(
timedelta_from_string(self.provider.access_token_validity).total_seconds()
),
@ -770,7 +767,6 @@ class TokenView(View):
response = {
"access_token": access_token.token,
"token_type": TOKEN_TYPE,
"scope": " ".join(access_token.scope),
"expires_in": int(
timedelta_from_string(self.provider.access_token_validity).total_seconds()
),

View File

@ -2,7 +2,7 @@
from django.apps import apps
from django.contrib.auth.models import Permission
from django.db.models import Q, QuerySet
from django.db.models import QuerySet
from django_filters.filters import ModelChoiceFilter
from django_filters.filterset import FilterSet
from django_filters.rest_framework import DjangoFilterBackend
@ -18,7 +18,6 @@ from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import IsAuthenticated
from rest_framework.viewsets import ReadOnlyModelViewSet
from authentik.blueprints.v1.importer import excluded_models
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.core.models import User
from authentik.lib.validators import RequiredTogetherValidator
@ -106,13 +105,13 @@ class RBACPermissionViewSet(ReadOnlyModelViewSet):
]
def get_queryset(self) -> QuerySet:
query = Q()
for model in excluded_models():
query |= Q(
content_type__app_label=model._meta.app_label,
content_type__model=model._meta.model_name,
return (
Permission.objects.all()
.select_related("content_type")
.filter(
content_type__app_label__startswith="authentik",
)
return Permission.objects.all().select_related("content_type").exclude(query)
)
class PermissionAssignSerializer(PassiveSerializer):

View File

@ -7,12 +7,7 @@ from psycopg import connect
from authentik.lib.config import CONFIG
# We need to string format the query as tables and schemas can't be set by parameters
# not a security issue as the config value is set by the person installing authentik
# which also has postgres credentials etc
QUERY = """SELECT id FROM {}.authentik_install_id ORDER BY id LIMIT 1;""".format( # nosec
CONFIG.get("postgresql.default_schema")
)
QUERY = """SELECT id FROM public.authentik_install_id ORDER BY id LIMIT 1;"""
@lru_cache

View File

@ -100,7 +100,6 @@ TENANT_APPS = [
"authentik.sources.scim",
"authentik.stages.authenticator",
"authentik.stages.authenticator_duo",
"authentik.stages.authenticator_email",
"authentik.stages.authenticator_sms",
"authentik.stages.authenticator_static",
"authentik.stages.authenticator_totp",
@ -130,7 +129,6 @@ TENANT_DOMAIN_MODEL = "authentik_tenants.Domain"
TENANT_CREATION_FAKES_MIGRATIONS = True
TENANT_BASE_SCHEMA = "template"
PUBLIC_SCHEMA_NAME = CONFIG.get("postgresql.default_schema")
GUARDIAN_MONKEY_PATCH = False

View File

@ -1,4 +1,3 @@
import math
from os import environ
from ssl import OPENSSL_VERSION
@ -25,20 +24,3 @@ def pytest_report_header(*_, **__):
f"authentik version: {get_full_version()}",
f"OpenSSL version: {OPENSSL_VERSION}, FIPS: {backend._fips_enabled}",
]
def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
current_id = int(environ.get("CI_RUN_ID", 0)) - 1
total_ids = int(environ.get("CI_TOTAL_RUNS", 0))
if total_ids:
num_tests = len(items)
matrix_size = math.ceil(num_tests / total_ids)
start = current_id * matrix_size
end = (current_id + 1) * matrix_size
deselected_items = items[:start] + items[end:]
config.hook.pytest_deselected(items=deselected_items)
items[:] = items[start:end]
print(f" Executing {start} - {end} tests")

View File

@ -66,7 +66,6 @@ class KerberosSourceViewSet(UsedByMixin, ModelViewSet):
serializer_class = KerberosSourceSerializer
lookup_field = "slug"
filterset_fields = [
"pbm_uuid",
"name",
"slug",
"enabled",

View File

@ -110,7 +110,6 @@ class LDAPSourceViewSet(UsedByMixin, ModelViewSet):
serializer_class = LDAPSourceSerializer
lookup_field = "slug"
filterset_fields = [
"pbm_uuid",
"name",
"slug",
"enabled",

View File

@ -152,7 +152,6 @@ class OAuthSourceFilter(FilterSet):
class Meta:
model = OAuthSource
fields = [
"pbm_uuid",
"name",
"slug",
"enabled",

View File

@ -52,7 +52,6 @@ class PlexSourceViewSet(UsedByMixin, ModelViewSet):
serializer_class = PlexSourceSerializer
lookup_field = "slug"
filterset_fields = [
"pbm_uuid",
"name",
"slug",
"enabled",

View File

@ -44,7 +44,6 @@ class SAMLSourceViewSet(UsedByMixin, ModelViewSet):
serializer_class = SAMLSourceSerializer
lookup_field = "slug"
filterset_fields = [
"pbm_uuid",
"name",
"slug",
"enabled",

View File

@ -53,6 +53,6 @@ class SCIMSourceViewSet(UsedByMixin, ModelViewSet):
queryset = SCIMSource.objects.all()
serializer_class = SCIMSourceSerializer
lookup_field = "slug"
filterset_fields = ["pbm_uuid", "name", "slug"]
filterset_fields = ["name", "slug"]
search_fields = ["name", "slug", "token__identifier", "token__user__username"]
ordering = ["name"]

View File

@ -114,7 +114,7 @@ class SCIMView(APIView):
class SCIMObjectView(SCIMView):
"""Base SCIM View for object management"""
"""Base SCIM View for object management"""
mapper: SourceMapper
manager: PropertyMappingManager

View File

@ -1,85 +0,0 @@
"""AuthenticatorEmailStage API Views"""
from rest_framework import mixins
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from authentik.core.api.groups import GroupMemberSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer
from authentik.flows.api.stages import StageSerializer
from authentik.stages.authenticator_email.models import AuthenticatorEmailStage, EmailDevice
class AuthenticatorEmailStageSerializer(StageSerializer):
"""AuthenticatorEmailStage Serializer"""
class Meta:
model = AuthenticatorEmailStage
fields = StageSerializer.Meta.fields + [
"configure_flow",
"friendly_name",
"use_global_settings",
"host",
"port",
"username",
"password",
"use_tls",
"use_ssl",
"timeout",
"from_address",
"subject",
"token_expiry",
"template",
]
class AuthenticatorEmailStageViewSet(UsedByMixin, ModelViewSet):
"""AuthenticatorEmailStage Viewset"""
queryset = AuthenticatorEmailStage.objects.all()
serializer_class = AuthenticatorEmailStageSerializer
filterset_fields = "__all__"
ordering = ["name"]
search_fields = ["name"]
class EmailDeviceSerializer(ModelSerializer):
"""Serializer for email authenticator devices"""
user = GroupMemberSerializer(read_only=True)
class Meta:
model = EmailDevice
fields = ["name", "pk", "email", "user"]
depth = 2
extra_kwargs = {
"email": {"read_only": True},
}
class EmailDeviceViewSet(
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
mixins.DestroyModelMixin,
UsedByMixin,
mixins.ListModelMixin,
GenericViewSet,
):
"""Viewset for email authenticator devices"""
queryset = EmailDevice.objects.all()
serializer_class = EmailDeviceSerializer
search_fields = ["name"]
filterset_fields = ["name"]
ordering = ["name"]
owner_field = "user"
class EmailAdminDeviceViewSet(ModelViewSet):
"""Viewset for email authenticator devices (for admins)"""
queryset = EmailDevice.objects.all()
serializer_class = EmailDeviceSerializer
search_fields = ["name"]
filterset_fields = ["name"]
ordering = ["name"]

View File

@ -1,12 +0,0 @@
"""Email Authenticator"""
from authentik.blueprints.apps import ManagedAppConfig
class AuthentikStageAuthenticatorEmailConfig(ManagedAppConfig):
"""Email Authenticator App config"""
name = "authentik.stages.authenticator_email"
label = "authentik_stages_authenticator_email"
verbose_name = "authentik Stages.Authenticator.Email"
default = True

View File

@ -1,132 +0,0 @@
# Generated by Django 5.0.10 on 2025-01-27 20:05
import django.db.models.deletion
import django.utils.timezone
from django.conf import settings
from django.db import migrations, models
import authentik.lib.utils.time
class Migration(migrations.Migration):
initial = True
dependencies = [
("authentik_flows", "0027_auto_20231028_1424"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="AuthenticatorEmailStage",
fields=[
(
"stage_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="authentik_flows.stage",
),
),
("friendly_name", models.TextField(null=True)),
(
"use_global_settings",
models.BooleanField(
default=False,
help_text="When enabled, global Email connection settings will be used and connection settings below will be ignored.",
),
),
("host", models.TextField(default="localhost")),
("port", models.IntegerField(default=25)),
("username", models.TextField(blank=True, default="")),
("password", models.TextField(blank=True, default="")),
("use_tls", models.BooleanField(default=False)),
("use_ssl", models.BooleanField(default=False)),
("timeout", models.IntegerField(default=10)),
(
"from_address",
models.EmailField(default="system@authentik.local", max_length=254),
),
(
"token_expiry",
models.TextField(
default="minutes=30",
help_text="Time the token sent is valid (Format: hours=3,minutes=17,seconds=300).",
validators=[authentik.lib.utils.time.timedelta_string_validator],
),
),
("subject", models.TextField(default="authentik Sign-in code")),
("template", models.TextField(default="email/email_otp.html")),
(
"configure_flow",
models.ForeignKey(
blank=True,
help_text="Flow used by an authenticated user to configure this Stage. If empty, user will not be able to configure this stage.",
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="authentik_flows.flow",
),
),
],
options={
"verbose_name": "Email Authenticator Setup Stage",
"verbose_name_plural": "Email Authenticator Setup Stages",
},
bases=("authentik_flows.stage", models.Model),
),
migrations.CreateModel(
name="EmailDevice",
fields=[
(
"id",
models.AutoField(
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
),
),
("created", models.DateTimeField(auto_now_add=True)),
("last_updated", models.DateTimeField(auto_now=True)),
(
"name",
models.CharField(
help_text="The human-readable name of this device.", max_length=64
),
),
(
"confirmed",
models.BooleanField(default=True, help_text="Is this device ready for use?"),
),
("token", models.CharField(blank=True, max_length=16, null=True)),
(
"valid_until",
models.DateTimeField(
default=django.utils.timezone.now,
help_text="The timestamp of the moment of expiry of the saved token.",
),
),
("email", models.EmailField(max_length=254)),
("last_used", models.DateTimeField(auto_now=True)),
(
"stage",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="authentik_stages_authenticator_email.authenticatoremailstage",
),
),
(
"user",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL
),
),
],
options={
"verbose_name": "Email Device",
"verbose_name_plural": "Email Devices",
"unique_together": {("user", "email")},
},
),
]

View File

@ -1,167 +0,0 @@
from django.contrib.auth import get_user_model
from django.core.mail.backends.base import BaseEmailBackend
from django.core.mail.backends.smtp import EmailBackend
from django.db import models
from django.template import TemplateSyntaxError
from django.utils.translation import gettext_lazy as _
from django.views import View
from rest_framework.serializers import BaseSerializer
from authentik.events.models import Event, EventAction
from authentik.flows.exceptions import StageInvalidException
from authentik.flows.models import ConfigurableStage, FriendlyNamedStage, Stage
from authentik.lib.config import CONFIG
from authentik.lib.models import SerializerModel
from authentik.lib.utils.errors import exception_to_string
from authentik.lib.utils.time import timedelta_string_validator
from authentik.stages.authenticator.models import SideChannelDevice
from authentik.stages.email.utils import TemplateEmailMessage
class EmailTemplates(models.TextChoices):
"""Templates used for rendering the Email"""
EMAIL_OTP = (
"email/email_otp.html",
_("Email OTP"),
) # nosec
class AuthenticatorEmailStage(ConfigurableStage, FriendlyNamedStage, Stage):
"""Use Email-based authentication instead of authenticator-based."""
use_global_settings = models.BooleanField(
default=False,
help_text=_(
"When enabled, global Email connection settings will be used and "
"connection settings below will be ignored."
),
)
host = models.TextField(default="localhost")
port = models.IntegerField(default=25)
username = models.TextField(default="", blank=True)
password = models.TextField(default="", blank=True)
use_tls = models.BooleanField(default=False)
use_ssl = models.BooleanField(default=False)
timeout = models.IntegerField(default=10)
from_address = models.EmailField(default="system@authentik.local")
token_expiry = models.TextField(
default="minutes=30",
validators=[timedelta_string_validator],
help_text=_("Time the token sent is valid (Format: hours=3,minutes=17,seconds=300)."),
)
subject = models.TextField(default="authentik Sign-in code")
template = models.TextField(default=EmailTemplates.EMAIL_OTP)
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.stages.authenticator_email.api import AuthenticatorEmailStageSerializer
return AuthenticatorEmailStageSerializer
@property
def view(self) -> type[View]:
from authentik.stages.authenticator_email.stage import AuthenticatorEmailStageView
return AuthenticatorEmailStageView
@property
def component(self) -> str:
return "ak-stage-authenticator-email-form"
@property
def backend_class(self) -> type[BaseEmailBackend]:
"""Get the email backend class to use"""
return EmailBackend
@property
def backend(self) -> BaseEmailBackend:
"""Get fully configured Email Backend instance"""
if self.use_global_settings:
CONFIG.refresh("email.password")
return self.backend_class(
host=CONFIG.get("email.host"),
port=CONFIG.get_int("email.port"),
username=CONFIG.get("email.username"),
password=CONFIG.get("email.password"),
use_tls=CONFIG.get_bool("email.use_tls", False),
use_ssl=CONFIG.get_bool("email.use_ssl", False),
timeout=CONFIG.get_int("email.timeout"),
)
return self.backend_class(
host=self.host,
port=self.port,
username=self.username,
password=self.password,
use_tls=self.use_tls,
use_ssl=self.use_ssl,
timeout=self.timeout,
)
def send(self, device: "EmailDevice"):
# Lazy import here to avoid circular import
from authentik.stages.email.tasks import send_mails
# Compose the message using templates
message = device._compose_email()
return send_mails(device.stage, message)
def __str__(self):
return f"Email Authenticator Stage {self.name}"
class Meta:
verbose_name = _("Email Authenticator Setup Stage")
verbose_name_plural = _("Email Authenticator Setup Stages")
class EmailDevice(SerializerModel, SideChannelDevice):
"""Email Device"""
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
email = models.EmailField()
stage = models.ForeignKey(AuthenticatorEmailStage, on_delete=models.CASCADE)
last_used = models.DateTimeField(auto_now=True)
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.stages.authenticator_email.api import EmailDeviceSerializer
return EmailDeviceSerializer
def _compose_email(self) -> TemplateEmailMessage:
try:
pending_user = self.user
stage = self.stage
email = self.email
message = TemplateEmailMessage(
subject=_(stage.subject),
to=[(pending_user.name, email)],
template_name=stage.template,
template_context={
"user": pending_user,
"expires": self.valid_until,
"token": self.token,
},
)
return message
except TemplateSyntaxError as exc:
Event.new(
EventAction.CONFIGURATION_ERROR,
message=_("Exception occurred while rendering E-mail template"),
error=exception_to_string(exc),
template=stage.template,
).from_http(self.request)
raise StageInvalidException from exc
def __str__(self):
if not self.pk:
return "New Email Device"
return f"Email Device for {self.user_id}"
class Meta:
verbose_name = _("Email Device")
verbose_name_plural = _("Email Devices")
unique_together = (("user", "email"),)

View File

@ -1,177 +0,0 @@
"""Email Setup stage"""
from django.db.models import Q
from django.http import HttpRequest, HttpResponse
from django.http.request import QueryDict
from django.template.exceptions import TemplateSyntaxError
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError
from rest_framework.fields import BooleanField, CharField, IntegerField
from authentik.events.models import Event, EventAction
from authentik.flows.challenge import (
Challenge,
ChallengeResponse,
WithUserInfoChallenge,
)
from authentik.flows.exceptions import StageInvalidException
from authentik.flows.stage import ChallengeStageView
from authentik.lib.utils.email import mask_email
from authentik.lib.utils.errors import exception_to_string
from authentik.lib.utils.time import timedelta_from_string
from authentik.stages.authenticator_email.models import (
AuthenticatorEmailStage,
EmailDevice,
)
from authentik.stages.email.tasks import send_mails
from authentik.stages.email.utils import TemplateEmailMessage
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
SESSION_KEY_EMAIL_DEVICE = "authentik/stages/authenticator_email/email_device"
PLAN_CONTEXT_EMAIL = "email"
PLAN_CONTEXT_EMAIL_SENT = "email_sent"
PLAN_CONTEXT_EMAIL_OVERRIDE = "email"
class AuthenticatorEmailChallenge(WithUserInfoChallenge):
"""Authenticator Email Setup challenge"""
# Set to true if no previous prompt stage set the email
# this stage will also check prompt_data.email
email = CharField(default=None, allow_blank=True, allow_null=True)
email_required = BooleanField(default=True)
component = CharField(default="ak-stage-authenticator-email")
class AuthenticatorEmailChallengeResponse(ChallengeResponse):
"""Authenticator Email Challenge response, device is set by get_response_instance"""
device: EmailDevice
code = IntegerField(required=False)
email = CharField(required=False)
component = CharField(default="ak-stage-authenticator-email")
def validate(self, attrs: dict) -> dict:
"""Check"""
if "code" not in attrs:
if "email" not in attrs:
raise ValidationError("email required")
self.device.email = attrs["email"]
self.stage.validate_and_send(attrs["email"])
return super().validate(attrs)
if not self.device.verify_token(str(attrs["code"])):
raise ValidationError(_("Code does not match"))
self.device.confirmed = True
return super().validate(attrs)
class AuthenticatorEmailStageView(ChallengeStageView):
"""Authenticator Email Setup stage"""
response_class = AuthenticatorEmailChallengeResponse
def validate_and_send(self, email: str):
"""Validate email and send message"""
pending_user = self.get_pending_user()
stage: AuthenticatorEmailStage = self.executor.current_stage
if EmailDevice.objects.filter(Q(email=email), stage=stage.pk).exists():
raise ValidationError(_("Invalid email"))
device: EmailDevice = self.request.session[SESSION_KEY_EMAIL_DEVICE]
try:
message = TemplateEmailMessage(
subject=_(stage.subject),
to=[(pending_user.name, email)],
language=pending_user.locale(self.request),
template_name=stage.template,
template_context={
"user": pending_user,
"expires": device.valid_until,
"token": device.token,
},
)
send_mails(stage, message)
except TemplateSyntaxError as exc:
Event.new(
EventAction.CONFIGURATION_ERROR,
message=_("Exception occurred while rendering E-mail template"),
error=exception_to_string(exc),
template=stage.template,
).from_http(self.request)
raise StageInvalidException from exc
def _has_email(self) -> str | None:
context = self.executor.plan.context
# Check user's email attribute
user = self.get_pending_user()
if user.email:
self.logger.debug("got email from user attributes")
return user.email
# Check plan context for email
if PLAN_CONTEXT_EMAIL in context.get(PLAN_CONTEXT_PROMPT, {}):
self.logger.debug("got email from plan context")
return context.get(PLAN_CONTEXT_PROMPT, {}).get(PLAN_CONTEXT_EMAIL)
# Check device for email
if SESSION_KEY_EMAIL_DEVICE in self.request.session:
self.logger.debug("got email from device in session")
device: EmailDevice = self.request.session[SESSION_KEY_EMAIL_DEVICE]
if device.email == "":
return None
return device.email
return None
def get_challenge(self, *args, **kwargs) -> Challenge:
email = self._has_email()
return AuthenticatorEmailChallenge(
data={
"email": mask_email(email),
"email_required": email is None,
}
)
def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
response = super().get_response_instance(data)
response.device = self.request.session[SESSION_KEY_EMAIL_DEVICE]
return response
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
user = self.get_pending_user()
stage: AuthenticatorEmailStage = self.executor.current_stage
if SESSION_KEY_EMAIL_DEVICE not in self.request.session:
device = EmailDevice(user=user, confirmed=False, stage=stage, name="Email Device")
valid_secs: int = timedelta_from_string(stage.token_expiry).total_seconds()
device.generate_token(valid_secs=valid_secs, commit=False)
self.request.session[SESSION_KEY_EMAIL_DEVICE] = device
if email := self._has_email():
device.email = email
try:
self.validate_and_send(email)
except ValidationError as exc:
# We had an email given already (at this point only possible from flow
# context), but an error occurred while sending (most likely)
# due to a duplicate device, so delete the email we got given, reset the state
# (ish) and retry
device.email = ""
self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop(
PLAN_CONTEXT_EMAIL, None
)
self.request.session.pop(SESSION_KEY_EMAIL_DEVICE, None)
self.logger.warning("failed to send email to pre-set address", exc=exc)
return self.get(request, *args, **kwargs)
return super().get(request, *args, **kwargs)
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
"""Email Token is validated by challenge"""
device: EmailDevice = self.request.session[SESSION_KEY_EMAIL_DEVICE]
if not device.confirmed:
return self.challenge_invalid(response)
device.save()
del self.request.session[SESSION_KEY_EMAIL_DEVICE]
return self.executor.stage_ok()

View File

@ -1,44 +0,0 @@
{% extends "email/base.html" %}
{% load i18n %}
{% load humanize %}
{% block content %}
<tr>
<td align="center">
<h1>
{% blocktrans with username=user.username %}
Hi {{ username }},
{% endblocktrans %}
</h1>
</td>
</tr>
<tr>
<td align="center">
<table border="0">
<tr>
<td align="center" style="max-width: 300px; padding: 20px 0; color: #212124;">
{% blocktrans %}
Email MFA code.
{% endblocktrans %}
</td>
</tr>
<tr>
<td align="center" class="btn btn-primary">
{{ token }}
</td>
</tr>
</table>
</td>
</tr>
{% endblock %}
{% block sub_content %}
<tr>
<td style="padding: 20px; font-size: 12px; color: #212124;" align="center">
{% blocktrans with expires=expires|timeuntil %}
If you did not request this code, please ignore this email. The code above is valid for {{ expires }}.
{% endblocktrans %}
</td>
</tr>
{% endblock %}

View File

@ -1,13 +0,0 @@
{% load i18n %}{% load humanize %}{% autoescape off %}{% blocktrans with username=user.username %}Hi {{ username }},{% endblocktrans %}
{% blocktrans %}
Email MFA code
{% endblocktrans %}
{{ token }}
{% blocktrans with expires=expires|timeuntil %}
If you did not request this code, please ignore this email. The code above is valid for {{ expires }}.
{% endblocktrans %}
--
Powered by goauthentik.io.
{% endautoescape %}

View File

@ -1,340 +0,0 @@
"""Test Email Authenticator API"""
from datetime import timedelta
from unittest.mock import MagicMock, PropertyMock, patch
from django.core import mail
from django.core.mail.backends.smtp import EmailBackend
from django.db.utils import IntegrityError
from django.template.exceptions import TemplateDoesNotExist
from django.urls import reverse
from django.utils.timezone import now
from authentik.core.tests.utils import create_test_admin_user, create_test_flow, create_test_user
from authentik.flows.models import FlowStageBinding
from authentik.flows.tests import FlowTestCase
from authentik.lib.config import CONFIG
from authentik.lib.utils.email import mask_email
from authentik.stages.authenticator_email.api import (
AuthenticatorEmailStageSerializer,
EmailDeviceSerializer,
)
from authentik.stages.authenticator_email.models import AuthenticatorEmailStage, EmailDevice
from authentik.stages.authenticator_email.stage import (
SESSION_KEY_EMAIL_DEVICE,
)
from authentik.stages.email.utils import TemplateEmailMessage
class TestAuthenticatorEmailStage(FlowTestCase):
"""Test Email Authenticator stage"""
def setUp(self):
super().setUp()
self.flow = create_test_flow()
self.user = create_test_admin_user()
self.user_noemail = create_test_user(email="")
self.stage = AuthenticatorEmailStage.objects.create(
name="email-authenticator",
use_global_settings=True,
from_address="test@authentik.local",
configure_flow=self.flow,
token_expiry="minutes=30",
) # nosec
self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
self.device = EmailDevice.objects.create(
user=self.user,
stage=self.stage,
email="test@authentik.local",
)
self.client.force_login(self.user)
def test_device_str(self):
"""Test string representation of device"""
self.assertEqual(str(self.device), f"Email Device for {self.user.pk}")
# Test unsaved device
unsaved_device = EmailDevice(
user=self.user,
stage=self.stage,
email="test@authentik.local",
)
self.assertEqual(str(unsaved_device), "New Email Device")
def test_stage_str(self):
"""Test string representation of stage"""
self.assertEqual(str(self.stage), f"Email Authenticator Stage {self.stage.name}")
def test_token_lifecycle(self):
"""Test token generation, validation and expiry"""
# Initially no token
self.assertIsNone(self.device.token)
# Generate token
self.device.generate_token()
token = self.device.token
self.assertIsNotNone(token)
self.assertIsNotNone(self.device.valid_until)
self.assertTrue(self.device.valid_until > now())
# Verify invalid token
self.assertFalse(self.device.verify_token("000000"))
# Verify correct token (should clear token after verification)
self.assertTrue(self.device.verify_token(token))
self.assertIsNone(self.device.token)
def test_stage_no_prefill(self):
"""Test stage without prefilled email"""
self.client.force_login(self.user_noemail)
with patch(
"authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
PropertyMock(return_value=EmailBackend),
):
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
)
self.assertStageResponse(
response,
self.flow,
self.user_noemail,
component="ak-stage-authenticator-email",
email_required=True,
)
def test_stage_submit(self):
"""Test stage email submission"""
# Initialize the flow
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
)
self.assertStageResponse(
response,
self.flow,
self.user,
component="ak-stage-authenticator-email",
email_required=False,
)
# Test email submission with locmem backend
def mock_send_mails(stage, *messages):
"""Mock send_mails to send directly"""
for message in messages:
message.send()
with (
patch(
"authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
return_value=EmailBackend,
),
patch(
"authentik.stages.authenticator_email.stage.send_mails",
side_effect=mock_send_mails,
),
):
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
data={"component": "ak-stage-authenticator-email", "email": "test@example.com"},
)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(mail.outbox), 1)
sent_mail = mail.outbox[0]
self.assertEqual(sent_mail.subject, self.stage.subject)
self.assertEqual(sent_mail.to, [f"{self.user} <test@example.com>"])
# Get from_address from global email config to test if global settings are being used
from_address_global = CONFIG.get("email.from")
self.assertEqual(sent_mail.from_email, from_address_global)
self.assertStageResponse(
response,
self.flow,
self.user,
component="ak-stage-authenticator-email",
response_errors={},
email_required=False,
)
def test_email_template(self):
"""Test email template rendering"""
self.device.generate_token()
message = self.device._compose_email()
self.assertIsInstance(message, TemplateEmailMessage)
self.assertEqual(message.subject, self.stage.subject)
self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
self.assertTrue(self.device.token in message.body)
def test_duplicate_email(self):
"""Test attempting to use same email twice"""
email = "test2@authentik.local"
# First device
EmailDevice.objects.create(
user=self.user,
stage=self.stage,
email=email,
)
# Attempt to create second device with same email
with self.assertRaises(IntegrityError):
EmailDevice.objects.create(
user=self.user,
stage=self.stage,
email=email,
)
def test_token_expiry(self):
"""Test token expiration behavior"""
self.device.generate_token()
token = self.device.token
# Set token as expired
self.device.valid_until = now() - timedelta(minutes=1)
self.device.save()
# Verify expired token fails
self.assertFalse(self.device.verify_token(token))
def test_template_errors(self):
"""Test handling of template errors"""
self.stage.template = "{% invalid template %}"
with self.assertRaises(TemplateDoesNotExist):
self.stage.send(self.device)
def test_challenge_response_validation(self):
"""Test challenge response validation"""
# Initialize the flow
self.client.force_login(self.user_noemail)
with patch(
"authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
PropertyMock(return_value=EmailBackend),
):
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
)
# Test missing code and email
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
data={"component": "ak-stage-authenticator-email"},
)
self.assertIn("email required", str(response.content))
# Test invalid code
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
data={"component": "ak-stage-authenticator-email", "code": "000000"},
)
self.assertIn("Code does not match", str(response.content))
# Test valid code
self.client.force_login(self.user)
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
)
device = self.device
token = device.token
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
data={"component": "ak-stage-authenticator-email", "code": token},
)
self.assertEqual(response.status_code, 200)
self.assertTrue(device.confirmed)
def test_challenge_generation(self):
"""Test challenge generation"""
# Test with masked email
with patch(
"authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
PropertyMock(return_value=EmailBackend),
):
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
)
self.assertStageResponse(
response,
self.flow,
self.user,
component="ak-stage-authenticator-email",
email_required=False,
)
masked_email = mask_email(self.user.email)
self.assertEqual(masked_email, response.json()["email"])
# Test without email
self.client.force_login(self.user_noemail)
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
)
self.assertStageResponse(
response,
self.flow,
self.user_noemail,
component="ak-stage-authenticator-email",
email_required=True,
)
self.assertIsNone(response.json()["email"])
def test_session_management(self):
"""Test session device management"""
# Test device creation in session
with patch(
"authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
PropertyMock(return_value=EmailBackend),
):
# Delete any existing devices for this test
EmailDevice.objects.filter(user=self.user).delete()
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
)
self.assertIn(SESSION_KEY_EMAIL_DEVICE, self.client.session)
device = self.client.session[SESSION_KEY_EMAIL_DEVICE]
self.assertIsInstance(device, EmailDevice)
self.assertFalse(device.confirmed)
self.assertEqual(device.user, self.user)
# Test device confirmation and cleanup
device.confirmed = True
device.email = "new_test@authentik.local" # Use a different email
self.client.session[SESSION_KEY_EMAIL_DEVICE] = device
self.client.session.save()
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
data={"component": "ak-stage-authenticator-email", "code": device.token},
)
self.assertEqual(response.status_code, 200)
self.assertTrue(device.confirmed)
# Session key should be removed after device is saved
device.save()
self.assertNotIn(SESSION_KEY_EMAIL_DEVICE, self.client.session)
def test_model_properties_and_methods(self):
"""Test model properties"""
device = self.device
stage = self.stage
self.assertEqual(stage.serializer, AuthenticatorEmailStageSerializer)
self.assertIsInstance(stage.backend, EmailBackend)
self.assertEqual(device.serializer, EmailDeviceSerializer)
# Test AuthenticatorEmailStage send method
with patch(
"authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
return_value=EmailBackend,
):
self.device.generate_token()
# Test EmailDevice _compose_email method
message = self.device._compose_email()
self.assertIsInstance(message, TemplateEmailMessage)
self.assertEqual(message.subject, self.stage.subject)
self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
self.assertTrue(self.device.token in message.body)
# Test AuthenticatorEmailStage send method
self.stage.send(device)
def test_email_tasks(self):
email_send_mock = MagicMock()
with patch(
"authentik.stages.email.tasks.send_mails",
email_send_mock,
):
# Test AuthenticatorEmailStage send method
self.stage.send(self.device)
email_send_mock.assert_called_once()

View File

@ -1,17 +0,0 @@
"""API URLs"""
from authentik.stages.authenticator_email.api import (
AuthenticatorEmailStageViewSet,
EmailAdminDeviceViewSet,
EmailDeviceViewSet,
)
api_urlpatterns = [
("authenticators/email", EmailDeviceViewSet),
(
"authenticators/admin/email",
EmailAdminDeviceViewSet,
"admin-emaildevice",
),
("stages/authenticator/email", AuthenticatorEmailStageViewSet),
]

View File

@ -26,13 +26,10 @@ from authentik.events.middleware import audit_ignore
from authentik.events.models import Event, EventAction
from authentik.flows.stage import StageView
from authentik.flows.views.executor import SESSION_KEY_APPLICATION_PRE
from authentik.lib.utils.email import mask_email
from authentik.lib.utils.time import timedelta_from_string
from authentik.root.middleware import ClientIPMiddleware
from authentik.stages.authenticator import match_token
from authentik.stages.authenticator.models import Device
from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice
from authentik.stages.authenticator_email.models import EmailDevice
from authentik.stages.authenticator_sms.models import SMSDevice
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
from authentik.stages.authenticator_webauthn.models import UserVerification, WebAuthnDevice
@ -57,8 +54,6 @@ def get_challenge_for_device(
"""Generate challenge for a single device"""
if isinstance(device, WebAuthnDevice):
return get_webauthn_challenge(request, stage, device)
if isinstance(device, EmailDevice):
return {"email": mask_email(device.email)}
# Code-based challenges have no hints
return {}
@ -108,8 +103,6 @@ def select_challenge(request: HttpRequest, device: Device):
"""Callback when the user selected a challenge in the frontend."""
if isinstance(device, SMSDevice):
select_challenge_sms(request, device)
elif isinstance(device, EmailDevice):
select_challenge_email(request, device)
def select_challenge_sms(request: HttpRequest, device: SMSDevice):
@ -118,13 +111,6 @@ def select_challenge_sms(request: HttpRequest, device: SMSDevice):
device.stage.send(device.token, device)
def select_challenge_email(request: HttpRequest, device: EmailDevice):
"""Send Email"""
valid_secs: int = timedelta_from_string(device.stage.token_expiry).total_seconds()
device.generate_token(valid_secs=valid_secs)
device.stage.send(device)
def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Device:
"""Validate code-based challenges. We test against every device, on purpose, as
the user mustn't choose between totp and static devices."""

View File

@ -1,37 +0,0 @@
# Generated by Django 5.0.10 on 2025-01-16 02:48
import authentik.stages.authenticator_validate.models
import django.contrib.postgres.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
(
"authentik_stages_authenticator_validate",
"0013_authenticatorvalidatestage_webauthn_allowed_device_types",
),
]
operations = [
migrations.AlterField(
model_name="authenticatorvalidatestage",
name="device_classes",
field=django.contrib.postgres.fields.ArrayField(
base_field=models.TextField(
choices=[
("static", "Static"),
("totp", "TOTP"),
("webauthn", "WebAuthn"),
("duo", "Duo"),
("sms", "SMS"),
("email", "Email"),
]
),
default=authentik.stages.authenticator_validate.models.default_device_classes,
help_text="Device classes which can be used to authenticate",
size=None,
),
),
]

View File

@ -20,7 +20,6 @@ class DeviceClasses(models.TextChoices):
WEBAUTHN = "webauthn", _("WebAuthn")
DUO = "duo", _("Duo")
SMS = "sms", _("SMS")
EMAIL = "email", _("Email")
def default_device_classes() -> list:
@ -31,7 +30,6 @@ def default_device_classes() -> list:
DeviceClasses.WEBAUTHN,
DeviceClasses.DUO,
DeviceClasses.SMS,
DeviceClasses.EMAIL,
]

View File

@ -23,7 +23,6 @@ from authentik.flows.stage import ChallengeStageView
from authentik.lib.utils.time import timedelta_from_string
from authentik.stages.authenticator import devices_for_user
from authentik.stages.authenticator.models import Device
from authentik.stages.authenticator_email.models import EmailDevice
from authentik.stages.authenticator_sms.models import SMSDevice
from authentik.stages.authenticator_validate.challenge import (
DeviceChallenge,
@ -85,9 +84,7 @@ class AuthenticatorValidationChallengeResponse(ChallengeResponse):
def validate_code(self, code: str) -> str:
"""Validate code-based response, raise error if code isn't allowed"""
self._challenge_allowed(
[DeviceClasses.TOTP, DeviceClasses.STATIC, DeviceClasses.SMS, DeviceClasses.EMAIL]
)
self._challenge_allowed([DeviceClasses.TOTP, DeviceClasses.STATIC, DeviceClasses.SMS])
self.device = validate_challenge_code(code, self.stage, self.stage.get_pending_user())
return code
@ -120,17 +117,12 @@ class AuthenticatorValidationChallengeResponse(ChallengeResponse):
if not allowed:
raise ValidationError("invalid challenge selected")
device_class = challenge.get("device_class", "")
if device_class == "sms":
devices = SMSDevice.objects.filter(pk=int(challenge.get("device_uid", "0")))
if not devices.exists():
raise ValidationError("invalid challenge selected")
select_challenge(self.stage.request, devices.first())
elif device_class == "email":
devices = EmailDevice.objects.filter(pk=int(challenge.get("device_uid", "0")))
if not devices.exists():
raise ValidationError("invalid challenge selected")
select_challenge(self.stage.request, devices.first())
if challenge.get("device_class", "") != "sms":
return challenge
devices = SMSDevice.objects.filter(pk=int(challenge.get("device_uid", "0")))
if not devices.exists():
raise ValidationError("invalid challenge selected")
select_challenge(self.stage.request, devices.first())
return challenge
def validate_selected_stage(self, stage_pk: str) -> str:

View File

@ -1,183 +0,0 @@
"""Test validator stage for Email devices"""
from django.test.client import RequestFactory
from django.urls.base import reverse
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.flows.models import FlowStageBinding, NotConfiguredAction
from authentik.flows.tests import FlowTestCase
from authentik.lib.generators import generate_id
from authentik.lib.utils.email import mask_email
from authentik.stages.authenticator_email.models import AuthenticatorEmailStage, EmailDevice
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
from authentik.stages.identification.models import IdentificationStage, UserFields
class AuthenticatorValidateStageEmailTests(FlowTestCase):
"""Test validator stage for Email devices"""
def setUp(self) -> None:
self.user = create_test_admin_user()
self.request_factory = RequestFactory()
# Create email authenticator stage
self.stage = AuthenticatorEmailStage.objects.create(
name="email-authenticator",
use_global_settings=True,
from_address="test@authentik.local",
)
# Create identification stage
self.ident_stage = IdentificationStage.objects.create(
name=generate_id(),
user_fields=[UserFields.USERNAME],
)
# Create validation stage
self.validate_stage = AuthenticatorValidateStage.objects.create(
name=generate_id(),
device_classes=[DeviceClasses.EMAIL],
)
# Create flow with both stages
self.flow = create_test_flow()
FlowStageBinding.objects.create(target=self.flow, stage=self.ident_stage, order=0)
FlowStageBinding.objects.create(target=self.flow, stage=self.validate_stage, order=1)
def _identify_user(self):
"""Helper to identify user in flow"""
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
{"uid_field": self.user.username},
follow=True,
)
self.assertEqual(response.status_code, 200)
return response
def _send_challenge(self, device):
"""Helper to send challenge for device"""
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
{
"component": "ak-stage-authenticator-validate",
"selected_challenge": {
"device_class": "email",
"device_uid": str(device.pk),
"challenge": {},
"last_used": device.last_used.isoformat() if device.last_used else None,
},
},
)
self.assertEqual(response.status_code, 200)
return response
def test_happy_path(self):
"""Test validator stage with valid code"""
# Create a device for our user
device = EmailDevice.objects.create(
user=self.user,
confirmed=True,
stage=self.stage,
email="xx@0.co",
) # Short email for testing purposes
# First identify the user
self._identify_user()
# Send the challenge
response = self._send_challenge(device)
response_data = self.assertStageResponse(
response,
flow=self.flow,
component="ak-stage-authenticator-validate",
)
# Get the device challenge from the response and verify it matches
device_challenge = response_data["device_challenges"][0]
self.assertEqual(device_challenge["device_class"], "email")
self.assertEqual(device_challenge["device_uid"], str(device.pk))
self.assertEqual(device_challenge["challenge"], {"email": mask_email(device.email)})
# Generate a token for the device
device.generate_token()
# Submit the valid code
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
{"component": "ak-stage-authenticator-validate", "code": device.token},
)
# Should redirect to root since this is the last stage
self.assertStageRedirects(response, "/")
def test_no_device(self):
"""Test validator stage without configured device"""
configuration_stage = AuthenticatorEmailStage.objects.create(
name=generate_id(),
use_global_settings=True,
from_address="test@authentik.local",
)
stage = AuthenticatorValidateStage.objects.create(
name=generate_id(),
not_configured_action=NotConfiguredAction.CONFIGURE,
device_classes=[DeviceClasses.EMAIL],
)
stage.configuration_stages.set([configuration_stage])
flow = create_test_flow()
FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
{"component": "ak-stage-authenticator-validate"},
)
self.assertEqual(response.status_code, 200)
response_data = self.assertStageResponse(
response,
flow=flow,
component="ak-stage-authenticator-validate",
)
self.assertEqual(response_data["configuration_stages"], [])
self.assertEqual(response_data["device_challenges"], [])
self.assertEqual(
response_data["response_errors"],
{"non_field_errors": [{"code": "invalid", "string": "Empty response"}]},
)
def test_invalid_code(self):
"""Test validator stage with invalid code"""
# Create a device for our user
device = EmailDevice.objects.create(
user=self.user,
confirmed=True,
stage=self.stage,
email="test@authentik.local",
)
# First identify the user
self._identify_user()
# Send the challenge
self._send_challenge(device)
# Generate a token for the device
device.generate_token()
# Try invalid code and verify error message
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
{"component": "ak-stage-authenticator-validate", "code": "invalid"},
)
response_data = self.assertStageResponse(
response,
flow=self.flow,
component="ak-stage-authenticator-validate",
)
self.assertEqual(
response_data["response_errors"],
{
"code": [
{
"code": "invalid",
"string": (
"Invalid Token. Please ensure the time on your device "
"is accurate and try again."
),
}
],
},
)

File diff suppressed because one or more lines are too long

Some files were not shown because too many files have changed in this diff Show More