61 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			61 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Kubernetes Service Reconciler"""
 | |
| from typing import TYPE_CHECKING
 | |
| 
 | |
| from kubernetes.client import CoreV1Api, V1Service, V1ServicePort, V1ServiceSpec
 | |
| 
 | |
| from passbook.outposts.controllers.k8s.base import (
 | |
|     KubernetesObjectReconciler,
 | |
|     NeedsUpdate,
 | |
| )
 | |
| 
 | |
| if TYPE_CHECKING:
 | |
|     from passbook.outposts.controllers.kubernetes import KubernetesController
 | |
| 
 | |
| 
 | |
class ServiceReconciler(KubernetesObjectReconciler[V1Service]):
    """Reconciler for the Kubernetes Service that fronts an outpost.

    Builds the desired ``V1Service`` from the controller's
    ``deployment_ports`` and creates, reads, patches or deletes it through
    ``CoreV1Api`` in ``self.namespace``.
    """

    def __init__(self, controller: "KubernetesController") -> None:
        super().__init__(controller)
        self.api = CoreV1Api()

    @property
    def name(self) -> str:
        """Name of the Service object, derived from the outpost's name."""
        return f"passbook-outpost-{self.controller.outpost.name}"

    @staticmethod
    def _port_matches(current_port: V1ServicePort, reference_port: V1ServicePort) -> bool:
        """Return True if `current_port` agrees with every field set on `reference_port`.

        Fields left as ``None`` on the reference are ignored, so values the
        API server fills in on the live object (e.g. ``protocol`` or
        ``target_port``) do not count as a difference.
        """
        actual = current_port.to_dict()
        return all(
            actual.get(field) == value
            for field, value in reference_port.to_dict().items()
            if value is not None
        )

    def reconcile(self, current: V1Service, reference: V1Service) -> None:
        """Compare the live service's ports against the reference.

        Raises:
            NeedsUpdate: if the number of ports differs, or a reference port
                has no counterpart on the live service.
        """
        # A spec without ports deserializes to None; treat it as empty.
        current_ports = current.spec.ports or []
        reference_ports = reference.spec.ports or []
        if len(current_ports) != len(reference_ports):
            raise NeedsUpdate()
        for wanted in reference_ports:
            # Whole-object equality would never hold against a server-returned
            # port, because the server populates fields the reference leaves
            # unset; compare only what the reference specifies.
            if not any(self._port_matches(live, wanted) for live in current_ports):
                raise NeedsUpdate()

    def get_reference_object(self) -> V1Service:
        """Build the desired Service object for the outpost.

        One ``V1ServicePort`` is created per entry of the controller's
        ``deployment_ports`` mapping (port name -> port number). The object
        metadata's labels double as the selector.
        """
        meta = self.get_object_meta(name=self.name)
        ports = [
            V1ServicePort(name=port_name, port=port)
            for port_name, port in self.controller.deployment_ports.items()
        ]
        return V1Service(
            metadata=meta,
            spec=V1ServiceSpec(ports=ports, selector=meta.labels, type="ClusterIP"),
        )

    def create(self, reference: V1Service):
        """Create the service described by `reference` in the namespace."""
        return self.api.create_namespaced_service(self.namespace, reference)

    def delete(self, reference: V1Service):
        """Delete the service named by `reference`'s metadata."""
        return self.api.delete_namespaced_service(
            reference.metadata.name, self.namespace
        )

    def retrieve(self) -> V1Service:
        """Read the outpost's service from the cluster."""
        # Reuse the `name` property so the lookup can never drift from the
        # name used when building the reference object.
        return self.api.read_namespaced_service(self.name, self.namespace)

    def update(self, current: V1Service, reference: V1Service):
        """Patch the live service so it matches `reference`."""
        return self.api.patch_namespaced_service(
            current.metadata.name, self.namespace, reference
        )
 | 
