providers/scim, sources/ldap: switch to using postgres advisory locks instead of redis locks (#9511)
* providers/scim, sources/ldap: switch to using postgres advisory locks instead of redis locks Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * website/integrations: discord: fix typo Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * fix timeout logic Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * remove redis locks completely Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * Apply suggestions from code review Signed-off-by: Jens L. <jens@beryju.org> --------- Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> Signed-off-by: Jens L. <jens@beryju.org> Co-authored-by: Jens L <jens@goauthentik.io>
This commit is contained in:
committed by
GitHub
parent
fbab822db1
commit
fbad02fac1
@ -3,3 +3,4 @@
|
||||
PAGE_SIZE = 100
|
||||
PAGE_TIMEOUT = 60 * 60 * 0.5 # Half an hour
|
||||
HTTP_CONFLICT = 409
|
||||
LOCK_ACQUIRE_TIMEOUT = 5
|
||||
|
||||
@ -1,11 +1,11 @@
|
||||
from typing import Any, Self
|
||||
|
||||
from django.core.cache import cache
|
||||
import pglock
|
||||
from django.db import connection
|
||||
from django.db.models import Model, QuerySet, TextChoices
|
||||
from redis.lock import Lock
|
||||
|
||||
from authentik.core.models import Group, User
|
||||
from authentik.lib.sync.outgoing import PAGE_TIMEOUT
|
||||
from authentik.lib.sync.outgoing import LOCK_ACQUIRE_TIMEOUT
|
||||
from authentik.lib.sync.outgoing.base import BaseOutgoingSyncClient
|
||||
|
||||
|
||||
@ -32,10 +32,10 @@ class OutgoingSyncProvider(Model):
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def sync_lock(self) -> Lock:
|
||||
"""Redis lock to prevent multiple parallel syncs happening"""
|
||||
return Lock(
|
||||
cache.client.get_client(),
|
||||
name=f"goauthentik.io/providers/outgoing-sync/{str(self.pk)}",
|
||||
timeout=(60 * 60 * PAGE_TIMEOUT) * 3,
|
||||
def sync_lock(self) -> pglock.advisory:
|
||||
"""Postgres lock for syncing SCIM to prevent multiple parallel syncs happening"""
|
||||
return pglock.advisory(
|
||||
lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}",
|
||||
timeout=LOCK_ACQUIRE_TIMEOUT,
|
||||
side_effect=pglock.Raise,
|
||||
)
|
||||
|
||||
@ -4,6 +4,7 @@ from celery.exceptions import Retry
|
||||
from celery.result import allow_join_result
|
||||
from django.core.paginator import Paginator
|
||||
from django.db.models import Model, QuerySet
|
||||
from django.db.utils import OperationalError
|
||||
from django.utils.text import slugify
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from structlog.stdlib import BoundLogger, get_logger
|
||||
@ -64,40 +65,40 @@ class SyncTasks:
|
||||
).first()
|
||||
if not provider:
|
||||
return
|
||||
lock = provider.sync_lock
|
||||
if lock.locked():
|
||||
self.logger.debug("Sync locked, skipping task", source=provider.name)
|
||||
return
|
||||
task.set_uid(slugify(provider.name))
|
||||
messages = []
|
||||
messages.append(_("Starting full provider sync"))
|
||||
self.logger.debug("Starting provider sync")
|
||||
users_paginator = Paginator(provider.get_object_qs(User), PAGE_SIZE)
|
||||
groups_paginator = Paginator(provider.get_object_qs(Group), PAGE_SIZE)
|
||||
with allow_join_result(), lock:
|
||||
try:
|
||||
for page in users_paginator.page_range:
|
||||
messages.append(_("Syncing page %(page)d of users" % {"page": page}))
|
||||
for msg in sync_objects.apply_async(
|
||||
args=(class_to_path(User), page, provider_pk),
|
||||
time_limit=PAGE_TIMEOUT,
|
||||
soft_time_limit=PAGE_TIMEOUT,
|
||||
).get():
|
||||
messages.append(msg)
|
||||
for page in groups_paginator.page_range:
|
||||
messages.append(_("Syncing page %(page)d of groups" % {"page": page}))
|
||||
for msg in sync_objects.apply_async(
|
||||
args=(class_to_path(Group), page, provider_pk),
|
||||
time_limit=PAGE_TIMEOUT,
|
||||
soft_time_limit=PAGE_TIMEOUT,
|
||||
).get():
|
||||
messages.append(msg)
|
||||
except TransientSyncException as exc:
|
||||
self.logger.warning("transient sync exception", exc=exc)
|
||||
raise task.retry(exc=exc) from exc
|
||||
except StopSync as exc:
|
||||
task.set_error(exc)
|
||||
return
|
||||
try:
|
||||
with allow_join_result(), provider.sync_lock:
|
||||
try:
|
||||
for page in users_paginator.page_range:
|
||||
messages.append(_("Syncing page %(page)d of users" % {"page": page}))
|
||||
for msg in sync_objects.apply_async(
|
||||
args=(class_to_path(User), page, provider_pk),
|
||||
time_limit=PAGE_TIMEOUT,
|
||||
soft_time_limit=PAGE_TIMEOUT,
|
||||
).get():
|
||||
messages.append(msg)
|
||||
for page in groups_paginator.page_range:
|
||||
messages.append(_("Syncing page %(page)d of groups" % {"page": page}))
|
||||
for msg in sync_objects.apply_async(
|
||||
args=(class_to_path(Group), page, provider_pk),
|
||||
time_limit=PAGE_TIMEOUT,
|
||||
soft_time_limit=PAGE_TIMEOUT,
|
||||
).get():
|
||||
messages.append(msg)
|
||||
except TransientSyncException as exc:
|
||||
self.logger.warning("transient sync exception", exc=exc)
|
||||
raise task.retry(exc=exc) from exc
|
||||
except StopSync as exc:
|
||||
task.set_error(exc)
|
||||
return
|
||||
except OperationalError:
|
||||
self.logger.debug("Failed to acquire sync lock, skipping", provider=provider.name)
|
||||
return
|
||||
task.set_status(TaskStatus.SUCCESSFUL, *messages)
|
||||
|
||||
def sync_objects(self, object_type: str, page: int, provider_pk: int):
|
||||
|
||||
Reference in New Issue
Block a user