@ -43,7 +43,6 @@ from authentik.providers.rac.controllers.kubernetes import RACKubernetesControll
|
|||||||
from authentik.providers.radius.controllers.docker import RadiusDockerController
|
from authentik.providers.radius.controllers.docker import RadiusDockerController
|
||||||
from authentik.providers.radius.controllers.kubernetes import RadiusKubernetesController
|
from authentik.providers.radius.controllers.kubernetes import RadiusKubernetesController
|
||||||
from authentik.tasks.middleware import CurrentTask
|
from authentik.tasks.middleware import CurrentTask
|
||||||
from authentik.tasks.models import Task, TaskStatus
|
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
CACHE_KEY_OUTPOST_DOWN = "goauthentik.io/outposts/teardown/%s"
|
CACHE_KEY_OUTPOST_DOWN = "goauthentik.io/outposts/teardown/%s"
|
||||||
@ -86,9 +85,7 @@ def controller_for_outpost(outpost: Outpost) -> type[BaseController] | None:
|
|||||||
def outpost_service_connection_monitor(connection_pk: Any):
|
def outpost_service_connection_monitor(connection_pk: Any):
|
||||||
"""Update cached state of a service connection"""
|
"""Update cached state of a service connection"""
|
||||||
connection: OutpostServiceConnection = (
|
connection: OutpostServiceConnection = (
|
||||||
OutpostServiceConnection.objects.filter(pk=connection_pk)
|
OutpostServiceConnection.objects.filter(pk=connection_pk).select_subclasses().first()
|
||||||
.select_subclasses()
|
|
||||||
.first()
|
|
||||||
)
|
)
|
||||||
if not connection:
|
if not connection:
|
||||||
return
|
return
|
||||||
@ -130,13 +127,9 @@ def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = F
|
|||||||
if not controller_type:
|
if not controller_type:
|
||||||
return
|
return
|
||||||
with controller_type(outpost, outpost.service_connection) as controller:
|
with controller_type(outpost, outpost.service_connection) as controller:
|
||||||
LOGGER.debug(
|
LOGGER.debug("---------------Outpost Controller logs starting----------------")
|
||||||
"---------------Outpost Controller logs starting----------------"
|
|
||||||
)
|
|
||||||
logs = getattr(controller, f"{action}_with_logs")()
|
logs = getattr(controller, f"{action}_with_logs")()
|
||||||
LOGGER.debug(
|
LOGGER.debug("-----------------Outpost Controller logs end-------------------")
|
||||||
"-----------------Outpost Controller logs end-------------------"
|
|
||||||
)
|
|
||||||
except (ControllerException, ServiceConnectionInvalid) as exc:
|
except (ControllerException, ServiceConnectionInvalid) as exc:
|
||||||
self.error(exc)
|
self.error(exc)
|
||||||
else:
|
else:
|
||||||
@ -178,9 +171,7 @@ def outpost_post_save(model_class: str, model_pk: Any):
|
|||||||
schedule.send()
|
schedule.send()
|
||||||
|
|
||||||
if isinstance(instance, OutpostModel | Outpost):
|
if isinstance(instance, OutpostModel | Outpost):
|
||||||
LOGGER.debug(
|
LOGGER.debug("triggering outpost update from outpostmodel/outpost", instance=instance)
|
||||||
"triggering outpost update from outpostmodel/outpost", instance=instance
|
|
||||||
)
|
|
||||||
outpost_send_update(instance)
|
outpost_send_update(instance)
|
||||||
|
|
||||||
if isinstance(instance, OutpostServiceConnection):
|
if isinstance(instance, OutpostServiceConnection):
|
||||||
@ -254,9 +245,7 @@ def outpost_connection_discovery():
|
|||||||
if kubeconfig_path.exists():
|
if kubeconfig_path.exists():
|
||||||
self.info("Detected kubeconfig")
|
self.info("Detected kubeconfig")
|
||||||
kubeconfig_local_name = f"k8s-{gethostname()}"
|
kubeconfig_local_name = f"k8s-{gethostname()}"
|
||||||
if not KubernetesServiceConnection.objects.filter(
|
if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists():
|
||||||
name=kubeconfig_local_name
|
|
||||||
).exists():
|
|
||||||
self.info("Creating kubeconfig Service Connection")
|
self.info("Creating kubeconfig Service Connection")
|
||||||
with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig:
|
with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig:
|
||||||
KubernetesServiceConnection.objects.create(
|
KubernetesServiceConnection.objects.create(
|
||||||
|
|||||||
@ -13,7 +13,7 @@ from authentik.tenants.utils import get_current_tenant
|
|||||||
|
|
||||||
|
|
||||||
class TaskSerializer(ModelSerializer):
|
class TaskSerializer(ModelSerializer):
|
||||||
messages = LogEventSerializer(many=True)
|
messages = LogEventSerializer(many=True, source="_messages")
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
model = Task
|
model = Task
|
||||||
@ -23,6 +23,8 @@ class TaskSerializer(ModelSerializer):
|
|||||||
"actor_name",
|
"actor_name",
|
||||||
"state",
|
"state",
|
||||||
"mtime",
|
"mtime",
|
||||||
|
"rel_obj_content_type",
|
||||||
|
"rel_obj_id",
|
||||||
"uid",
|
"uid",
|
||||||
"messages",
|
"messages",
|
||||||
]
|
]
|
||||||
|
|||||||
@ -30,14 +30,6 @@ class TaskState(models.TextChoices):
|
|||||||
DONE = "done"
|
DONE = "done"
|
||||||
|
|
||||||
|
|
||||||
class TaskStatus(models.TextChoices):
|
|
||||||
"""Task soft-state. Self-reported by the task"""
|
|
||||||
|
|
||||||
INFO = "info"
|
|
||||||
WARNING = "warning"
|
|
||||||
ERROR = "error"
|
|
||||||
|
|
||||||
|
|
||||||
class Task(SerializerModel):
|
class Task(SerializerModel):
|
||||||
message_id = models.UUIDField(primary_key=True, default=uuid4)
|
message_id = models.UUIDField(primary_key=True, default=uuid4)
|
||||||
queue_name = models.TextField(default="default", help_text=_("Queue name"))
|
queue_name = models.TextField(default="default", help_text=_("Queue name"))
|
||||||
@ -64,7 +56,9 @@ class Task(SerializerModel):
|
|||||||
rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id")
|
rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id")
|
||||||
|
|
||||||
_uid = models.TextField(blank=True, null=True)
|
_uid = models.TextField(blank=True, null=True)
|
||||||
messages = models.JSONField(default=list)
|
_messages = models.JSONField(default=list)
|
||||||
|
|
||||||
|
aggregated_status = models.TextField()
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
verbose_name = _("Task")
|
verbose_name = _("Task")
|
||||||
@ -94,6 +88,19 @@ class Task(SerializerModel):
|
|||||||
def __str__(self):
|
def __str__(self):
|
||||||
return str(self.message_id)
|
return str(self.message_id)
|
||||||
|
|
||||||
|
def update_aggregated_status(self):
|
||||||
|
if self.state != TaskState.DONE:
|
||||||
|
self.aggregated_status = self.state
|
||||||
|
return
|
||||||
|
status = "info"
|
||||||
|
for message in self._messages:
|
||||||
|
message_level = message["log_level"]
|
||||||
|
if status == "info" and message_level in ("warning", "error"):
|
||||||
|
status = message_level
|
||||||
|
if status == "warning" and message_level == "error":
|
||||||
|
status = message_level
|
||||||
|
self.aggregated_status = status
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def uid(self) -> str:
|
def uid(self) -> str:
|
||||||
uid = str(self.actor_name)
|
uid = str(self.actor_name)
|
||||||
@ -112,20 +119,21 @@ class Task(SerializerModel):
|
|||||||
if save:
|
if save:
|
||||||
self.save()
|
self.save()
|
||||||
|
|
||||||
def log(self, status: TaskStatus, message: str | Exception, save: bool = False, **attributes):
|
def log(self, log_level: str, message: str | Exception, save: bool = False, **attributes):
|
||||||
self.messages: list
|
self._messages: list
|
||||||
if isinstance(message, Exception):
|
if isinstance(message, Exception):
|
||||||
message = exception_to_string(message)
|
message = exception_to_string(message)
|
||||||
message = LogEvent(message, logger=self.uid, log_level=status.value, attributes=attributes)
|
log = LogEvent(message, logger=self.uid, log_level=status.value, attributes=attributes)
|
||||||
self.messages.append(sanitize_item(message))
|
self._messages.append(sanitize_item(log))
|
||||||
|
self.update_aggregated_status()
|
||||||
if save:
|
if save:
|
||||||
self.save()
|
self.save()
|
||||||
|
|
||||||
def info(self, message: str | Exception, save: bool = False, **attributes):
|
def info(self, message: str | Exception, save: bool = False, **attributes):
|
||||||
self.log(TaskStatus.INFO, message, save=save, **attributes)
|
self.log("info", message, save=save, **attributes)
|
||||||
|
|
||||||
def warning(self, message: str | Exception, save: bool = False, **attributes):
|
def warning(self, message: str | Exception, save: bool = False, **attributes):
|
||||||
self.log(TaskStatus.WARNING, message, save=save, **attributes)
|
self.log("warning", message, save=save, **attributes)
|
||||||
|
|
||||||
def error(self, message: str | Exception, save: bool = False, **attributes):
|
def error(self, message: str | Exception, save: bool = False, **attributes):
|
||||||
self.log(TaskStatus.ERROR, message, save=save, **attributes)
|
self.log("error", message, save=save, **attributes)
|
||||||
|
|||||||
Reference in New Issue
Block a user