fix metrics

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2025-06-26 18:05:01 +02:00
parent 0908d0b559
commit 8956606564
10 changed files with 35 additions and 66 deletions

View File

@ -1,5 +1,5 @@
from dramatiq.actor import Actor from dramatiq.actor import Actor
from drf_spectacular.utils import OpenApiResponse, extend_schema from drf_spectacular.utils import extend_schema
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.fields import BooleanField, CharField, ChoiceField from rest_framework.fields import BooleanField, CharField, ChoiceField
from rest_framework.request import Request from rest_framework.request import Request

View File

@ -1,7 +1,6 @@
"""authentik outpost signals""" """authentik outpost signals"""
from django.core.cache import cache from django.core.cache import cache
from django.db.models import Model
from django.db.models.signals import m2m_changed, post_save, pre_delete, pre_save from django.db.models.signals import m2m_changed, post_save, pre_delete, pre_save
from django.dispatch import receiver from django.dispatch import receiver
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
@ -9,7 +8,6 @@ from structlog.stdlib import get_logger
from authentik.brands.models import Brand from authentik.brands.models import Brand
from authentik.core.models import AuthenticatedSession, Provider from authentik.core.models import AuthenticatedSession, Provider
from authentik.crypto.models import CertificateKeyPair from authentik.crypto.models import CertificateKeyPair
from authentik.lib.utils.reflection import class_to_path
from authentik.outposts.models import Outpost, OutpostModel, OutpostServiceConnection from authentik.outposts.models import Outpost, OutpostModel, OutpostServiceConnection
from authentik.outposts.tasks import ( from authentik.outposts.tasks import (
CACHE_KEY_OUTPOST_DOWN, CACHE_KEY_OUTPOST_DOWN,

View File

@ -10,7 +10,6 @@ from urllib.parse import urlparse
from asgiref.sync import async_to_sync from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer from channels.layers import get_channel_layer
from django.core.cache import cache from django.core.cache import cache
from django.db.models.base import Model
from django.utils.text import slugify from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask from django_dramatiq_postgres.middleware import CurrentTask
@ -22,7 +21,6 @@ from structlog.stdlib import get_logger
from yaml import safe_load from yaml import safe_load
from authentik.lib.config import CONFIG from authentik.lib.config import CONFIG
from authentik.lib.utils.reflection import path_to_class
from authentik.outposts.consumer import OUTPOST_GROUP from authentik.outposts.consumer import OUTPOST_GROUP
from authentik.outposts.controllers.base import BaseController, ControllerException from authentik.outposts.controllers.base import BaseController, ControllerException
from authentik.outposts.controllers.docker import DockerClient from authentik.outposts.controllers.docker import DockerClient
@ -31,7 +29,6 @@ from authentik.outposts.models import (
DockerServiceConnection, DockerServiceConnection,
KubernetesServiceConnection, KubernetesServiceConnection,
Outpost, Outpost,
OutpostModel,
OutpostServiceConnection, OutpostServiceConnection,
OutpostType, OutpostType,
ServiceConnectionInvalid, ServiceConnectionInvalid,

View File

@ -17,8 +17,8 @@ from authentik.core.models import ExpiringModel, PropertyMapping, Provider, User
from authentik.events.models import Event, EventAction from authentik.events.models import Event, EventAction
from authentik.lib.models import SerializerModel from authentik.lib.models import SerializerModel
from authentik.lib.utils.time import timedelta_string_validator from authentik.lib.utils.time import timedelta_string_validator
from authentik.policies.models import PolicyBindingModel
from authentik.outposts.models import OutpostModel from authentik.outposts.models import OutpostModel
from authentik.policies.models import PolicyBindingModel
LOGGER = get_logger() LOGGER = get_logger()

View File

@ -1,7 +1,7 @@
"""Source API Views""" """Source API Views"""
from django.core.cache import cache from django.core.cache import cache
from drf_spectacular.utils import OpenApiResponse, extend_schema from drf_spectacular.utils import extend_schema
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.fields import SerializerMethodField from rest_framework.fields import SerializerMethodField
from rest_framework.request import Request from rest_framework.request import Request

View File

@ -4,7 +4,7 @@ from typing import Any
from django.core.cache import cache from django.core.cache import cache
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from drf_spectacular.utils import OpenApiResponse, extend_schema, inline_serializer from drf_spectacular.utils import extend_schema, inline_serializer
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError from rest_framework.exceptions import ValidationError
from rest_framework.fields import DictField, ListField, SerializerMethodField from rest_framework.fields import DictField, ListField, SerializerMethodField

View File

@ -1,22 +1,16 @@
from http.server import BaseHTTPRequestHandler, HTTPServer import os
from os import getpid from os import getpid
from dramatiq.common import current_millis from pathlib import Path
from prometheus_client import ( from signal import pause
CONTENT_TYPE_LATEST,
CollectorRegistry,
Counter,
Gauge,
Histogram,
generate_latest,
multiprocess,
)
from socket import gethostname from socket import gethostname
from tempfile import gettempdir
from time import sleep from time import sleep
from typing import Any from typing import Any
import pglock import pglock
from django.utils.timezone import now from django.utils.timezone import now
from dramatiq.broker import Broker from dramatiq.broker import Broker
from dramatiq.common import current_millis
from dramatiq.message import Message from dramatiq.message import Message
from dramatiq.middleware import Middleware from dramatiq.middleware import Middleware
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
@ -180,6 +174,11 @@ class MetricsMiddleware(Middleware):
self.delayed_messages = set() self.delayed_messages = set()
self.message_start_times = {} self.message_start_times = {}
_tmp = Path(gettempdir())
prometheus_tmp_dir = str(_tmp.joinpath("authentik_prometheus_tmp"))
os.makedirs(prometheus_tmp_dir, exist_ok=True)
os.environ.setdefault("PROMETHEUS_MULTIPROC_DIR", prometheus_tmp_dir)
@property @property
def forks(self): def forks(self):
from authentik.tasks.forks import worker_metrics from authentik.tasks.forks import worker_metrics
@ -187,43 +186,38 @@ class MetricsMiddleware(Middleware):
return [worker_metrics] return [worker_metrics]
def before_worker_boot(self, broker: Broker, worker): def before_worker_boot(self, broker: Broker, worker):
registry = CollectorRegistry() from prometheus_client import Counter, Gauge, Histogram
self.total_messages = Counter( self.total_messages = Counter(
"authentik_tasks_total", "authentik_tasks_total",
"The total number of tasks processed.", "The total number of tasks processed.",
["queue_name", "actor_name"], ["queue_name", "actor_name"],
registry=registry,
) )
self.total_errored_messages = Counter( self.total_errored_messages = Counter(
"authentik_tasks_errors_total", "authentik_tasks_errors_total",
"The total number of errored tasks.", "The total number of errored tasks.",
["queue_name", "actor_name"], ["queue_name", "actor_name"],
registry=registry,
) )
self.total_retried_messages = Counter( self.total_retried_messages = Counter(
"authentik_tasks_retries_total", "authentik_tasks_retries_total",
"The total number of retried tasks.", "The total number of retried tasks.",
["queue_name", "actor_name"], ["queue_name", "actor_name"],
registry=registry,
) )
self.total_rejected_messages = Counter( self.total_rejected_messages = Counter(
"authentik_tasks_rejected_total", "authentik_tasks_rejected_total",
"The total number of dead-lettered tasks.", "The total number of dead-lettered tasks.",
["queue_name", "actor_name"], ["queue_name", "actor_name"],
registry=registry,
) )
self.inprogress_messages = Gauge( self.inprogress_messages = Gauge(
"authentik_tasks_inprogress", "authentik_tasks_inprogress",
"The number of tasks in progress.", "The number of tasks in progress.",
["queue_name", "actor_name"], ["queue_name", "actor_name"],
multiprocess_mode="livesum", multiprocess_mode="livesum",
registry=registry,
) )
self.inprogress_delayed_messages = Gauge( self.inprogress_delayed_messages = Gauge(
"authentik_tasks_delayed_inprogress", "authentik_tasks_delayed_inprogress",
"The number of delayed tasks in memory.", "The number of delayed tasks in memory.",
["queue_name", "actor_name"], ["queue_name", "actor_name"],
registry=registry,
) )
self.messages_durations = Histogram( self.messages_durations = Histogram(
"authentik_tasks_duration_miliseconds", "authentik_tasks_duration_miliseconds",
@ -252,10 +246,11 @@ class MetricsMiddleware(Middleware):
3_600_000, 3_600_000,
float("inf"), float("inf"),
), ),
registry=registry,
) )
def after_worker_shutdown(self, broker: Broker, worker): def after_worker_shutdown(self, broker: Broker, worker):
from prometheus_client import multiprocess
# TODO: worker_id # TODO: worker_id
multiprocess.mark_process_dead(getpid()) multiprocess.mark_process_dead(getpid())
@ -305,29 +300,18 @@ class MetricsMiddleware(Middleware):
@staticmethod @staticmethod
def run(): def run():
address, _, port = CONFIG.get("listen.listen_metrics").rpartition(":") from prometheus_client import CollectorRegistry, multiprocess, start_http_server
port = 9301
addr, _, port = CONFIG.get("listen.listen_metrics").rpartition(":")
try: try:
httpd = HTTPServer((address, int(port)), _MetricsHandler) port = int(port)
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
start_http_server(port, addr, registry)
except ValueError:
LOGGER.error(f"Invalid port entered: {port}")
except OSError: except OSError:
LOGGER.error(f"Could not listen on {address}:{port}, not starting the metrics server") LOGGER.warning("Port is already in use, not starting metrics server")
return pause()
try:
httpd.serve_forever()
except KeyboardInterrupt:
httpd.shutdown()
class _MetricsHandler(BaseHTTPRequestHandler):
def do_GET(self):
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
output = generate_latest(registry)
self.send_response(200)
self.send_header("Content-Type", CONTENT_TYPE_LATEST)
self.end_headers()
self.wfile.write(output)
def log_message(self, format: str, *args: Any):
logger = get_logger(__name__, type(self))
logger.debug(format, *args)

View File

@ -6,7 +6,7 @@ from dramatiq.errors import ActorNotFound
from drf_spectacular.types import OpenApiTypes from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiResponse, extend_schema from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.fields import ChoiceField, ReadOnlyField from rest_framework.fields import ReadOnlyField
from rest_framework.mixins import ( from rest_framework.mixins import (
ListModelMixin, ListModelMixin,
RetrieveModelMixin, RetrieveModelMixin,

View File

@ -1,7 +1,3 @@
import os
from pathlib import Path
from tempfile import gettempdir
from authentik.root.setup import setup from authentik.root.setup import setup
setup() setup()
@ -12,11 +8,6 @@ django.setup()
from authentik.root.signals import post_startup, pre_startup, startup # noqa: E402 from authentik.root.signals import post_startup, pre_startup, startup # noqa: E402
_tmp = Path(gettempdir())
prometheus_tmp_dir = str(_tmp.joinpath("authentik_worker_prometheus_tmp"))
os.makedirs(prometheus_tmp_dir, exist_ok=True)
os.environ.setdefault("PROMETHEUS_MULTIPROC_DIR", prometheus_tmp_dir)
_startup_sender = type("WorkerStartup", (object,), {}) _startup_sender = type("WorkerStartup", (object,), {})
pre_startup.send(sender=_startup_sender) pre_startup.send(sender=_startup_sender)
startup.send(sender=_startup_sender) startup.send(sender=_startup_sender)

View File

@ -2,7 +2,6 @@
import os import os
from hashlib import sha512 from hashlib import sha512
from os import makedirs
from pathlib import Path from pathlib import Path
from tempfile import gettempdir from tempfile import gettempdir
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@ -33,11 +32,11 @@ wait_for_db()
_tmp = Path(gettempdir()) _tmp = Path(gettempdir())
worker_class = "lifecycle.worker.DjangoUvicornWorker" worker_class = "lifecycle.worker.DjangoUvicornWorker"
worker_tmp_dir = str(_tmp.joinpath("authentik_worker_tmp")) worker_tmp_dir = str(_tmp.joinpath("authentik_gunicorn_tmp"))
prometheus_tmp_dir = str(_tmp.joinpath("authentik_prometheus_tmp")) prometheus_tmp_dir = str(_tmp.joinpath("authentik_prometheus_tmp"))
makedirs(worker_tmp_dir, exist_ok=True) os.makedirs(worker_tmp_dir, exist_ok=True)
makedirs(prometheus_tmp_dir, exist_ok=True) os.makedirs(prometheus_tmp_dir, exist_ok=True)
bind = f"unix://{str(_tmp.joinpath('authentik-core.sock'))}" bind = f"unix://{str(_tmp.joinpath('authentik-core.sock'))}"