From fbad02fac1ff898e4b1eb008e2854e74e5cc1ee6 Mon Sep 17 00:00:00 2001 From: Marc 'risson' Schmitt Date: Thu, 23 May 2024 13:41:42 +0200 Subject: [PATCH] providers/scim, sources/ldap: switch to using postgres advisory locks instead of redis locks (#9511) * providers/scim, sources/ldap: switch to using postgres advisory locks instead of redis locks Signed-off-by: Marc 'risson' Schmitt * website/integrations: discord: fix typo Signed-off-by: Marc 'risson' Schmitt * fix timeout logic Signed-off-by: Marc 'risson' Schmitt * remove redis locks completely Signed-off-by: Marc 'risson' Schmitt * Apply suggestions from code review Signed-off-by: Jens L. --------- Signed-off-by: Marc 'risson' Schmitt Signed-off-by: Jens L. Co-authored-by: Jens L --- authentik/lib/sync/outgoing/__init__.py | 1 + authentik/lib/sync/outgoing/models.py | 18 ++++---- authentik/lib/sync/outgoing/tasks.py | 57 +++++++++++++------------ authentik/root/settings.py | 2 + authentik/sources/ldap/models.py | 19 ++++----- authentik/sources/ldap/tasks.py | 14 ++---- poetry.lock | 33 +++++++++++++- pyproject.toml | 1 + 8 files changed, 85 insertions(+), 60 deletions(-) diff --git a/authentik/lib/sync/outgoing/__init__.py b/authentik/lib/sync/outgoing/__init__.py index 1005a6b242..148a20f5bd 100644 --- a/authentik/lib/sync/outgoing/__init__.py +++ b/authentik/lib/sync/outgoing/__init__.py @@ -3,3 +3,4 @@ PAGE_SIZE = 100 PAGE_TIMEOUT = 60 * 60 * 0.5 # Half an hour HTTP_CONFLICT = 409 +LOCK_ACQUIRE_TIMEOUT = 5 diff --git a/authentik/lib/sync/outgoing/models.py b/authentik/lib/sync/outgoing/models.py index 1e4c598c2d..f792e11ab0 100644 --- a/authentik/lib/sync/outgoing/models.py +++ b/authentik/lib/sync/outgoing/models.py @@ -1,11 +1,11 @@ from typing import Any, Self -from django.core.cache import cache +import pglock +from django.db import connection from django.db.models import Model, QuerySet, TextChoices -from redis.lock import Lock from authentik.core.models import Group, User -from authentik.lib.sync.outgoing import PAGE_TIMEOUT +from authentik.lib.sync.outgoing import LOCK_ACQUIRE_TIMEOUT from authentik.lib.sync.outgoing.base import BaseOutgoingSyncClient @@ -32,10 +32,10 @@ class OutgoingSyncProvider(Model): raise NotImplementedError @property - def sync_lock(self) -> Lock: - """Redis lock to prevent multiple parallel syncs happening""" - return Lock( - cache.client.get_client(), - name=f"goauthentik.io/providers/outgoing-sync/{str(self.pk)}", - timeout=(60 * 60 * PAGE_TIMEOUT) * 3, + def sync_lock(self) -> pglock.advisory: + """Postgres lock for syncing SCIM to prevent multiple parallel syncs happening""" + return pglock.advisory( + lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}", + timeout=LOCK_ACQUIRE_TIMEOUT, + side_effect=pglock.Raise, ) diff --git a/authentik/lib/sync/outgoing/tasks.py b/authentik/lib/sync/outgoing/tasks.py index bf23f30844..d84705bd30 100644 --- a/authentik/lib/sync/outgoing/tasks.py +++ b/authentik/lib/sync/outgoing/tasks.py @@ -4,6 +4,7 @@ from celery.exceptions import Retry from celery.result import allow_join_result from django.core.paginator import Paginator from django.db.models import Model, QuerySet +from django.db.utils import OperationalError from django.utils.text import slugify from django.utils.translation import gettext_lazy as _ from structlog.stdlib import BoundLogger, get_logger @@ -64,40 +65,40 @@ class SyncTasks: ).first() if not provider: return - lock = provider.sync_lock - if lock.locked(): - self.logger.debug("Sync locked, skipping task", source=provider.name) - return task.set_uid(slugify(provider.name)) messages = [] messages.append(_("Starting full provider sync")) self.logger.debug("Starting provider sync") users_paginator = Paginator(provider.get_object_qs(User), PAGE_SIZE) groups_paginator = Paginator(provider.get_object_qs(Group), PAGE_SIZE) - with allow_join_result(), lock: - try: - for page in users_paginator.page_range: - messages.append(_("Syncing page %(page)d of users" % {"page": page})) - for msg in sync_objects.apply_async( - args=(class_to_path(User), page, provider_pk), - time_limit=PAGE_TIMEOUT, - soft_time_limit=PAGE_TIMEOUT, - ).get(): - messages.append(msg) - for page in groups_paginator.page_range: - messages.append(_("Syncing page %(page)d of groups" % {"page": page})) - for msg in sync_objects.apply_async( - args=(class_to_path(Group), page, provider_pk), - time_limit=PAGE_TIMEOUT, - soft_time_limit=PAGE_TIMEOUT, - ).get(): - messages.append(msg) - except TransientSyncException as exc: - self.logger.warning("transient sync exception", exc=exc) - raise task.retry(exc=exc) from exc - except StopSync as exc: - task.set_error(exc) - return + try: + with allow_join_result(), provider.sync_lock: + try: + for page in users_paginator.page_range: + messages.append(_("Syncing page %(page)d of users" % {"page": page})) + for msg in sync_objects.apply_async( + args=(class_to_path(User), page, provider_pk), + time_limit=PAGE_TIMEOUT, + soft_time_limit=PAGE_TIMEOUT, + ).get(): + messages.append(msg) + for page in groups_paginator.page_range: + messages.append(_("Syncing page %(page)d of groups" % {"page": page})) + for msg in sync_objects.apply_async( + args=(class_to_path(Group), page, provider_pk), + time_limit=PAGE_TIMEOUT, + soft_time_limit=PAGE_TIMEOUT, + ).get(): + messages.append(msg) + except TransientSyncException as exc: + self.logger.warning("transient sync exception", exc=exc) + raise task.retry(exc=exc) from exc + except StopSync as exc: + task.set_error(exc) + return + except OperationalError: + self.logger.debug("Failed to acquire sync lock, skipping", provider=provider.name) + return task.set_status(TaskStatus.SUCCESSFUL, *messages) def sync_objects(self, object_type: str, page: int, provider_pk: int): diff --git a/authentik/root/settings.py b/authentik/root/settings.py index 2b416cf95f..15e689b06e 100644 --- a/authentik/root/settings.py +++ b/authentik/root/settings.py @@ -60,6 +60,8 @@ SHARED_APPS = [ "django_filters", "drf_spectacular", "django_prometheus", + "pgactivity", + "pglock", "channels", ] TENANT_APPS = [ diff --git a/authentik/sources/ldap/models.py b/authentik/sources/ldap/models.py index 587794ae04..23e7dc2777 100644 --- a/authentik/sources/ldap/models.py +++ b/authentik/sources/ldap/models.py @@ -6,19 +6,19 @@ from shutil import rmtree from ssl import CERT_REQUIRED from tempfile import NamedTemporaryFile, mkdtemp -from django.core.cache import cache +import pglock from django.db import connection, models from django.templatetags.static import static from django.utils.translation import gettext_lazy as _ from ldap3 import ALL, NONE, RANDOM, Connection, Server, ServerPool, Tls from ldap3.core.exceptions import LDAPException, LDAPInsufficientAccessRightsResult, LDAPSchemaError -from redis.lock import Lock from rest_framework.serializers import Serializer from authentik.core.models import Group, PropertyMapping, Source from authentik.crypto.models import CertificateKeyPair from authentik.lib.config import CONFIG from authentik.lib.models import DomainlessURLValidator +from authentik.lib.sync.outgoing import LOCK_ACQUIRE_TIMEOUT LDAP_TIMEOUT = 15 @@ -209,15 +209,12 @@ class LDAPSource(Source): return RuntimeError("Failed to bind") @property - def sync_lock(self) -> Lock: - """Redis lock for syncing LDAP to prevent multiple parallel syncs happening""" - return Lock( - cache.client.get_client(), - name=f"goauthentik.io/sources/ldap/sync/{connection.schema_name}-{self.slug}", - # Convert task timeout hours to seconds, and multiply times 3 - # (see authentik/sources/ldap/tasks.py:54) - # multiply by 3 to add even more leeway - timeout=(60 * 60 * CONFIG.get_int("ldap.task_timeout_hours")) * 3, + def sync_lock(self) -> pglock.advisory: + """Postgres lock for syncing LDAP to prevent multiple parallel syncs happening""" + return pglock.advisory( + lock_id=f"goauthentik.io/{connection.schema_name}/sources/ldap/sync/{self.slug}", + timeout=LOCK_ACQUIRE_TIMEOUT, + side_effect=pglock.Raise, ) def check_connection(self) -> dict[str, dict[str, str]]: diff --git a/authentik/sources/ldap/tasks.py b/authentik/sources/ldap/tasks.py index 356ffa9f40..540f10cbaf 100644 --- a/authentik/sources/ldap/tasks.py +++ b/authentik/sources/ldap/tasks.py @@ -4,8 +4,8 @@ from uuid import uuid4 from celery import chain, group from django.core.cache import cache +from django.db.utils import OperationalError from ldap3.core.exceptions import LDAPException -from redis.exceptions import LockError from structlog.stdlib import get_logger from authentik.events.models import SystemTask as DBSystemTask @@ -64,12 +64,8 @@ def ldap_sync_single(source_pk: str): source: LDAPSource = LDAPSource.objects.filter(pk=source_pk).first() if not source: return - lock = source.sync_lock - if lock.locked(): - LOGGER.debug("LDAP sync locked, skipping task", source=source.slug) - return try: - with lock: + with source.sync_lock: # Delete all sync tasks from the cache DBSystemTask.objects.filter(name="ldap_sync", uid__startswith=source.slug).delete() task = chain( @@ -84,10 +80,8 @@ def ldap_sync_single(source_pk: str): ), ) task() - except LockError: - # This should never happen, we check if the lock is locked above so this - # would only happen if there was some other timeout - LOGGER.debug("Failed to acquire lock for LDAP sync", source=source.slug) + except OperationalError: + LOGGER.debug("Failed to acquire lock for LDAP sync, skipping task", source=source.slug) def ldap_sync_paginator(source: LDAPSource, sync: type[BaseLDAPSynchronizer]) -> list: diff --git a/poetry.lock b/poetry.lock index 84cfd953e0..0cd2fb9012 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "aiohttp" @@ -1236,6 +1236,35 @@ files = [ [package.dependencies] Django = ">=3.2" +[[package]] +name = "django-pgactivity" +version = "1.4.1" +description = "Monitor, kill, and analyze Postgres queries." +optional = false +python-versions = "<4,>=3.8.0" +files = [ + {file = "django_pgactivity-1.4.1-py3-none-any.whl", hash = "sha256:e7affa4dc08e7650092a582375729081362a3103f1148e34e8406ddf114eeb95"}, + {file = "django_pgactivity-1.4.1.tar.gz", hash = "sha256:00da0f0156daa37f5f113c7a6d9378a6f6d111e44f20d3b30b367d5428e18b07"}, +] + +[package.dependencies] +django = ">=3" + +[[package]] +name = "django-pglock" +version = "1.5.1" +description = "Postgres locking routines and lock table access." +optional = false +python-versions = "<4,>=3.8.0" +files = [ + {file = "django_pglock-1.5.1-py3-none-any.whl", hash = "sha256:d3b977922abbaffd43968714b69cdab7453866adf2b0695fb497491748d7bc67"}, + {file = "django_pglock-1.5.1.tar.gz", hash = "sha256:291903d5d877b68558003e1d64d764ebd5590344ba3b7aa1d5127df5947869b1"}, +] + +[package.dependencies] +django = ">=3" +django-pgactivity = ">=1.2,<2" + [[package]] name = "django-prometheus" version = "2.3.1" @@ -5303,4 +5332,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "~3.12" -content-hash = "6399c90a2adca3e7119277bf4d0649fe0826d5fb4454a23b1b1fad3e64a1fe90" +content-hash = "112c777b6cf6bec7583f3994cd1fa0165d046d09a2f378990ba6bb626f9739ca" diff --git a/pyproject.toml b/pyproject.toml index 967576b11d..809721751a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,6 +97,7 @@ django = "*" django-filter = "*" django-guardian = "*" django-model-utils = "*" +django-pglock = "*" django-prometheus = "*" django-redis = "*" django-storages = { extras = ["s3"], version = "*" }