From 411c52491eace347963fed8f38327e558f1c6734 Mon Sep 17 00:00:00 2001 From: Marc 'risson' Schmitt Date: Wed, 25 Jun 2025 16:50:21 +0200 Subject: [PATCH] add worker api Signed-off-by: Marc 'risson' Schmitt --- authentik/root/settings.py | 1 + authentik/tasks/api/__init__.py | 0 authentik/tasks/{api.py => api/tasks.py} | 0 authentik/tasks/api/workers.py | 48 +++++++++++++++++++ authentik/tasks/apps.py | 13 +++++ authentik/tasks/forks.py | 5 ++ authentik/tasks/middleware.py | 28 ++++++++++- .../tasks/migrations/0005_workerstatus.py | 27 +++++++++++ .../0006_alter_workerstatus_options.py | 21 ++++++++ authentik/tasks/models.py | 17 ++++++- authentik/tasks/schedules/scheduler.py | 1 - authentik/tasks/tasks.py | 11 +++-- authentik/tasks/urls.py | 6 ++- authentik/tasks/worker.py | 0 .../django_dramatiq_postgres/scheduler.py | 1 - schema.yml | 42 ++++++++++++++++ .../admin-overview/cards/WorkerStatusCard.ts | 4 +- 17 files changed, 213 insertions(+), 12 deletions(-) create mode 100644 authentik/tasks/api/__init__.py rename authentik/tasks/{api.py => api/tasks.py} (100%) create mode 100644 authentik/tasks/api/workers.py create mode 100644 authentik/tasks/forks.py create mode 100644 authentik/tasks/migrations/0005_workerstatus.py create mode 100644 authentik/tasks/migrations/0006_alter_workerstatus_options.py create mode 100644 authentik/tasks/worker.py diff --git a/authentik/root/settings.py b/authentik/root/settings.py index 3c5a8ba194..70367f9ce9 100644 --- a/authentik/root/settings.py +++ b/authentik/root/settings.py @@ -409,6 +409,7 @@ DRAMATIQ = { ("authentik.tasks.middleware.MessagesMiddleware", {}), ("authentik.tasks.middleware.LoggingMiddleware", {}), ("authentik.tasks.middleware.DescriptionMiddleware", {}), + ("authentik.tasks.middleware.WorkerStatusMiddleware", {}), ), "test": TEST, } diff --git a/authentik/tasks/api/__init__.py b/authentik/tasks/api/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/authentik/tasks/api.py b/authentik/tasks/api/tasks.py similarity index 100% rename from authentik/tasks/api.py rename to authentik/tasks/api/tasks.py diff --git a/authentik/tasks/api/workers.py b/authentik/tasks/api/workers.py new file mode 100644 index 0000000000..d6a855e3f5 --- /dev/null +++ b/authentik/tasks/api/workers.py @@ -0,0 +1,48 @@ +import pglock +from django.utils.timezone import now, timedelta +from drf_spectacular.utils import extend_schema, inline_serializer +from packaging.version import parse +from rest_framework.fields import BooleanField, CharField +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.views import APIView + +from authentik import get_full_version +from authentik.rbac.permissions import HasPermission +from authentik.tasks.models import WorkerStatus + + +class WorkerView(APIView): + """Get currently connected worker count.""" + + permission_classes = [HasPermission("authentik_rbac.view_system_info")] + + @extend_schema( + responses=inline_serializer( + "Worker", + fields={ + "worker_id": CharField(), + "version": CharField(), + "version_matching": BooleanField(), + }, + many=True, + ) + ) + def get(self, request: Request) -> Response: + response = [] + our_version = parse(get_full_version()) + for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)): + lock_id = f"goauthentik.io/worker/status/{status.pk}" + with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired: + # The worker doesn't hold the lock, it isn't running + if acquired: + continue + version_matching = parse(status.version) == our_version + response.append( + { + "worker_id": f"{status.pk}@{status.hostname}", + "version": status.version, + "version_matching": version_matching, + } + ) + return Response(response) diff --git a/authentik/tasks/apps.py b/authentik/tasks/apps.py index 7a0f14810f..1c6e31ea53 100644 --- a/authentik/tasks/apps.py +++ b/authentik/tasks/apps.py @@ -1,4 +1,6 @@ from authentik.blueprints.apps import ManagedAppConfig +from authentik.lib.utils.time import fqdn_rand +from authentik.tasks.schedules.lib import ScheduleSpec class AuthentikTasksConfig(ManagedAppConfig): @@ -18,3 +20,14 @@ class AuthentikTasksConfig(ManagedAppConfig): # actor = old_broker.get_actor(actor_name) # actor.broker = broker # actor.broker.declare_actor(actor) + + @property + def global_schedule_specs(self) -> list[ScheduleSpec]: + from authentik.tasks.tasks import clean_worker_statuses + + return [ + ScheduleSpec( + actor=clean_worker_statuses, + crontab=f"{fqdn_rand('clean_worker_statuses')} {fqdn_rand('clean_worker_statuses', 24)} * * *", # noqa: E501 + ), + ] diff --git a/authentik/tasks/forks.py b/authentik/tasks/forks.py new file mode 100644 index 0000000000..b601bf7c56 --- /dev/null +++ b/authentik/tasks/forks.py @@ -0,0 +1,5 @@ +def worker_status(): + import authentik.tasks.setup # noqa + from authentik.tasks.middleware import WorkerStatusMiddleware + + WorkerStatusMiddleware.worker_status() diff --git a/authentik/tasks/middleware.py b/authentik/tasks/middleware.py index 32a59d9377..c180fc363f 100644 --- a/authentik/tasks/middleware.py +++ b/authentik/tasks/middleware.py @@ -1,13 +1,18 @@ +from socket import gethostname +from time import sleep from typing import Any +import pglock +from django.utils.timezone import now from dramatiq.broker import Broker from dramatiq.message import Message from dramatiq.middleware import Middleware from structlog.stdlib import get_logger +from authentik import get_full_version from authentik.events.models import Event, EventAction from authentik.lib.utils.errors import exception_to_string -from authentik.tasks.models import Task, TaskStatus +from authentik.tasks.models import Task, TaskStatus, WorkerStatus from authentik.tenants.models import Tenant from authentik.tenants.utils import get_current_tenant @@ -130,3 +135,24 @@ class DescriptionMiddleware(Middleware): @property def actor_options(self): return {"description"} + + +class WorkerStatusMiddleware(Middleware): + @property + def forks(self): + from authentik.tasks.forks import worker_status + + return [worker_status] + + @staticmethod + def worker_status(): + status = WorkerStatus.objects.create( + hostname=gethostname(), + version=get_full_version(), + ) + lock_id = f"goauthentik.io/worker/status/{status.pk}" + with pglock.advisory(lock_id, side_effect=pglock.Raise): + while True: + status.last_seen = now() + status.save(update_fields=("last_seen",)) + sleep(30) diff --git a/authentik/tasks/migrations/0005_workerstatus.py b/authentik/tasks/migrations/0005_workerstatus.py new file mode 100644 index 0000000000..d7279e6a0e --- /dev/null +++ b/authentik/tasks/migrations/0005_workerstatus.py @@ -0,0 +1,27 @@ +# Generated by Django 5.1.11 on 2025-06-25 13:58 + +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("authentik_tasks", "0004_alter_task_options"), + ] + + operations = [ + migrations.CreateModel( + name="WorkerStatus", + fields=[ + ("id", models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)), + ("hostname", models.TextField()), + ("version", models.TextField()), + ("last_seen", models.DateTimeField(auto_now_add=True)), + ], + options={ + "verbose_name": "Worker status", + "verbose_name_plural": "Worker statuses", + }, + ), + ] diff --git a/authentik/tasks/migrations/0006_alter_workerstatus_options.py b/authentik/tasks/migrations/0006_alter_workerstatus_options.py new file mode 100644 index 0000000000..142bbffc8e --- /dev/null +++ b/authentik/tasks/migrations/0006_alter_workerstatus_options.py @@ -0,0 +1,21 @@ +# Generated by Django 5.1.11 on 2025-06-25 14:42 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("authentik_tasks", "0005_workerstatus"), + ] + + operations = [ + migrations.AlterModelOptions( + name="workerstatus", + options={ + "default_permissions": [], + "verbose_name": "Worker status", + "verbose_name_plural": "Worker statuses", + }, + ), + ] diff --git a/authentik/tasks/models.py b/authentik/tasks/models.py index 0327823d60..9f2ef99e67 100644 --- a/authentik/tasks/models.py +++ b/authentik/tasks/models.py @@ -1,5 +1,5 @@ from typing import Any -from uuid import UUID +from uuid import UUID, uuid4 import pgtrigger from django.contrib.contenttypes.fields import ContentType, GenericForeignKey, GenericRelation @@ -148,3 +148,18 @@ class TasksModel(models.Model): class Meta: abstract = True + + +class WorkerStatus(models.Model): + id = models.UUIDField(primary_key=True, default=uuid4) + hostname = models.TextField() + version = models.TextField() + last_seen = models.DateTimeField(auto_now_add=True) + + class Meta: + default_permissions = [] + verbose_name = _("Worker status") + verbose_name_plural = _("Worker statuses") + + def __str__(self): + return f"{self.id} - {self.hostname} - {self.version} - {self.last_seen}" diff --git a/authentik/tasks/schedules/scheduler.py b/authentik/tasks/schedules/scheduler.py index c5acb0467b..ec4b486d12 100644 --- a/authentik/tasks/schedules/scheduler.py +++ b/authentik/tasks/schedules/scheduler.py @@ -1,4 +1,3 @@ - import pglock from django_dramatiq_postgres.scheduler import Scheduler as SchedulerBase from structlog.stdlib import get_logger diff --git a/authentik/tasks/tasks.py b/authentik/tasks/tasks.py index 9358b65c3b..59ef5d5ce3 100644 --- a/authentik/tasks/tasks.py +++ b/authentik/tasks/tasks.py @@ -1,9 +1,10 @@ +from django.utils.timezone import now, timedelta +from django.utils.translation import gettext_lazy as _ from dramatiq import actor +from authentik.tasks.models import WorkerStatus -@actor -def test_actor(): - import time - time.sleep(2) - print("done sleeping") +@actor(description=_("Remove old worker statuses.")) +def clean_worker_statuses(): + WorkerStatus.objects.filter(last_seen__lt=now() - timedelta(days=1)).delete() diff --git a/authentik/tasks/urls.py b/authentik/tasks/urls.py index 7defb480d5..d48ef1597d 100644 --- a/authentik/tasks/urls.py +++ b/authentik/tasks/urls.py @@ -1,5 +1,9 @@ -from authentik.tasks.api import TaskViewSet +from django.urls import path + +from authentik.tasks.api.tasks import TaskViewSet +from authentik.tasks.api.workers import WorkerView api_urlpatterns = [ ("tasks/tasks", TaskViewSet), + path("tasks/workers", WorkerView.as_view(), name="tasks_workers"), ] diff --git a/authentik/tasks/worker.py b/authentik/tasks/worker.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py index 63e6aee34e..3383d62f53 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py @@ -1,4 +1,3 @@ - import pglock from django.db import router, transaction from django.db.models import QuerySet diff --git a/schema.yml b/schema.yml index 49022956f3..2825441272 100644 --- a/schema.yml +++ b/schema.yml @@ -40658,6 +40658,35 @@ paths: schema: $ref: '#/components/schemas/GenericError' description: '' + /tasks/workers: + get: + operationId: tasks_workers_list + description: Get currently connected worker count. + tags: + - tasks + security: + - authentik: [] + responses: + '200': + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Worker' + description: '' + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/ValidationError' + description: '' + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/GenericError' + description: '' /tenants/domains/: get: operationId: tenants_domains_list @@ -61880,6 +61909,19 @@ components: required: - aaguid - description + Worker: + type: object + properties: + worker_id: + type: string + version: + type: string + version_matching: + type: boolean + required: + - version + - version_matching + - worker_id modelRequest: oneOf: - $ref: '#/components/schemas/GoogleWorkspaceProviderRequest' diff --git a/web/src/admin/admin-overview/cards/WorkerStatusCard.ts b/web/src/admin/admin-overview/cards/WorkerStatusCard.ts index ad158bc07e..269a1b0682 100644 --- a/web/src/admin/admin-overview/cards/WorkerStatusCard.ts +++ b/web/src/admin/admin-overview/cards/WorkerStatusCard.ts @@ -8,14 +8,14 @@ import { msg } from "@lit/localize"; import { TemplateResult, html } from "lit"; import { customElement } from "lit/decorators.js"; -import { AdminApi, Worker } from "@goauthentik/api"; +import { TasksApi, Worker } from "@goauthentik/api"; @customElement("ak-admin-status-card-workers") export class WorkersStatusCard extends AdminStatusCard { icon = "pf-icon pf-icon-server"; getPrimaryValue(): Promise { - return new AdminApi(DEFAULT_CONFIG).adminWorkersList(); + return new TasksApi(DEFAULT_CONFIG).tasksWorkersList(); } renderHeader(): TemplateResult {