diff --git a/authentik/outposts/models.py b/authentik/outposts/models.py index b84f0e02a6..871de3cff6 100644 --- a/authentik/outposts/models.py +++ b/authentik/outposts/models.py @@ -152,7 +152,7 @@ class OutpostServiceConnection(ScheduledModel, models.Model): state = cache.get(self.state_key, None) if not state: - outpost_service_connection_monitor.send(self.pk) + outpost_service_connection_monitor.send_with_options(args=(self.pk), rel_obj=self) return OutpostServiceConnectionState("", False) return state @@ -165,9 +165,11 @@ class OutpostServiceConnection(ScheduledModel, models.Model): @property def schedule_specs(self) -> list[ScheduleSpec]: + from authentik.outposts.tasks import outpost_service_connection_monitor + return [ ScheduleSpec( - actor_name="authentik.outposts.tasks.outpost_service_connection_monitor", + actor_name=outpost_service_connection_monitor.actor_name, uid=self.pk, args=(self.pk,), crontab="3-59/15 * * * *", @@ -315,9 +317,11 @@ class Outpost(ScheduledModel, SerializerModel, ManagedModel): @property def schedule_specs(self) -> list[ScheduleSpec]: + from authentik.outposts.tasks import outpost_controller + return [ ScheduleSpec( - actor_name="authentik.outposts.tasks.outpost_controller", + actor_name=outpost_controller.actor_name, uid=self.pk, args=(self.pk,), kwargs={"action": "up", "from_cache": False}, diff --git a/authentik/outposts/signals.py b/authentik/outposts/signals.py index 37b9fb777b..0a2145e420 100644 --- a/authentik/outposts/signals.py +++ b/authentik/outposts/signals.py @@ -39,14 +39,22 @@ def pre_save_outpost(sender, instance: Outpost, **_): if bool(dirty): LOGGER.info("Outpost needs re-deployment due to changes", instance=instance) cache.set(CACHE_KEY_OUTPOST_DOWN % instance.pk.hex, old_instance) - outpost_controller.send(instance.pk.hex, action="down", from_cache=True) + outpost_controller.send_with_options( + args=(instance.pk.hex,), + kwargs={"action": "down", "from_cache": True}, + rel_obj=instance, + ) @receiver(m2m_changed, sender=Outpost.providers.through) def m2m_changed_update(sender, instance: Model, action: str, **_): """Update outpost on m2m change, when providers are added or removed""" if action in ["post_add", "post_remove", "post_clear"]: - outpost_post_save.send(class_to_path(instance.__class__), instance.pk) + outpost_post_save.send_with_options( + args=(class_to_path(instance.__class__), instance.pk), + # TODO: how do we get the outpost here, if it makes sense + rel_obj=None, + ) @receiver(post_save) @@ -64,7 +72,11 @@ def post_save_update(sender, instance: Model, created: bool, **_): if isinstance(instance, Outpost) and created: LOGGER.info("New outpost saved, ensuring initial token and user are created") _ = instance.token - outpost_post_save.send(class_to_path(instance.__class__), instance.pk) + outpost_post_save.send_with_options( + args=(class_to_path(instance.__class__), instance.pk), + # TODO: how do we get the outpost here, if it makes sense + rel_obj=None, + ) @receiver(pre_delete, sender=Outpost) diff --git a/authentik/outposts/tasks.py b/authentik/outposts/tasks.py index 6010e90388..3fcbafa305 100644 --- a/authentik/outposts/tasks.py +++ b/authentik/outposts/tasks.py @@ -104,7 +104,8 @@ def outpost_service_connection_monitor(connection_pk: Any): @actor def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = False): """Create/update/monitor/delete the deployment of an Outpost""" - self: Task = CurrentTask.get_task() + self = CurrentTask.get_task() + self.set_uid(outpost_pk) logs = [] if from_cache: outpost: Outpost = cache.get(CACHE_KEY_OUTPOST_DOWN % outpost_pk) @@ -125,11 +126,11 @@ def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = F logs = getattr(controller, f"{action}_with_logs")() LOGGER.debug("-----------------Outpost Controller logs end-------------------") except (ControllerException, ServiceConnectionInvalid) as exc: - self.set_error(exc) + self.error(exc) else: if from_cache: cache.delete(CACHE_KEY_OUTPOST_DOWN % outpost_pk) - self.set_status(TaskStatus.SUCCESSFUL, *logs) + self.info(*logs) @actor @@ -137,15 +138,12 @@ def outpost_token_ensurer(): """ Periodically ensure that all Outposts have valid Service Accounts and Tokens """ - self: Task = CurrentTask.get_task() + self = CurrentTask.get_task() all_outposts = Outpost.objects.all() for outpost in all_outposts: _ = outpost.token outpost.build_user_permissions(outpost.user) - self.set_status( - TaskStatus.SUCCESSFUL, - f"Successfully checked {len(all_outposts)} Outposts.", - ) + self.info(f"Successfully checked {len(all_outposts)} Outposts.") @actor