diff --git a/authentik/root/settings.py b/authentik/root/settings.py index 03a34b8e0f..ce61b13b8e 100644 --- a/authentik/root/settings.py +++ b/authentik/root/settings.py @@ -380,7 +380,6 @@ DRAMATIQ = { CONFIG.get("worker.scheduler_interval") ).total_seconds(), "middlewares": ( - # ("django_dramatiq_postgres.middleware.SchedulerMiddleware", {}), ("django_dramatiq_postgres.middleware.FullyQualifiedActorName", {}), # TODO: fixme # ("dramatiq.middleware.prometheus.Prometheus", {}), diff --git a/authentik/tasks/schedules/scheduler.py b/authentik/tasks/schedules/scheduler.py index da38251f1c..d6aca6ef54 100644 --- a/authentik/tasks/schedules/scheduler.py +++ b/authentik/tasks/schedules/scheduler.py @@ -27,4 +27,3 @@ class Scheduler(SchedulerBase): return count = self._run() self.logger.info(f"Sent {count} scheduled tasks") - sleep(Conf().scheduler_interval) diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py index 3c114fd4e7..e368eb024f 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py @@ -236,6 +236,13 @@ class _PostgresConsumer(Consumer): # Override because dramatiq doesn't allow us setting this manually self.timeout = Conf().worker["consumer_listen_timeout"] + self.scheduler = None + if Conf().schedule_model: + self.scheduler = import_string(Conf().scheduler_class)() + self.scheduler.broker = self.broker + self.scheduler_interval = timezone.timedelta(seconds=Conf().scheduler_interval) + self.scheduler_last_run = timezone.now() - self.scheduler_interval + @property def connection(self) -> DatabaseWrapper: return connections[self.db_alias] @@ -391,6 +398,7 @@ class _PostgresConsumer(Consumer): # No message to process self._purge_locks() self._auto_purge() + self._scheduler() def _purge_locks(self): while True: @@ -419,3 +427,10 @@ class _PostgresConsumer(Consumer): result_expiry__lte=timezone.now(), ).delete() self.logger.info(f"Purged {count} messages in all queues") + + def _scheduler(self): + if not self.scheduler: + return + if timezone.now() - self.scheduler_last_run < self.scheduler_interval: + return + self.scheduler.run() diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py index 49370af8b7..f7083045c0 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py @@ -102,19 +102,3 @@ class CurrentTask(Middleware): def after_skip_message(self, broker: Broker, message: Message): self.after_process_message(broker, message) - - -class SchedulerMiddleware(Middleware): - def __init__(self): - self.logger = get_logger(__name__, type(self)) - - if not Conf().schedule_model: - raise ImproperlyConfigured( - "When using the scheduler, DRAMATIQ.schedule_class must be set." - ) - - self.scheduler: Scheduler = import_string(Conf().scheduler_class)() - - def after_process_boot(self, broker: Broker): - self.scheduler.broker = broker - self.scheduler.start() diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py index 6285d5e44f..fd820ed05a 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py @@ -14,7 +14,7 @@ from django_dramatiq_postgres.conf import Conf from django_dramatiq_postgres.models import ScheduleBase -class Scheduler(Thread): +class Scheduler: broker: Broker def __init__(self, *args, **kwargs): @@ -58,4 +58,3 @@ class Scheduler(Thread): return count = self._run() self.logger.info(f"Sent {count} scheduled tasks") - sleep(Conf().scheduler_interval)