fix logs when task fails, make more options configurable

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2025-06-23 16:40:22 +02:00
parent 7e7b33dba7
commit 2509ccde1c
5 changed files with 67 additions and 45 deletions

View File

@ -160,6 +160,8 @@ worker:
processes: 2 processes: 2
threads: 1 threads: 1
consumer_listen_timeout: "seconds=30" consumer_listen_timeout: "seconds=30"
task_max_retries: 20
task_default_time_limit: "minutes=10"
task_purge_interval: "days=1" task_purge_interval: "days=1"
task_expiration: "days=30" task_expiration: "days=30"
scheduler_interval: "seconds=60" scheduler_interval: "seconds=60"

View File

@ -380,23 +380,27 @@ DRAMATIQ = {
CONFIG.get("worker.scheduler_interval") CONFIG.get("worker.scheduler_interval")
).total_seconds(), ).total_seconds(),
"middlewares": ( "middlewares": (
("django_dramatiq_postgres.middleware.SchedulerMiddleware", {}), # ("django_dramatiq_postgres.middleware.SchedulerMiddleware", {}),
("django_dramatiq_postgres.middleware.FullyQualifiedActorName", {}), ("django_dramatiq_postgres.middleware.FullyQualifiedActorName", {}),
# TODO: fixme # TODO: fixme
# ("dramatiq.middleware.prometheus.Prometheus", {}), # ("dramatiq.middleware.prometheus.Prometheus", {}),
("django_dramatiq_postgres.middleware.DbConnectionMiddleware", {}), ("django_dramatiq_postgres.middleware.DbConnectionMiddleware", {}),
("dramatiq.middleware.age_limit.AgeLimit", {}), ("dramatiq.middleware.age_limit.AgeLimit", {}),
( (
# 5 minutes task timeout by default for all tasks, in ms
"dramatiq.middleware.time_limit.TimeLimit", "dramatiq.middleware.time_limit.TimeLimit",
{"time_limit": 600_000}, {
"time_limit": timedelta_from_string(
CONFIG.get("worker.task_default_time_limit")
).total_seconds()
* 1000
},
), ),
("dramatiq.middleware.shutdown.ShutdownNotifications", {}), ("dramatiq.middleware.shutdown.ShutdownNotifications", {}),
("dramatiq.middleware.callbacks.Callbacks", {}), ("dramatiq.middleware.callbacks.Callbacks", {}),
("dramatiq.middleware.pipelines.Pipelines", {}), ("dramatiq.middleware.pipelines.Pipelines", {}),
( (
"dramatiq.middleware.retries.Retries", "dramatiq.middleware.retries.Retries",
{"max_retries": 20 if not TEST else 0}, {"max_retries": CONFIG.get_int("worker.task_max_retries") if not TEST else 0},
), ),
# TODO: results # TODO: results
("django_dramatiq_postgres.middleware.CurrentTask", {}), ("django_dramatiq_postgres.middleware.CurrentTask", {}),

View File

@ -1,5 +1,6 @@
from typing import Any from typing import Any
from dramatiq import get_logger
from dramatiq.broker import Broker from dramatiq.broker import Broker
from dramatiq.message import Message from dramatiq.message import Message
from dramatiq.middleware import Middleware from dramatiq.middleware import Middleware
@ -35,21 +36,23 @@ class RelObjMiddleware(Middleware):
class LoggingMiddleware(Middleware): class LoggingMiddleware(Middleware):
def before_enqueue(self, broker: Broker, message: Message, delay: int): def after_enqueue(self, broker: Broker, message: Message, delay: int):
message.options["model_defaults"]["_messages"] = [ task: Task = message.options["task"]
task_created: bool = message.options["task_created"]
task._messages.append(
Task._make_message( Task._make_message(
str(type(self)), str(type(self)),
TaskStatus.INFO, TaskStatus.INFO,
"Task is being queued", "Task is being queued" if task_created else "Task is being retried",
delay=delay, delay=delay,
) )
] )
task.save(update_fields=("_messages",))
def before_process_message(self, broker: Broker, message: Message): def before_process_message(self, broker: Broker, message: Message):
task: Task = message.options["task"] task: Task = message.options["task"]
task.log(str(type(self)), TaskStatus.INFO, "Task is being processed") task.log(str(type(self)), TaskStatus.INFO, "Task is being processed")
# TODO: also after_skip_message
def after_process_message( def after_process_message(
self, self,
broker: Broker, broker: Broker,
@ -74,6 +77,10 @@ class LoggingMiddleware(Middleware):
"{exception_to_string(exception)}", "{exception_to_string(exception)}",
).save() ).save()
def after_skip_message(self, broker: Broker, message: Message):
task: Task = message.options["task"]
task.log(str(type(self)), TaskStatus.INFO, "Task has been skipped")
class DescriptionMiddleware(Middleware): class DescriptionMiddleware(Middleware):
@property @property

View File

@ -14,6 +14,7 @@ from django.db import (
InterfaceError, InterfaceError,
OperationalError, OperationalError,
connections, connections,
transaction,
) )
from django.db.backends.postgresql.base import DatabaseWrapper from django.db.backends.postgresql.base import DatabaseWrapper
from django.db.models import QuerySet from django.db.models import QuerySet
@ -152,6 +153,7 @@ class PostgresBroker(Broker):
message.options["model_defaults"] = self.model_defaults(message) message.options["model_defaults"] = self.model_defaults(message)
self.emit_before("enqueue", message, delay) self.emit_before("enqueue", message, delay)
with transaction.atomic(using=self.db_alias):
query = { query = {
"message_id": message.message_id, "message_id": message.message_id,
} }
@ -162,11 +164,13 @@ class PostgresBroker(Broker):
**defaults, **defaults,
} }
self.query_set.update_or_create( task, created = self.query_set.update_or_create(
**query, **query,
defaults=defaults, defaults=defaults,
create_defaults=create_defaults, create_defaults=create_defaults,
) )
message.options["task"] = task
message.options["task_created"] = created
self.emit_after("enqueue", message, delay) self.emit_after("enqueue", message, delay)
return message return message

