Merge branch 'main' into celery-2-dramatiq

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2025-06-10 14:49:45 +02:00
314 changed files with 3185 additions and 3009 deletions

View File

@ -1,5 +1,6 @@
"""outpost tasks"""
from hashlib import sha256
from os import R_OK, access
from pathlib import Path
from socket import gethostname
@ -48,6 +49,11 @@ LOGGER = get_logger()
CACHE_KEY_OUTPOST_DOWN = "goauthentik.io/outposts/teardown/%s"
def hash_session_key(session_key: str) -> str:
"""Hash the session key for sending session end signals"""
return sha256(session_key.encode("ascii")).hexdigest()
def controller_for_outpost(outpost: Outpost) -> type[BaseController] | None:
"""Get a controller for the outpost, when a service connection is defined"""
if not outpost.service_connection:
@ -80,7 +86,9 @@ def controller_for_outpost(outpost: Outpost) -> type[BaseController] | None:
def outpost_service_connection_monitor(connection_pk: Any):
"""Update cached state of a service connection"""
connection: OutpostServiceConnection = (
OutpostServiceConnection.objects.filter(pk=connection_pk).select_subclasses().first()
OutpostServiceConnection.objects.filter(pk=connection_pk)
.select_subclasses()
.first()
)
if not connection:
return
@ -122,9 +130,13 @@ def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = F
if not controller_type:
return
with controller_type(outpost, outpost.service_connection) as controller:
LOGGER.debug("---------------Outpost Controller logs starting----------------")
LOGGER.debug(
"---------------Outpost Controller logs starting----------------"
)
logs = getattr(controller, f"{action}_with_logs")()
LOGGER.debug("-----------------Outpost Controller logs end-------------------")
LOGGER.debug(
"-----------------Outpost Controller logs end-------------------"
)
except (ControllerException, ServiceConnectionInvalid) as exc:
self.error(exc)
else:
@ -166,7 +178,9 @@ def outpost_post_save(model_class: str, model_pk: Any):
schedule.send()
if isinstance(instance, OutpostModel | Outpost):
LOGGER.debug("triggering outpost update from outpostmodel/outpost", instance=instance)
LOGGER.debug(
"triggering outpost update from outpostmodel/outpost", instance=instance
)
outpost_send_update(instance)
if isinstance(instance, OutpostServiceConnection):
@ -240,7 +254,9 @@ def outpost_connection_discovery():
if kubeconfig_path.exists():
self.info("Detected kubeconfig")
kubeconfig_local_name = f"k8s-{gethostname()}"
if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists():
if not KubernetesServiceConnection.objects.filter(
name=kubeconfig_local_name
).exists():
self.info("Creating kubeconfig Service Connection")
with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig:
KubernetesServiceConnection.objects.create(
@ -258,3 +274,20 @@ def outpost_connection_discovery():
local=True,
url=unix_socket_path,
)
@actor
def outpost_session_end(session_id: str):
"""Update outpost instances connected to a single outpost"""
layer = get_channel_layer()
hashed_session_id = hash_session_key(session_id)
for outpost in Outpost.objects.all():
LOGGER.info("Sending session end signal to outpost", outpost=outpost)
group = OUTPOST_GROUP % {"outpost_pk": str(outpost.pk)}
async_to_sync(layer.group_send)(
group,
{
"type": "event.session.end",
"session_id": hashed_session_id,
},
)