run scheduler in broker
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
@ -380,7 +380,6 @@ DRAMATIQ = {
|
||||
CONFIG.get("worker.scheduler_interval")
|
||||
).total_seconds(),
|
||||
"middlewares": (
|
||||
# ("django_dramatiq_postgres.middleware.SchedulerMiddleware", {}),
|
||||
("django_dramatiq_postgres.middleware.FullyQualifiedActorName", {}),
|
||||
# TODO: fixme
|
||||
# ("dramatiq.middleware.prometheus.Prometheus", {}),
|
||||
|
@ -27,4 +27,3 @@ class Scheduler(SchedulerBase):
|
||||
return
|
||||
count = self._run()
|
||||
self.logger.info(f"Sent {count} scheduled tasks")
|
||||
sleep(Conf().scheduler_interval)
|
||||
|
@ -236,6 +236,13 @@ class _PostgresConsumer(Consumer):
|
||||
# Override because dramatiq doesn't allow us setting this manually
|
||||
self.timeout = Conf().worker["consumer_listen_timeout"]
|
||||
|
||||
self.scheduler = None
|
||||
if Conf().schedule_model:
|
||||
self.scheduler = import_string(Conf().scheduler_class)()
|
||||
self.scheduler.broker = self.broker
|
||||
self.scheduler_interval = timezone.timedelta(seconds=Conf().scheduler_interval)
|
||||
self.scheduler_last_run = timezone.now() - self.scheduler_interval
|
||||
|
||||
@property
|
||||
def connection(self) -> DatabaseWrapper:
|
||||
return connections[self.db_alias]
|
||||
@ -391,6 +398,7 @@ class _PostgresConsumer(Consumer):
|
||||
# No message to process
|
||||
self._purge_locks()
|
||||
self._auto_purge()
|
||||
self._scheduler()
|
||||
|
||||
def _purge_locks(self):
|
||||
while True:
|
||||
@ -419,3 +427,10 @@ class _PostgresConsumer(Consumer):
|
||||
result_expiry__lte=timezone.now(),
|
||||
).delete()
|
||||
self.logger.info(f"Purged {count} messages in all queues")
|
||||
|
||||
def _scheduler(self):
|
||||
if not self.scheduler:
|
||||
return
|
||||
if timezone.now() - self.scheduler_last_run < self.scheduler_interval:
|
||||
return
|
||||
self.scheduler.run()
|
||||
|
@ -102,19 +102,3 @@ class CurrentTask(Middleware):
|
||||
|
||||
def after_skip_message(self, broker: Broker, message: Message):
|
||||
self.after_process_message(broker, message)
|
||||
|
||||
|
||||
class SchedulerMiddleware(Middleware):
|
||||
def __init__(self):
|
||||
self.logger = get_logger(__name__, type(self))
|
||||
|
||||
if not Conf().schedule_model:
|
||||
raise ImproperlyConfigured(
|
||||
"When using the scheduler, DRAMATIQ.schedule_class must be set."
|
||||
)
|
||||
|
||||
self.scheduler: Scheduler = import_string(Conf().scheduler_class)()
|
||||
|
||||
def after_process_boot(self, broker: Broker):
|
||||
self.scheduler.broker = broker
|
||||
self.scheduler.start()
|
||||
|
@ -14,7 +14,7 @@ from django_dramatiq_postgres.conf import Conf
|
||||
from django_dramatiq_postgres.models import ScheduleBase
|
||||
|
||||
|
||||
class Scheduler(Thread):
|
||||
class Scheduler:
|
||||
broker: Broker
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
@ -58,4 +58,3 @@ class Scheduler(Thread):
|
||||
return
|
||||
count = self._run()
|
||||
self.logger.info(f"Sent {count} scheduled tasks")
|
||||
sleep(Conf().scheduler_interval)
|
||||
|
Reference in New Issue
Block a user