add worker api

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2025-06-25 16:50:21 +02:00
parent 85869806a2
commit 411c52491e
17 changed files with 213 additions and 12 deletions

View File

@ -409,6 +409,7 @@ DRAMATIQ = {
("authentik.tasks.middleware.MessagesMiddleware", {}), ("authentik.tasks.middleware.MessagesMiddleware", {}),
("authentik.tasks.middleware.LoggingMiddleware", {}), ("authentik.tasks.middleware.LoggingMiddleware", {}),
("authentik.tasks.middleware.DescriptionMiddleware", {}), ("authentik.tasks.middleware.DescriptionMiddleware", {}),
("authentik.tasks.middleware.WorkerStatusMiddleware", {}),
), ),
"test": TEST, "test": TEST,
} }

View File

View File

@ -0,0 +1,48 @@
import pglock
from django.utils.timezone import now, timedelta
from drf_spectacular.utils import extend_schema, inline_serializer
from packaging.version import parse
from rest_framework.fields import BooleanField, CharField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from authentik import get_full_version
from authentik.rbac.permissions import HasPermission
from authentik.tasks.models import WorkerStatus
class WorkerView(APIView):
"""Get currently connected worker count."""
permission_classes = [HasPermission("authentik_rbac.view_system_info")]
@extend_schema(
responses=inline_serializer(
"Worker",
fields={
"worker_id": CharField(),
"version": CharField(),
"version_matching": BooleanField(),
},
many=True,
)
)
def get(self, request: Request) -> Response:
response = []
our_version = parse(get_full_version())
for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)):
lock_id = f"goauthentik.io/worker/status/{status.pk}"
with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired:
# The worker doesn't hold the lock, it isn't running
if acquired:
continue
version_matching = parse(status.version) == our_version
response.append(
{
"worker_id": f"{status.pk}@{status.hostname}",
"version": status.version,
"version_matching": version_matching,
}
)
return Response(response)

View File

@ -1,4 +1,6 @@
from authentik.blueprints.apps import ManagedAppConfig from authentik.blueprints.apps import ManagedAppConfig
from authentik.lib.utils.time import fqdn_rand
from authentik.tasks.schedules.lib import ScheduleSpec
class AuthentikTasksConfig(ManagedAppConfig): class AuthentikTasksConfig(ManagedAppConfig):
@ -18,3 +20,14 @@ class AuthentikTasksConfig(ManagedAppConfig):
# actor = old_broker.get_actor(actor_name) # actor = old_broker.get_actor(actor_name)
# actor.broker = broker # actor.broker = broker
# actor.broker.declare_actor(actor) # actor.broker.declare_actor(actor)
@property
def global_schedule_specs(self) -> list[ScheduleSpec]:
from authentik.tasks.tasks import clean_worker_statuses
return [
ScheduleSpec(
actor=clean_worker_statuses,
crontab=f"{fqdn_rand('clean_worker_statuses')} {fqdn_rand('clean_worker_statuses', 24)} * * *", # noqa: E501
),
]

5
authentik/tasks/forks.py Normal file
View File

@ -0,0 +1,5 @@
def worker_status():
import authentik.tasks.setup # noqa
from authentik.tasks.middleware import WorkerStatusMiddleware
WorkerStatusMiddleware.worker_status()

View File

@ -1,13 +1,18 @@
from socket import gethostname
from time import sleep
from typing import Any from typing import Any
import pglock
from django.utils.timezone import now
from dramatiq.broker import Broker from dramatiq.broker import Broker
from dramatiq.message import Message from dramatiq.message import Message
from dramatiq.middleware import Middleware from dramatiq.middleware import Middleware
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
from authentik import get_full_version
from authentik.events.models import Event, EventAction from authentik.events.models import Event, EventAction
from authentik.lib.utils.errors import exception_to_string from authentik.lib.utils.errors import exception_to_string
from authentik.tasks.models import Task, TaskStatus from authentik.tasks.models import Task, TaskStatus, WorkerStatus
from authentik.tenants.models import Tenant from authentik.tenants.models import Tenant
from authentik.tenants.utils import get_current_tenant from authentik.tenants.utils import get_current_tenant
@ -130,3 +135,24 @@ class DescriptionMiddleware(Middleware):
@property @property
def actor_options(self): def actor_options(self):
return {"description"} return {"description"}
class WorkerStatusMiddleware(Middleware):
@property
def forks(self):
from authentik.tasks.forks import worker_status
return [worker_status]
@staticmethod
def worker_status():
status = WorkerStatus.objects.create(
hostname=gethostname(),
version=get_full_version(),
)
lock_id = f"goauthentik.io/worker/status/{status.pk}"
with pglock.advisory(lock_id, side_effect=pglock.Raise):
while True:
status.last_seen = now()
status.save(update_fields=("last_seen",))
sleep(30)

View File

