44 lines
1.5 KiB
Python
44 lines
1.5 KiB
Python
from django.db import DEFAULT_DB_ALIAS
|
|
from django.db.models import QuerySet
|
|
from django.utils import timezone
|
|
from django.utils.functional import cached_property
|
|
from django.utils.module_loading import import_string
|
|
from dramatiq.message import Message
|
|
from dramatiq.results.backend import Missing, MResult, Result, ResultBackend
|
|
|
|
from django_dramatiq_postgres.conf import Conf
|
|
from django_dramatiq_postgres.models import TaskBase
|
|
|
|
|
|
class PostgresBackend(ResultBackend):
|
|
def __init__(self, *args, db_alias: str = DEFAULT_DB_ALIAS, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.db_alias = db_alias
|
|
|
|
@cached_property
|
|
def model(self) -> type[TaskBase]:
|
|
return import_string(Conf().task_model)
|
|
|
|
@property
|
|
def query_set(self) -> QuerySet:
|
|
return self.model.objects.using(self.db_alias)
|
|
|
|
def build_message_key(self, message: Message) -> str:
|
|
return str(message.message_id)
|
|
|
|
def _get(self, message_key: str) -> MResult:
|
|
message = self.query_set.filter(message_id=message_key).first()
|
|
if message is None:
|
|
return Missing
|
|
data = message.result
|
|
if data is None:
|
|
return Missing
|
|
return self.encoder.decode(data)
|
|
|
|
def _store(self, message_key: str, result: Result, ttl: int) -> None:
|
|
self.query_set.filter(message_id=message_key).update(
|
|
mtime=timezone.now(),
|
|
result=self.encoder.encode(result),
|
|
result_expiry=timezone.now() + timezone.timedelta(milliseconds=ttl),
|
|
)
|