tenants: fix scheduled tasks not running on default tenant (#9583)
* tenants: fix scheduled tasks not running on default tenant Signed-off-by: Jens Langhammer <jens@goauthentik.io> * add some extra time to keep system task around Signed-off-by: Jens Langhammer <jens@goauthentik.io> * make sure we actually send it to all tenants Signed-off-by: Jens Langhammer <jens@goauthentik.io> --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
@ -119,7 +119,7 @@ class SystemTask(TenantTask):
|
|||||||
"task_call_kwargs": sanitize_item(kwargs),
|
"task_call_kwargs": sanitize_item(kwargs),
|
||||||
"status": self._status,
|
"status": self._status,
|
||||||
"messages": sanitize_item(self._messages),
|
"messages": sanitize_item(self._messages),
|
||||||
"expires": now() + timedelta(hours=self.result_timeout_hours),
|
"expires": now() + timedelta(hours=self.result_timeout_hours + 3),
|
||||||
"expiring": True,
|
"expiring": True,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@ -53,6 +53,7 @@ cache:
|
|||||||
|
|
||||||
# result_backend:
|
# result_backend:
|
||||||
# url: ""
|
# url: ""
|
||||||
|
# transport_options: ""
|
||||||
|
|
||||||
debug: false
|
debug: false
|
||||||
remote_debug: false
|
remote_debug: false
|
||||||
|
|||||||
@ -376,7 +376,13 @@ CELERY = {
|
|||||||
"task_default_queue": "authentik",
|
"task_default_queue": "authentik",
|
||||||
"broker_url": CONFIG.get("broker.url") or redis_url(CONFIG.get("redis.db")),
|
"broker_url": CONFIG.get("broker.url") or redis_url(CONFIG.get("redis.db")),
|
||||||
"result_backend": CONFIG.get("result_backend.url") or redis_url(CONFIG.get("redis.db")),
|
"result_backend": CONFIG.get("result_backend.url") or redis_url(CONFIG.get("redis.db")),
|
||||||
"broker_transport_options": CONFIG.get_dict_from_b64_json("broker.transport_options"),
|
"broker_transport_options": CONFIG.get_dict_from_b64_json(
|
||||||
|
"broker.transport_options", {"retry_policy": {"timeout": 5.0}}
|
||||||
|
),
|
||||||
|
"result_backend_transport_options": CONFIG.get_dict_from_b64_json(
|
||||||
|
"result_backend.transport_options", {"retry_policy": {"timeout": 5.0}}
|
||||||
|
),
|
||||||
|
"redis_retry_on_timeout": True,
|
||||||
}
|
}
|
||||||
|
|
||||||
# Sentry integration
|
# Sentry integration
|
||||||
|
|||||||
@ -3,6 +3,7 @@
|
|||||||
from tenant_schemas_celery.scheduler import (
|
from tenant_schemas_celery.scheduler import (
|
||||||
TenantAwarePersistentScheduler as BaseTenantAwarePersistentScheduler,
|
TenantAwarePersistentScheduler as BaseTenantAwarePersistentScheduler,
|
||||||
)
|
)
|
||||||
|
from tenant_schemas_celery.scheduler import TenantAwareScheduleEntry
|
||||||
|
|
||||||
|
|
||||||
class TenantAwarePersistentScheduler(BaseTenantAwarePersistentScheduler):
|
class TenantAwarePersistentScheduler(BaseTenantAwarePersistentScheduler):
|
||||||
@ -11,3 +12,11 @@ class TenantAwarePersistentScheduler(BaseTenantAwarePersistentScheduler):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def get_queryset(cls):
|
def get_queryset(cls):
|
||||||
return super().get_queryset().filter(ready=True)
|
return super().get_queryset().filter(ready=True)
|
||||||
|
|
||||||
|
def apply_entry(self, entry: TenantAwareScheduleEntry, producer=None):
|
||||||
|
# https://github.com/maciej-gol/tenant-schemas-celery/blob/master/tenant_schemas_celery/scheduler.py#L85
|
||||||
|
# When (as by default) no tenant schemas are set, the public schema is excluded
|
||||||
|
# so we need to explicitly include it here, otherwise the task is not executed
|
||||||
|
if entry.tenant_schemas is None:
|
||||||
|
entry.tenant_schemas = self.get_queryset().values_list("schema_name", flat=True)
|
||||||
|
return super().apply_entry(entry, producer)
|
||||||
|
|||||||
Reference in New Issue
Block a user