lib: save task's call arguments for manual retry
This commit is contained in:
@ -2,7 +2,7 @@
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from celery import Task
|
||||
from django.core.cache import cache
|
||||
@ -40,15 +40,30 @@ class TaskInfo:
|
||||
|
||||
result: TaskResult
|
||||
|
||||
task_call_module: str
|
||||
task_call_func: str
|
||||
task_call_args: List[Any] = field(default_factory=list)
|
||||
task_call_kwargs: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
task_description: Optional[str] = field(default=None)
|
||||
|
||||
@staticmethod
|
||||
def all() -> Dict[str, "TaskInfo"]:
|
||||
"""Get all TaskInfo objects"""
|
||||
return cache.get_many(cache.keys("task_*"))
|
||||
|
||||
@staticmethod
|
||||
def by_name(name: str) -> Optional["TaskInfo"]:
|
||||
"""Get TaskInfo Object by name"""
|
||||
return cache.get(f"task_{name}")
|
||||
|
||||
def save(self):
|
||||
"""Save task into cache"""
|
||||
key = f"task_{self.task_name}"
|
||||
if self.result.uid:
|
||||
key += f"_{self.result.uid}"
|
||||
self.task_name += f"_{self.result.uid}"
|
||||
cache.set(key, self)
|
||||
cache.set(key, self, timeout=6 * 60 * 60)
|
||||
|
||||
|
||||
class MonitoredTask(Task):
|
||||
@ -65,12 +80,18 @@ class MonitoredTask(Task):
|
||||
self._result = result
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
def after_return(self, status, retval, task_id, args, kwargs, einfo):
|
||||
def after_return(
|
||||
self, status, retval, task_id, args: List[Any], kwargs: Dict[str, Any], einfo
|
||||
):
|
||||
TaskInfo(
|
||||
task_name=self.__name__,
|
||||
task_description=self.__doc__,
|
||||
finish_timestamp=datetime.now(),
|
||||
result=self._result,
|
||||
task_call_module=self.__module__,
|
||||
task_call_func=self.__name__,
|
||||
task_call_args=args,
|
||||
task_call_kwargs=kwargs,
|
||||
).save()
|
||||
return super().after_return(status, retval, task_id, args, kwargs, einfo=einfo)
|
||||
|
||||
@ -81,6 +102,10 @@ class MonitoredTask(Task):
|
||||
task_description=self.__doc__,
|
||||
finish_timestamp=datetime.now(),
|
||||
result=self._result,
|
||||
task_call_module=self.__module__,
|
||||
task_call_func=self.__name__,
|
||||
task_call_args=args,
|
||||
task_call_kwargs=kwargs,
|
||||
).save()
|
||||
return super().on_failure(exc, task_id, args, kwargs, einfo=einfo)
|
||||
|
||||
|
Reference in New Issue
Block a user