I love deleting code

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens Langhammer
2025-06-06 03:58:43 +02:00
parent b046181947
commit 105524651c

View File

@ -1,7 +1,5 @@
"""authentik events models""" """authentik events models"""
import time
from collections import Counter
from datetime import timedelta from datetime import timedelta
from difflib import get_close_matches from difflib import get_close_matches
from functools import lru_cache from functools import lru_cache
@ -11,11 +9,6 @@ from uuid import uuid4
from django.apps import apps from django.apps import apps
from django.db import connection, models from django.db import connection, models
from django.db.models import Count, ExpressionWrapper, F
from django.db.models.fields import DurationField
from django.db.models.functions import Extract
from django.db.models.manager import Manager
from django.db.models.query import QuerySet
from django.http import HttpRequest from django.http import HttpRequest
from django.http.request import QueryDict from django.http.request import QueryDict
from django.utils.timezone import now from django.utils.timezone import now
@ -124,60 +117,6 @@ class EventAction(models.TextChoices):
CUSTOM_PREFIX = "custom_" CUSTOM_PREFIX = "custom_"
class EventQuerySet(QuerySet):
"""Custom events query set with helper functions"""
def get_events_per(
self,
time_since: timedelta,
extract: Extract,
data_points: int,
) -> list[dict[str, int]]:
"""Get event count by hour in the last day, fill with zeros"""
_now = now()
max_since = timedelta(days=60)
# Allow maximum of 60 days to limit load
if time_since.total_seconds() > max_since.total_seconds():
time_since = max_since
date_from = _now - time_since
result = (
self.filter(created__gte=date_from)
.annotate(age=ExpressionWrapper(_now - F("created"), output_field=DurationField()))
.annotate(age_interval=extract("age"))
.values("age_interval")
.annotate(count=Count("pk"))
.order_by("age_interval")
)
data = Counter({int(d["age_interval"]): d["count"] for d in result})
results = []
interval_delta = time_since / data_points
for interval in range(1, -data_points, -1):
results.append(
{
"x_cord": time.mktime((_now + (interval_delta * interval)).timetuple()) * 1000,
"y_cord": data[interval * -1],
}
)
return results
class EventManager(Manager):
"""Custom helper methods for Events"""
def get_queryset(self) -> QuerySet:
"""use custom queryset"""
return EventQuerySet(self.model, using=self._db)
def get_events_per(
self,
time_since: timedelta,
extract: Extract,
data_points: int,
) -> list[dict[str, int]]:
"""Wrap method from queryset"""
return self.get_queryset().get_events_per(time_since, extract, data_points)
class Event(SerializerModel, ExpiringModel): class Event(SerializerModel, ExpiringModel):
"""An individual Audit/Metrics/Notification/Error Event""" """An individual Audit/Metrics/Notification/Error Event"""
@ -193,8 +132,6 @@ class Event(SerializerModel, ExpiringModel):
# Shadow the expires attribute from ExpiringModel to override the default duration # Shadow the expires attribute from ExpiringModel to override the default duration
expires = models.DateTimeField(default=default_event_duration) expires = models.DateTimeField(default=default_event_duration)
objects = EventManager()
@staticmethod @staticmethod
def _get_app_from_request(request: HttpRequest) -> str: def _get_app_from_request(request: HttpRequest) -> str:
if not isinstance(request, HttpRequest): if not isinstance(request, HttpRequest):