admin: monitor worker version (#12463)
* root: include version in celery ping Signed-off-by: Jens Langhammer <jens@goauthentik.io> * check version in worker endpoint Signed-off-by: Jens Langhammer <jens@goauthentik.io> * include worker version in prom metrics Signed-off-by: Jens Langhammer <jens@goauthentik.io> * format Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
		| @ -1,12 +1,16 @@ | ||||
| """authentik administration overview""" | ||||
|  | ||||
| from socket import gethostname | ||||
|  | ||||
| from django.conf import settings | ||||
| from drf_spectacular.utils import extend_schema, inline_serializer | ||||
| from rest_framework.fields import IntegerField | ||||
| from packaging.version import parse | ||||
| from rest_framework.fields import BooleanField, CharField | ||||
| from rest_framework.request import Request | ||||
| from rest_framework.response import Response | ||||
| from rest_framework.views import APIView | ||||
|  | ||||
| from authentik import get_full_version | ||||
| from authentik.rbac.permissions import HasPermission | ||||
| from authentik.root.celery import CELERY_APP | ||||
|  | ||||
| @ -16,11 +20,38 @@ class WorkerView(APIView): | ||||
|  | ||||
|     permission_classes = [HasPermission("authentik_rbac.view_system_info")] | ||||
|  | ||||
|     @extend_schema(responses=inline_serializer("Workers", fields={"count": IntegerField()})) | ||||
|     @extend_schema( | ||||
|         responses=inline_serializer( | ||||
|             "Worker", | ||||
|             fields={ | ||||
|                 "worker_id": CharField(), | ||||
|                 "version": CharField(), | ||||
|                 "version_matching": BooleanField(), | ||||
|             }, | ||||
|             many=True, | ||||
|         ) | ||||
|     ) | ||||
|     def get(self, request: Request) -> Response: | ||||
|         """Get currently connected worker count.""" | ||||
|         count = len(CELERY_APP.control.ping(timeout=0.5)) | ||||
|         raw: list[dict[str, dict]] = CELERY_APP.control.ping(timeout=0.5) | ||||
|         our_version = parse(get_full_version()) | ||||
|         response = [] | ||||
|         for worker in raw: | ||||
|             key = list(worker.keys())[0] | ||||
|             version = worker[key].get("version") | ||||
|             version_matching = False | ||||
|             if version: | ||||
|                 version_matching = parse(version) == our_version | ||||
|             response.append( | ||||
|                 {"worker_id": key, "version": version, "version_matching": version_matching} | ||||
|             ) | ||||
|         # In debug we run with `task_always_eager`, so tasks are ran on the main process | ||||
|         if settings.DEBUG:  # pragma: no cover | ||||
|             count += 1 | ||||
|         return Response({"count": count}) | ||||
|             response.append( | ||||
|                 { | ||||
|                     "worker_id": f"authentik-debug@{gethostname()}", | ||||
|                     "version": get_full_version(), | ||||
|                     "version_matching": True, | ||||
|                 } | ||||
|             ) | ||||
|         return Response(response) | ||||
|  | ||||
| @ -1,11 +1,10 @@ | ||||
| """authentik admin app config""" | ||||
|  | ||||
| from prometheus_client import Gauge, Info | ||||
| from prometheus_client import Info | ||||
|  | ||||
| from authentik.blueprints.apps import ManagedAppConfig | ||||
|  | ||||
| PROM_INFO = Info("authentik_version", "Currently running authentik version") | ||||
| GAUGE_WORKERS = Gauge("authentik_admin_workers", "Currently connected workers") | ||||
|  | ||||
|  | ||||
| class AuthentikAdminConfig(ManagedAppConfig): | ||||
|  | ||||
| @ -1,14 +1,35 @@ | ||||
| """admin signals""" | ||||
|  | ||||
| from django.dispatch import receiver | ||||
| from packaging.version import parse | ||||
| from prometheus_client import Gauge | ||||
|  | ||||
| from authentik.admin.apps import GAUGE_WORKERS | ||||
| from authentik import get_full_version | ||||
| from authentik.root.celery import CELERY_APP | ||||
| from authentik.root.monitoring import monitoring_set | ||||
|  | ||||
| GAUGE_WORKERS = Gauge( | ||||
|     "authentik_admin_workers", | ||||
|     "Currently connected workers, their versions and if they are the same version as authentik", | ||||
|     ["version", "version_matched"], | ||||
| ) | ||||
|  | ||||
|  | ||||
| _version = parse(get_full_version()) | ||||
|  | ||||
|  | ||||
| @receiver(monitoring_set) | ||||
| def monitoring_set_workers(sender, **kwargs): | ||||
|     """Set worker gauge""" | ||||
|     count = len(CELERY_APP.control.ping(timeout=0.5)) | ||||
|     GAUGE_WORKERS.set(count) | ||||
|     raw: list[dict[str, dict]] = CELERY_APP.control.ping(timeout=0.5) | ||||
|     worker_version_count = {} | ||||
|     for worker in raw: | ||||
|         key = list(worker.keys())[0] | ||||
|         version = worker[key].get("version") | ||||
|         version_matching = False | ||||
|         if version: | ||||
|             version_matching = parse(version) == _version | ||||
|         worker_version_count.setdefault(version, {"count": 0, "matching": version_matching}) | ||||
|         worker_version_count[version]["count"] += 1 | ||||
|     for version, stats in worker_version_count.items(): | ||||
|         GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"]) | ||||
|  | ||||
| @ -34,7 +34,7 @@ class TestAdminAPI(TestCase): | ||||
|         response = self.client.get(reverse("authentik_api:admin_workers")) | ||||
|         self.assertEqual(response.status_code, 200) | ||||
|         body = loads(response.content) | ||||
|         self.assertEqual(body["count"], 0) | ||||
|         self.assertEqual(len(body), 0) | ||||
|  | ||||
|     def test_metrics(self): | ||||
|         """Test metrics API""" | ||||
|  | ||||
| @ -18,6 +18,7 @@ from celery.signals import ( | ||||
|     task_prerun, | ||||
|     worker_ready, | ||||
| ) | ||||
| from celery.worker.control import inspect_command | ||||
| from django.conf import settings | ||||
| from django.db import ProgrammingError | ||||
| from django_tenants.utils import get_public_schema_name | ||||
| @ -25,6 +26,7 @@ from structlog.contextvars import STRUCTLOG_KEY_PREFIX | ||||
| from structlog.stdlib import get_logger | ||||
| from tenant_schemas_celery.app import CeleryApp as TenantAwareCeleryApp | ||||
|  | ||||
| from authentik import get_full_version | ||||
| from authentik.lib.sentry import before_send | ||||
| from authentik.lib.utils.errors import exception_to_string | ||||
|  | ||||
| @ -159,6 +161,12 @@ class LivenessProbe(bootsteps.StartStopStep): | ||||
|         HEARTBEAT_FILE.touch() | ||||
|  | ||||
|  | ||||
| @inspect_command(default_timeout=0.2) | ||||
| def ping(state, **kwargs): | ||||
|     """Ping worker(s).""" | ||||
|     return {"ok": "pong", "version": get_full_version()} | ||||
|  | ||||
|  | ||||
| CELERY_APP.config_from_object(settings.CELERY) | ||||
|  | ||||
| # Load task modules from all registered Django app configs. | ||||
|  | ||||
| @ -4159,7 +4159,7 @@ | ||||
|                 "re_evaluate_policies": { | ||||
|                     "type": "boolean", | ||||
|                     "title": "Re evaluate policies", | ||||
|                     "description": "Evaluate policies when the Stage is present to the user." | ||||
|                     "description": "Evaluate policies when the Stage is presented to the user." | ||||
|                 }, | ||||
|                 "order": { | ||||
|                     "type": "integer", | ||||
|  | ||||
							
								
								
									
										20
									
								
								schema.yml
									
									
									
									
									
								
							
							
						
						
									
										20
									
								
								schema.yml
									
									
									
									
									
								
							| @ -349,7 +349,7 @@ paths: | ||||
|           description: '' | ||||
|   /admin/workers/: | ||||
|     get: | ||||
|       operationId: admin_workers_retrieve | ||||
|       operationId: admin_workers_list | ||||
|       description: Get currently connected worker count. | ||||
|       tags: | ||||
|       - admin | ||||
| @ -360,7 +360,9 @@ paths: | ||||
|           content: | ||||
|             application/json: | ||||
|               schema: | ||||
|                 $ref: '#/components/schemas/Workers' | ||||
|                 type: array | ||||
|                 items: | ||||
|                   $ref: '#/components/schemas/Worker' | ||||
|           description: '' | ||||
|         '400': | ||||
|           content: | ||||
| @ -56987,13 +56989,19 @@ components: | ||||
|       required: | ||||
|       - aaguid | ||||
|       - description | ||||
|     Workers: | ||||
|     Worker: | ||||
|       type: object | ||||
|       properties: | ||||
|         count: | ||||
|           type: integer | ||||
|         worker_id: | ||||
|           type: string | ||||
|         version: | ||||
|           type: string | ||||
|         version_matching: | ||||
|           type: boolean | ||||
|       required: | ||||
|       - count | ||||
|       - version | ||||
|       - version_matching | ||||
|       - worker_id | ||||
|     modelRequest: | ||||
|       oneOf: | ||||
|       - $ref: '#/components/schemas/GoogleWorkspaceProviderRequest' | ||||
|  | ||||
| @ -8,34 +8,41 @@ import { msg } from "@lit/localize"; | ||||
| import { TemplateResult, html } from "lit"; | ||||
| import { customElement } from "lit/decorators.js"; | ||||
|  | ||||
| import { AdminApi } from "@goauthentik/api"; | ||||
| import { AdminApi, Worker } from "@goauthentik/api"; | ||||
|  | ||||
| @customElement("ak-admin-status-card-workers") | ||||
| export class WorkersStatusCard extends AdminStatusCard<number> { | ||||
| export class WorkersStatusCard extends AdminStatusCard<Worker[]> { | ||||
|     icon = "pf-icon pf-icon-server"; | ||||
|  | ||||
|     getPrimaryValue(): Promise<number> { | ||||
|         return new AdminApi(DEFAULT_CONFIG).adminWorkersRetrieve().then((workers) => { | ||||
|             return workers.count; | ||||
|         }); | ||||
|     getPrimaryValue(): Promise<Worker[]> { | ||||
|         return new AdminApi(DEFAULT_CONFIG).adminWorkersList(); | ||||
|     } | ||||
|  | ||||
|     renderHeader(): TemplateResult { | ||||
|         return html`${msg("Workers")}`; | ||||
|     } | ||||
|  | ||||
|     getStatus(value: number): Promise<AdminStatus> { | ||||
|         if (value < 1) { | ||||
|     getStatus(value: Worker[]): Promise<AdminStatus> { | ||||
|         if (value.length < 1) { | ||||
|             return Promise.resolve<AdminStatus>({ | ||||
|                 icon: "fa fa-times-circle pf-m-danger", | ||||
|                 message: html`${msg("No workers connected. Background tasks will not run.")}`, | ||||
|             }); | ||||
|         } else if (value.filter((w) => !w.versionMatching).length > 0) { | ||||
|             return Promise.resolve<AdminStatus>({ | ||||
|                 icon: "fa fa-times-circle pf-m-danger", | ||||
|                 message: html`${msg("Worker with incorrect version connected.")}`, | ||||
|             }); | ||||
|         } else { | ||||
|             return Promise.resolve<AdminStatus>({ | ||||
|                 icon: "fa fa-check-circle pf-m-success", | ||||
|             }); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     renderValue() { | ||||
|         return html`${this.value?.length}`; | ||||
|     } | ||||
| } | ||||
|  | ||||
| declare global { | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 Jens L.
					Jens L.