@ -152,7 +152,7 @@ class OutpostServiceConnection(ScheduledModel, models.Model):
|
||||
|
||||
state = cache.get(self.state_key, None)
|
||||
if not state:
|
||||
outpost_service_connection_monitor.send(self.pk)
|
||||
outpost_service_connection_monitor.send_with_options(args=(self.pk), rel_obj=self)
|
||||
return OutpostServiceConnectionState("", False)
|
||||
return state
|
||||
|
||||
@ -165,9 +165,11 @@ class OutpostServiceConnection(ScheduledModel, models.Model):
|
||||
|
||||
@property
|
||||
def schedule_specs(self) -> list[ScheduleSpec]:
|
||||
from authentik.outposts.tasks import outpost_service_connection_monitor
|
||||
|
||||
return [
|
||||
ScheduleSpec(
|
||||
actor_name="authentik.outposts.tasks.outpost_service_connection_monitor",
|
||||
actor_name=outpost_service_connection_monitor.actor_name,
|
||||
uid=self.pk,
|
||||
args=(self.pk,),
|
||||
crontab="3-59/15 * * * *",
|
||||
@ -315,9 +317,11 @@ class Outpost(ScheduledModel, SerializerModel, ManagedModel):
|
||||
|
||||
@property
|
||||
def schedule_specs(self) -> list[ScheduleSpec]:
|
||||
from authentik.outposts.tasks import outpost_controller
|
||||
|
||||
return [
|
||||
ScheduleSpec(
|
||||
actor_name="authentik.outposts.tasks.outpost_controller",
|
||||
actor_name=outpost_controller.actor_name,
|
||||
uid=self.pk,
|
||||
args=(self.pk,),
|
||||
kwargs={"action": "up", "from_cache": False},
|
||||
|
||||
@ -39,14 +39,22 @@ def pre_save_outpost(sender, instance: Outpost, **_):
|
||||
if bool(dirty):
|
||||
LOGGER.info("Outpost needs re-deployment due to changes", instance=instance)
|
||||
cache.set(CACHE_KEY_OUTPOST_DOWN % instance.pk.hex, old_instance)
|
||||
outpost_controller.send(instance.pk.hex, action="down", from_cache=True)
|
||||
outpost_controller.send_with_options(
|
||||
args=(instance.pk.hex,),
|
||||
kwargs={"action": "down", "from_cache": True},
|
||||
rel_obj=instance,
|
||||
)
|
||||
|
||||
|
||||
@receiver(m2m_changed, sender=Outpost.providers.through)
|
||||
def m2m_changed_update(sender, instance: Model, action: str, **_):
|
||||
"""Update outpost on m2m change, when providers are added or removed"""
|
||||
if action in ["post_add", "post_remove", "post_clear"]:
|
||||
outpost_post_save.send(class_to_path(instance.__class__), instance.pk)
|
||||
outpost_post_save.send_with_options(
|
||||
args=(class_to_path(instance.__class__), instance.pk),
|
||||
# TODO: how do we get the outpost here, if it makes sense
|
||||
rel_obj=None,
|
||||
)
|
||||
|
||||
|
||||
@receiver(post_save)
|
||||
@ -64,7 +72,11 @@ def post_save_update(sender, instance: Model, created: bool, **_):
|
||||
if isinstance(instance, Outpost) and created:
|
||||
LOGGER.info("New outpost saved, ensuring initial token and user are created")
|
||||
_ = instance.token
|
||||
outpost_post_save.send(class_to_path(instance.__class__), instance.pk)
|
||||
outpost_post_save.send_with_options(
|
||||
args=(class_to_path(instance.__class__), instance.pk),
|
||||
# TODO: how do we get the outpost here, if it makes sense
|
||||
rel_obj=None,
|
||||
)
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=Outpost)
|
||||
|
||||
@ -104,7 +104,8 @@ def outpost_service_connection_monitor(connection_pk: Any):
|
||||
@actor
|
||||
def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = False):
|
||||
"""Create/update/monitor/delete the deployment of an Outpost"""
|
||||
self: Task = CurrentTask.get_task()
|
||||
self = CurrentTask.get_task()
|
||||
self.set_uid(outpost_pk)
|
||||
logs = []
|
||||
if from_cache:
|
||||
outpost: Outpost = cache.get(CACHE_KEY_OUTPOST_DOWN % outpost_pk)
|
||||
@ -125,11 +126,11 @@ def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = F
|
||||
logs = getattr(controller, f"{action}_with_logs")()
|
||||
LOGGER.debug("-----------------Outpost Controller logs end-------------------")
|
||||
except (ControllerException, ServiceConnectionInvalid) as exc:
|
||||
self.set_error(exc)
|
||||
self.error(exc)
|
||||
else:
|
||||
if from_cache:
|
||||
cache.delete(CACHE_KEY_OUTPOST_DOWN % outpost_pk)
|
||||
self.set_status(TaskStatus.SUCCESSFUL, *logs)
|
||||
self.info(*logs)
|
||||
|
||||
|
||||
@actor
|
||||
@ -137,15 +138,12 @@ def outpost_token_ensurer():
|
||||
"""
|
||||
Periodically ensure that all Outposts have valid Service Accounts and Tokens
|
||||
"""
|
||||
self: Task = CurrentTask.get_task()
|
||||
self = CurrentTask.get_task()
|
||||
all_outposts = Outpost.objects.all()
|
||||
for outpost in all_outposts:
|
||||
_ = outpost.token
|
||||
outpost.build_user_permissions(outpost.user)
|
||||
self.set_status(
|
||||
TaskStatus.SUCCESSFUL,
|
||||
f"Successfully checked {len(all_outposts)} Outposts.",
|
||||
)
|
||||
self.info(f"Successfully checked {len(all_outposts)} Outposts.")
|
||||
|
||||
|
||||
@actor
|
||||
|
||||
Reference in New Issue
Block a user