outposts: ensure all Service Connection state updates are done by the task
This commit is contained in:
		@ -35,6 +35,7 @@ from authentik.lib.models import InheritanceForeignKey
 | 
			
		||||
from authentik.lib.sentry import SentryIgnoredException
 | 
			
		||||
from authentik.lib.utils.template import render_to_string
 | 
			
		||||
from authentik.outposts.docker_tls import DockerInlineTLS
 | 
			
		||||
from authentik.outposts.tasks import outpost_service_connection_state
 | 
			
		||||
 | 
			
		||||
OUR_VERSION = parse(__version__)
 | 
			
		||||
OUTPOST_HELLO_INTERVAL = 10
 | 
			
		||||
@ -113,17 +114,22 @@ class OutpostServiceConnection(models.Model):
 | 
			
		||||
 | 
			
		||||
    objects = InheritanceManager()
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def state_key(self) -> str:
 | 
			
		||||
        """Key used to save connection state in cache"""
 | 
			
		||||
        return f"outpost_service_connection_{self.pk.hex}"
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def state(self) -> OutpostServiceConnectionState:
 | 
			
		||||
        """Get state of service connection"""
 | 
			
		||||
        state_key = f"outpost_service_connection_{self.pk.hex}"
 | 
			
		||||
        state = cache.get(state_key, None)
 | 
			
		||||
        state = cache.get(self.state_key, None)
 | 
			
		||||
        if not state:
 | 
			
		||||
            state = self._get_state()
 | 
			
		||||
            cache.set(state_key, state, timeout=0)
 | 
			
		||||
            outpost_service_connection_state.delay(self.pk)
 | 
			
		||||
            return OutpostServiceConnectionState("", False)
 | 
			
		||||
        return state
 | 
			
		||||
 | 
			
		||||
    def _get_state(self) -> OutpostServiceConnectionState:
 | 
			
		||||
    def fetch_state(self) -> OutpostServiceConnectionState:
 | 
			
		||||
        """Fetch current Service Connection state"""
 | 
			
		||||
        raise NotImplementedError
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
@ -203,7 +209,7 @@ class DockerServiceConnection(OutpostServiceConnection):
 | 
			
		||||
            raise ServiceConnectionInvalid from exc
 | 
			
		||||
        return client
 | 
			
		||||
 | 
			
		||||
    def _get_state(self) -> OutpostServiceConnectionState:
 | 
			
		||||
    def fetch_state(self) -> OutpostServiceConnectionState:
 | 
			
		||||
        try:
 | 
			
		||||
            client = self.client()
 | 
			
		||||
            return OutpostServiceConnectionState(
 | 
			
		||||
@ -239,7 +245,7 @@ class KubernetesServiceConnection(OutpostServiceConnection):
 | 
			
		||||
    def __str__(self) -> str:
 | 
			
		||||
        return f"Kubernetes Service-Connection {self.name}"
 | 
			
		||||
 | 
			
		||||
    def _get_state(self) -> OutpostServiceConnectionState:
 | 
			
		||||
    def fetch_state(self) -> OutpostServiceConnectionState:
 | 
			
		||||
        try:
 | 
			
		||||
            client = self.client()
 | 
			
		||||
            api_instance = VersionApi(client)
 | 
			
		||||
 | 
			
		||||
@ -35,21 +35,23 @@ def outpost_controller_all():
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@CELERY_APP.task()
 | 
			
		||||
def outpost_service_connection_state(state_pk: Any):
 | 
			
		||||
def outpost_service_connection_state(connection_pk: Any):
 | 
			
		||||
    """Update cached state of a service connection"""
 | 
			
		||||
    connection: OutpostServiceConnection = (
 | 
			
		||||
        OutpostServiceConnection.objects.filter(pk=state_pk).select_subclasses().first()
 | 
			
		||||
        OutpostServiceConnection.objects.filter(pk=connection_pk)
 | 
			
		||||
        .select_subclasses()
 | 
			
		||||
        .first()
 | 
			
		||||
    )
 | 
			
		||||
    cache.delete(f"outpost_service_connection_{connection.pk.hex}")
 | 
			
		||||
    _ = connection.state
 | 
			
		||||
    state = connection.fetch_state()
 | 
			
		||||
    cache.set(connection.state_key, state, timeout=0)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
 | 
			
		||||
def outpost_service_connection_monitor(self: MonitoredTask):
 | 
			
		||||
    """Regularly check the state of Outpost Service Connections"""
 | 
			
		||||
    for connection in OutpostServiceConnection.objects.select_subclasses():
 | 
			
		||||
        cache.delete(f"outpost_service_connection_{connection.pk.hex}")
 | 
			
		||||
        _ = connection.state
 | 
			
		||||
    for connection in OutpostServiceConnection.objects.all():
 | 
			
		||||
        outpost_service_connection_state.delay(connection.pk)
 | 
			
		||||
    self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user