@ -0,0 +1,27 @@
# Generated by Django 5.1.11 on 2025-06-25 13:58
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks", "0004_alter_task_options"),
]
operations = [
migrations.CreateModel(
name="WorkerStatus",
fields=[
("id", models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
("hostname", models.TextField()),
("version", models.TextField()),
("last_seen", models.DateTimeField(auto_now_add=True)),
],
options={
"verbose_name": "Worker status",
"verbose_name_plural": "Worker statuses",
},
),
]

View File

@ -0,0 +1,21 @@
# Generated by Django 5.1.11 on 2025-06-25 14:42
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("authentik_tasks", "0005_workerstatus"),
]
operations = [
migrations.AlterModelOptions(
name="workerstatus",
options={
"default_permissions": [],
"verbose_name": "Worker status",
"verbose_name_plural": "Worker statuses",
},
),
]

View File

@ -1,5 +1,5 @@
from typing import Any from typing import Any
from uuid import UUID from uuid import UUID, uuid4
import pgtrigger import pgtrigger
from django.contrib.contenttypes.fields import ContentType, GenericForeignKey, GenericRelation from django.contrib.contenttypes.fields import ContentType, GenericForeignKey, GenericRelation
@ -148,3 +148,18 @@ class TasksModel(models.Model):
class Meta: class Meta:
abstract = True abstract = True
class WorkerStatus(models.Model):
id = models.UUIDField(primary_key=True, default=uuid4)
hostname = models.TextField()
version = models.TextField()
last_seen = models.DateTimeField(auto_now_add=True)
class Meta:
default_permissions = []
verbose_name = _("Worker status")
verbose_name_plural = _("Worker statuses")
def __str__(self):
return f"{self.id} - {self.hostname} - {self.version} - {self.last_seen}"

View File

@ -1,4 +1,3 @@
import pglock import pglock
from django_dramatiq_postgres.scheduler import Scheduler as SchedulerBase from django_dramatiq_postgres.scheduler import Scheduler as SchedulerBase
from structlog.stdlib import get_logger from structlog.stdlib import get_logger

View File

@ -1,9 +1,10 @@
from django.utils.timezone import now, timedelta
from django.utils.translation import gettext_lazy as _
from dramatiq import actor from dramatiq import actor
from authentik.tasks.models import WorkerStatus
@actor
def test_actor():
import time
time.sleep(2) @actor(description=_("Remove old worker statuses."))
print("done sleeping") def clean_worker_statuses():
WorkerStatus.objects.filter(last_seen__lt=now() - timedelta(days=1)).delete()

View File

@ -1,5 +1,9 @@
from authentik.tasks.api import TaskViewSet from django.urls import path
from authentik.tasks.api.tasks import TaskViewSet
from authentik.tasks.api.workers import WorkerView
api_urlpatterns = [ api_urlpatterns = [
("tasks/tasks", TaskViewSet), ("tasks/tasks", TaskViewSet),
path("tasks/workers", WorkerView.as_view(), name="tasks_workers"),
] ]

View File

View File

@ -1,4 +1,3 @@
import pglock import pglock
from django.db import router, transaction from django.db import router, transaction
from django.db.models import QuerySet from django.db.models import QuerySet

View File

@ -40658,6 +40658,35 @@ paths:
schema: schema:
$ref: '#/components/schemas/GenericError' $ref: '#/components/schemas/GenericError'
description: '' description: ''
/tasks/workers:
get:
operationId: tasks_workers_list
description: Get currently connected worker count.
tags:
- tasks
security:
- authentik: []
responses:
'200':
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/Worker'
description: ''
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/ValidationError'
description: ''
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/GenericError'
description: ''
/tenants/domains/: /tenants/domains/:
get: get:
operationId: tenants_domains_list operationId: tenants_domains_list
@ -61880,6 +61909,19 @@ components:
required: required:
- aaguid - aaguid
- description - description
Worker:
type: object
properties:
worker_id:
type: string
version:
type: string
version_matching:
type: boolean
required:
- version
- version_matching
- worker_id
modelRequest: modelRequest:
oneOf: oneOf:
- $ref: '#/components/schemas/GoogleWorkspaceProviderRequest' - $ref: '#/components/schemas/GoogleWorkspaceProviderRequest'

View File

@ -8,14 +8,14 @@ import { msg } from "@lit/localize";
import { TemplateResult, html } from "lit"; import { TemplateResult, html } from "lit";
import { customElement } from "lit/decorators.js"; import { customElement } from "lit/decorators.js";
import { AdminApi, Worker } from "@goauthentik/api"; import { TasksApi, Worker } from "@goauthentik/api";
@customElement("ak-admin-status-card-workers") @customElement("ak-admin-status-card-workers")
export class WorkersStatusCard extends AdminStatusCard<Worker[]> { export class WorkersStatusCard extends AdminStatusCard<Worker[]> {
icon = "pf-icon pf-icon-server"; icon = "pf-icon pf-icon-server";
getPrimaryValue(): Promise<Worker[]> { getPrimaryValue(): Promise<Worker[]> {
return new AdminApi(DEFAULT_CONFIG).adminWorkersList(); return new TasksApi(DEFAULT_CONFIG).tasksWorkersList();
} }
renderHeader(): TemplateResult { renderHeader(): TemplateResult {