Files
authentik/authentik/outposts/tasks.py
Jens L. 88fa7e37dc outposts: Refactor session end signal and add LDAP support (#14539)
* outpost: promote session end signal to non-provider specific

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* implement server-side logout in ldap

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix previous import

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* use better retry logic

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* log

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* make more generic if we switch from ws to something else

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* make it possible to e2e test WS

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix ldap session id

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* ok I actually need to go to bed this took me an hour to fix

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* format; add ldap test

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix leftover state

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* remove thread

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* use ws base for radius

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* separate test utils

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* rename

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix missing super calls

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* websocket tests with browser 🎉

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add proxy test for sign out

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix install_id issue with channels tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix proxy basic auth test

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* big code dedupe

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* allow passing go build args

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* improve waiting for outpost

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* rewrite ldap tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* ok actually fix the tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* undo a couple things that need more time to cook

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* remove unused lockfile-lint dependency since we use a shell script and SFE does not have a lockfile

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix session id for ldap

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix missing createTimestamp and modifyTimestamp ldap attributes

closes #10474

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
2025-06-10 12:11:21 +02:00

315 lines
13 KiB
Python

"""outpost tasks"""
from hashlib import sha256
from os import R_OK, access
from pathlib import Path
from socket import gethostname
from typing import Any
from urllib.parse import urlparse
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.core.cache import cache
from django.db import DatabaseError, InternalError, ProgrammingError
from django.db.models.base import Model
from django.utils.text import slugify
from docker.constants import DEFAULT_UNIX_SOCKET
from kubernetes.config.incluster_config import SERVICE_TOKEN_FILENAME
from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION
from structlog.stdlib import get_logger
from yaml import safe_load
from authentik.events.models import TaskStatus
from authentik.events.system_tasks import SystemTask, prefill_task
from authentik.lib.config import CONFIG
from authentik.lib.utils.reflection import path_to_class
from authentik.outposts.consumer import OUTPOST_GROUP
from authentik.outposts.controllers.base import BaseController, ControllerException
from authentik.outposts.controllers.docker import DockerClient
from authentik.outposts.controllers.kubernetes import KubernetesClient
from authentik.outposts.models import (
DockerServiceConnection,
KubernetesServiceConnection,
Outpost,
OutpostModel,
OutpostServiceConnection,
OutpostType,
ServiceConnectionInvalid,
)
from authentik.providers.ldap.controllers.docker import LDAPDockerController
from authentik.providers.ldap.controllers.kubernetes import LDAPKubernetesController
from authentik.providers.proxy.controllers.docker import ProxyDockerController
from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController
from authentik.providers.rac.controllers.docker import RACDockerController
from authentik.providers.rac.controllers.kubernetes import RACKubernetesController
from authentik.providers.radius.controllers.docker import RadiusDockerController
from authentik.providers.radius.controllers.kubernetes import RadiusKubernetesController
from authentik.root.celery import CELERY_APP
LOGGER = get_logger()
CACHE_KEY_OUTPOST_DOWN = "goauthentik.io/outposts/teardown/%s"
def hash_session_key(session_key: str) -> str:
"""Hash the session key for sending session end signals"""
return sha256(session_key.encode("ascii")).hexdigest()
def controller_for_outpost(outpost: Outpost) -> type[BaseController] | None:
"""Get a controller for the outpost, when a service connection is defined"""
if not outpost.service_connection:
return None
service_connection = outpost.service_connection
if outpost.type == OutpostType.PROXY:
if isinstance(service_connection, DockerServiceConnection):
return ProxyDockerController
if isinstance(service_connection, KubernetesServiceConnection):
return ProxyKubernetesController
if outpost.type == OutpostType.LDAP:
if isinstance(service_connection, DockerServiceConnection):
return LDAPDockerController
if isinstance(service_connection, KubernetesServiceConnection):
return LDAPKubernetesController
if outpost.type == OutpostType.RADIUS:
if isinstance(service_connection, DockerServiceConnection):
return RadiusDockerController
if isinstance(service_connection, KubernetesServiceConnection):
return RadiusKubernetesController
if outpost.type == OutpostType.RAC:
if isinstance(service_connection, DockerServiceConnection):
return RACDockerController
if isinstance(service_connection, KubernetesServiceConnection):
return RACKubernetesController
return None
@CELERY_APP.task()
def outpost_service_connection_state(connection_pk: Any):
"""Update cached state of a service connection"""
connection: OutpostServiceConnection = (
OutpostServiceConnection.objects.filter(pk=connection_pk).select_subclasses().first()
)
if not connection:
return
cls = None
if isinstance(connection, DockerServiceConnection):
cls = DockerClient
if isinstance(connection, KubernetesServiceConnection):
cls = KubernetesClient
if not cls:
LOGGER.warning("No class found for service connection", connection=connection)
return
try:
with cls(connection) as client:
state = client.fetch_state()
except ServiceConnectionInvalid as exc:
LOGGER.warning("Failed to get client status", exc=exc)
return
cache.set(connection.state_key, state, timeout=None)
@CELERY_APP.task(
bind=True,
base=SystemTask,
throws=(DatabaseError, ProgrammingError, InternalError),
)
@prefill_task
def outpost_service_connection_monitor(self: SystemTask):
"""Regularly check the state of Outpost Service Connections"""
connections = OutpostServiceConnection.objects.all()
for connection in connections.iterator():
outpost_service_connection_state.delay(connection.pk)
self.set_status(
TaskStatus.SUCCESSFUL,
f"Successfully updated {len(connections)} connections.",
)
@CELERY_APP.task(
throws=(DatabaseError, ProgrammingError, InternalError),
)
def outpost_controller_all():
"""Launch Controller for all Outposts which support it"""
for outpost in Outpost.objects.exclude(service_connection=None):
outpost_controller.delay(outpost.pk.hex, "up", from_cache=False)
@CELERY_APP.task(bind=True, base=SystemTask)
def outpost_controller(
self: SystemTask, outpost_pk: str, action: str = "up", from_cache: bool = False
):
"""Create/update/monitor/delete the deployment of an Outpost"""
logs = []
if from_cache:
outpost: Outpost = cache.get(CACHE_KEY_OUTPOST_DOWN % outpost_pk)
LOGGER.debug("Getting outpost from cache to delete")
else:
outpost: Outpost = Outpost.objects.filter(pk=outpost_pk).first()
LOGGER.debug("Getting outpost from DB")
if not outpost:
LOGGER.warning("No outpost")
return
self.set_uid(slugify(outpost.name))
try:
controller_type = controller_for_outpost(outpost)
if not controller_type:
return
with controller_type(outpost, outpost.service_connection) as controller:
LOGGER.debug("---------------Outpost Controller logs starting----------------")
logs = getattr(controller, f"{action}_with_logs")()
LOGGER.debug("-----------------Outpost Controller logs end-------------------")
except (ControllerException, ServiceConnectionInvalid) as exc:
self.set_error(exc)
else:
if from_cache:
cache.delete(CACHE_KEY_OUTPOST_DOWN % outpost_pk)
self.set_status(TaskStatus.SUCCESSFUL, *logs)
@CELERY_APP.task(bind=True, base=SystemTask)
@prefill_task
def outpost_token_ensurer(self: SystemTask):
"""Periodically ensure that all Outposts have valid Service Accounts
and Tokens"""
all_outposts = Outpost.objects.all()
for outpost in all_outposts:
_ = outpost.token
outpost.build_user_permissions(outpost.user)
self.set_status(
TaskStatus.SUCCESSFUL,
f"Successfully checked {len(all_outposts)} Outposts.",
)
@CELERY_APP.task()
def outpost_post_save(model_class: str, model_pk: Any):
"""If an Outpost is saved, Ensure that token is created/updated
If an OutpostModel, or a model that is somehow connected to an OutpostModel is saved,
we send a message down the relevant OutpostModels WS connection to trigger an update"""
model: Model = path_to_class(model_class)
try:
instance = model.objects.get(pk=model_pk)
except model.DoesNotExist:
LOGGER.warning("Model does not exist", model=model, pk=model_pk)
return
if isinstance(instance, Outpost):
LOGGER.debug("Trigger reconcile for outpost", instance=instance)
outpost_controller.delay(str(instance.pk))
if isinstance(instance, OutpostModel | Outpost):
LOGGER.debug("triggering outpost update from outpostmodel/outpost", instance=instance)
outpost_send_update(instance)
if isinstance(instance, OutpostServiceConnection):
LOGGER.debug("triggering ServiceConnection state update", instance=instance)
outpost_service_connection_state.delay(str(instance.pk))
for field in instance._meta.get_fields():
# Each field is checked if it has a `related_model` attribute (when ForeginKeys or M2Ms)
# are used, and if it has a value
if not hasattr(field, "related_model"):
continue
if not field.related_model:
continue
if not issubclass(field.related_model, OutpostModel):
continue
field_name = f"{field.name}_set"
if not hasattr(instance, field_name):
continue
LOGGER.debug("triggering outpost update from field", field=field.name)
# Because the Outpost Model has an M2M to Provider,
# we have to iterate over the entire QS
for reverse in getattr(instance, field_name).all():
outpost_send_update(reverse)
def outpost_send_update(model_instance: Model):
"""Send outpost update to all registered outposts, regardless to which authentik
instance they are connected"""
channel_layer = get_channel_layer()
if isinstance(model_instance, OutpostModel):
for outpost in model_instance.outpost_set.all():
_outpost_single_update(outpost, channel_layer)
elif isinstance(model_instance, Outpost):
_outpost_single_update(model_instance, channel_layer)
def _outpost_single_update(outpost: Outpost, layer=None):
"""Update outpost instances connected to a single outpost"""
# Ensure token again, because this function is called when anything related to an
# OutpostModel is saved, so we can be sure permissions are right
_ = outpost.token
outpost.build_user_permissions(outpost.user)
if not layer: # pragma: no cover
layer = get_channel_layer()
group = OUTPOST_GROUP % {"outpost_pk": str(outpost.pk)}
LOGGER.debug("sending update", channel=group, outpost=outpost)
async_to_sync(layer.group_send)(group, {"type": "event.update"})
@CELERY_APP.task(
base=SystemTask,
bind=True,
)
def outpost_connection_discovery(self: SystemTask):
"""Checks the local environment and create Service connections."""
messages = []
if not CONFIG.get_bool("outposts.discover"):
messages.append("Outpost integration discovery is disabled")
self.set_status(TaskStatus.SUCCESSFUL, *messages)
return
# Explicitly check against token filename, as that's
# only present when the integration is enabled
if Path(SERVICE_TOKEN_FILENAME).exists():
messages.append("Detected in-cluster Kubernetes Config")
if not KubernetesServiceConnection.objects.filter(local=True).exists():
messages.append("Created Service Connection for in-cluster")
KubernetesServiceConnection.objects.create(
name="Local Kubernetes Cluster", local=True, kubeconfig={}
)
# For development, check for the existence of a kubeconfig file
kubeconfig_path = Path(KUBE_CONFIG_DEFAULT_LOCATION).expanduser()
if kubeconfig_path.exists():
messages.append("Detected kubeconfig")
kubeconfig_local_name = f"k8s-{gethostname()}"
if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists():
messages.append("Creating kubeconfig Service Connection")
with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig:
KubernetesServiceConnection.objects.create(
name=kubeconfig_local_name,
kubeconfig=safe_load(_kubeconfig),
)
unix_socket_path = urlparse(DEFAULT_UNIX_SOCKET).path
socket = Path(unix_socket_path)
if socket.exists() and access(socket, R_OK):
messages.append("Detected local docker socket")
if len(DockerServiceConnection.objects.filter(local=True)) == 0:
messages.append("Created Service Connection for docker")
DockerServiceConnection.objects.create(
name="Local Docker connection",
local=True,
url=unix_socket_path,
)
self.set_status(TaskStatus.SUCCESSFUL, *messages)
@CELERY_APP.task()
def outpost_session_end(session_id: str):
"""Update outpost instances connected to a single outpost"""
layer = get_channel_layer()
hashed_session_id = hash_session_key(session_id)
for outpost in Outpost.objects.all():
LOGGER.info("Sending session end signal to outpost", outpost=outpost)
group = OUTPOST_GROUP % {"outpost_pk": str(outpost.pk)}
async_to_sync(layer.group_send)(
group,
{
"type": "event.session.end",
"session_id": hashed_session_id,
},
)