Compare commits

...

1 Commits

Author SHA1 Message Date
7d0abbf072 core, events: reduce memory usage when batch deleting objects
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2024-12-20 11:57:53 +01:00
3 changed files with 30 additions and 4 deletions

View File

@ -18,6 +18,7 @@ from authentik.core.models import (
)
from authentik.events.system_tasks import SystemTask, TaskStatus, prefill_task
from authentik.lib.config import CONFIG
from authentik.lib.utils.db import qs_batch_iter
from authentik.root.celery import CELERY_APP
LOGGER = get_logger()
@ -34,14 +35,14 @@ def clean_expired_models(self: SystemTask):
cls.objects.all().exclude(expiring=False).exclude(expiring=True, expires__gt=now())
)
amount = objects.count()
for obj in objects:
for obj in qs_batch_iter(objects):
obj.expire_action()
LOGGER.debug("Expired models", model=cls, amount=amount)
messages.append(f"Expired {amount} {cls._meta.verbose_name_plural}")
# Special case
amount = 0
for session in AuthenticatedSession.objects.all():
for session in qs_batch_iter(AuthenticatedSession.objects.all()):
match CONFIG.get("session_storage", "cache"):
case "cache":
cache_key = f"{KEY_PREFIX}{session.session_key}"

View File

@ -15,6 +15,7 @@ from authentik.events.models import (
TaskStatus,
)
from authentik.events.system_tasks import SystemTask, prefill_task
from authentik.lib.utils.db import qs_batch_iter
from authentik.policies.engine import PolicyEngine
from authentik.policies.models import PolicyBinding, PolicyEngineMode
from authentik.root.celery import CELERY_APP
@ -129,7 +130,8 @@ def gdpr_cleanup(user_pk: int):
"""cleanup events from gdpr_compliance"""
events = Event.objects.filter(user__pk=user_pk)
LOGGER.debug("GDPR cleanup, removing events from user", events=events.count())
events.delete()
for event in qs_batch_iter(events):
event.delete()
@CELERY_APP.task(bind=True, base=SystemTask)
@ -138,6 +140,7 @@ def notification_cleanup(self: SystemTask):
"""Cleanup seen notifications and notifications whose event expired."""
notifications = Notification.objects.filter(Q(event=None) | Q(seen=True))
amount = notifications.count()
notifications.delete()
for notification in qs_batch_iter(notifications):
notification.delete()
LOGGER.debug("Expired notifications", amount=amount)
self.set_status(TaskStatus.SUCCESSFUL, f"Expired {amount} Notifications")

22
authentik/lib/utils/db.py Normal file
View File

@ -0,0 +1,22 @@
"""authentik database utilities"""
import gc
from django.db.models import QuerySet
def qs_batch_iter(qs: QuerySet, batch_size: int = 10_000, gc_collect: bool = True):
pk_iter = qs.values_list("pk", flat=True).order_by("pk").distinct().iterator()
eof = False
while not eof:
pk_buffer = []
i = 0
try:
while i < batch_size:
pk_buffer.append(pk_iter.next())
i += 1
except StopIteration:
eof = True
yield from qs.filter(pk__in=pk_buffer).order_by("pk").iterator()
if gc_collect:
gc.collect()