View File

@ -1,6 +1,6 @@
import contextvars import contextvars
from threading import Event from threading import Event
from typing import Any from typing import Any, override
from django.core.exceptions import ImproperlyConfigured from django.core.exceptions import ImproperlyConfigured
from django.db import ( from django.db import (
@ -58,6 +58,9 @@ class CurrentTask(Middleware):
raise RuntimeError("CurrentTask.get_task() can only be called in a running task") raise RuntimeError("CurrentTask.get_task() can only be called in a running task")
return task[-1] return task[-1]
def before_enqueue(self, broker: Broker, message: Message, delay: int):
self.after_process_message(broker, message)
def before_process_message(self, broker: Broker, message: Message): def before_process_message(self, broker: Broker, message: Message):
tasks = self._TASKS.get() tasks = self._TASKS.get()
if tasks is None: if tasks is None:
@ -75,9 +78,8 @@ class CurrentTask(Middleware):
): ):
tasks: list[TaskBase] | None = self._TASKS.get() tasks: list[TaskBase] | None = self._TASKS.get()
if tasks is None or len(tasks) == 0: if tasks is None or len(tasks) == 0:
self.logger.warning("Task was None, not saving. This should not happen.")
return return
else:
task = tasks[-1] task = tasks[-1]
fields_to_exclude = { fields_to_exclude = {
"message_id", "message_id",
@ -95,9 +97,12 @@ class CurrentTask(Middleware):
if f.name not in fields_to_exclude and not f.auto_created and f.column if f.name not in fields_to_exclude and not f.auto_created and f.column
] ]
if fields_to_update: if fields_to_update:
tasks[-1].save(update_fields=fields_to_update) task.save(update_fields=fields_to_update)
self._TASKS.set(tasks[:-1]) self._TASKS.set(tasks[:-1])
def after_skip_message(self, broker: Broker, message: Message):
self.after_process_message(broker, message)
class SchedulerMiddleware(Middleware): class SchedulerMiddleware(Middleware):
def __init__(self): def __init__(self):