From 13e1e446263160503aff215a5812f55894184776 Mon Sep 17 00:00:00 2001 From: Marc 'risson' Schmitt Date: Fri, 20 Jun 2025 19:01:34 +0200 Subject: [PATCH] cleanup scheduler Signed-off-by: Marc 'risson' Schmitt --- authentik/tasks/schedules/scheduler.py | 2 +- .../django_dramatiq_postgres/broker.py | 2 +- .../django_dramatiq_postgres/middleware.py | 3 +-- .../django_dramatiq_postgres/scheduler.py | 18 ++++++++---------- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/authentik/tasks/schedules/scheduler.py b/authentik/tasks/schedules/scheduler.py index 6d97fcee3c..da38251f1c 100644 --- a/authentik/tasks/schedules/scheduler.py +++ b/authentik/tasks/schedules/scheduler.py @@ -27,4 +27,4 @@ class Scheduler(SchedulerBase): return count = self._run() self.logger.info(f"Sent {count} scheduled tasks") - sleep(Conf().scheduler_interval) + sleep(Conf().scheduler_interval) diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py index 6d59ed8213..a782ba745c 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py @@ -405,7 +405,7 @@ class _PostgresConsumer(Consumer): # Automatically purge messages on average every n iterations. # We manually set the timeout to 30s, so we need to divide by 30 to # get the number of actual iterations. - iterations = Conf().task_purge_interval // 30 + iterations = int(Conf().task_purge_interval / 30) if randint(0, iterations): # nosec return self.logger.debug("Running garbage collector") diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py index 9ce10525e4..3a39c5e69d 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py @@ -108,8 +108,7 @@ class SchedulerMiddleware(Middleware): "When using the scheduler, DRAMATIQ.schedule_class must be set." ) - self.scheduler_stop_event = Event() - self.scheduler: Scheduler = import_string(Conf().scheduler_class)(self.scheduler_stop_event) + self.scheduler: Scheduler = import_string(Conf().scheduler_class)() def after_process_boot(self, broker: Broker): self.scheduler.broker = broker diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py index 4b6053f076..6285d5e44f 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py @@ -17,9 +17,8 @@ from django_dramatiq_postgres.models import ScheduleBase class Scheduler(Thread): broker: Broker - def __init__(self, stop_event: Event, *args, **kwargs): + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.stop_event = stop_event self.logger = get_logger(__name__, type(self)) @cached_property @@ -53,11 +52,10 @@ class Scheduler(Thread): return count def run(self): - while not self.stop_event.is_set(): - with self._lock() as lock_acquired: - if not lock_acquired: - self.logger.debug("Could not acquire lock, skipping scheduling") - return - count = self._run() - self.logger.info(f"Sent {count} scheduled tasks") - sleep(Conf().scheduler_interval) + with self._lock() as lock_acquired: + if not lock_acquired: + self.logger.debug("Could not acquire lock, skipping scheduling") + return + count = self._run() + self.logger.info(f"Sent {count} scheduled tasks") + sleep(Conf().scheduler_interval)