fix final todos in package
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
@ -159,7 +159,9 @@ web:
|
|||||||
worker:
|
worker:
|
||||||
processes: 2
|
processes: 2
|
||||||
threads: 1
|
threads: 1
|
||||||
consumer_listen_timeout: 30
|
consumer_listen_timeout: "seconds=30"
|
||||||
|
task_purge_interval: "days=1"
|
||||||
|
task_expiration: "days=30"
|
||||||
|
|
||||||
storage:
|
storage:
|
||||||
media:
|
media:
|
||||||
|
@ -358,6 +358,10 @@ DRAMATIQ = {
|
|||||||
"broker_class": "authentik.tasks.broker.Broker",
|
"broker_class": "authentik.tasks.broker.Broker",
|
||||||
"channel_prefix": "authentik",
|
"channel_prefix": "authentik",
|
||||||
"task_model": "authentik.tasks.models.Task",
|
"task_model": "authentik.tasks.models.Task",
|
||||||
|
"task_purge_interval": timedelta_from_string(
|
||||||
|
CONFIG.get("worker.task_purge_interval")
|
||||||
|
).total_seconds,
|
||||||
|
"task_expiration": timedelta_from_string(CONFIG.get("worker.task_expiration")).total_seconds,
|
||||||
"autodiscovery": {
|
"autodiscovery": {
|
||||||
"enabled": True,
|
"enabled": True,
|
||||||
"setup_module": "authentik.tasks.setup",
|
"setup_module": "authentik.tasks.setup",
|
||||||
@ -366,7 +370,9 @@ DRAMATIQ = {
|
|||||||
"worker": {
|
"worker": {
|
||||||
"processes": CONFIG.get_int("worker.processes", 2),
|
"processes": CONFIG.get_int("worker.processes", 2),
|
||||||
"threads": CONFIG.get_int("worker.threads", 1),
|
"threads": CONFIG.get_int("worker.threads", 1),
|
||||||
"consumer_listen_timeout": CONFIG.get_int("worker.consumer_listen_timeout", 30),
|
"consumer_listen_timeout": timedelta_from_string(
|
||||||
|
CONFIG.get("worker.consumer_listen_timeout")
|
||||||
|
),
|
||||||
},
|
},
|
||||||
"scheduler_class": "authentik.tasks.schedules.scheduler.Scheduler",
|
"scheduler_class": "authentik.tasks.schedules.scheduler.Scheduler",
|
||||||
"schedule_model": "authentik.tasks.schedules.models.Schedule",
|
"schedule_model": "authentik.tasks.schedules.models.Schedule",
|
||||||
|
@ -230,7 +230,6 @@ class _PostgresConsumer(Consumer):
|
|||||||
self.postgres_channel = channel_name(self.queue_name, ChannelIdentifier.ENQUEUE)
|
self.postgres_channel = channel_name(self.queue_name, ChannelIdentifier.ENQUEUE)
|
||||||
|
|
||||||
# Override because dramatiq doesn't allow us setting this manually
|
# Override because dramatiq doesn't allow us setting this manually
|
||||||
# TODO: turn it into a setting
|
|
||||||
self.timeout = Conf().worker["consumer_listen_timeout"]
|
self.timeout = Conf().worker["consumer_listen_timeout"]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -403,16 +402,16 @@ class _PostgresConsumer(Consumer):
|
|||||||
self.unlock_queue.task_done()
|
self.unlock_queue.task_done()
|
||||||
|
|
||||||
def _auto_purge(self):
|
def _auto_purge(self):
|
||||||
# TODO: allow configuring this
|
# Automatically purge messages on average every n iterations.
|
||||||
# Automatically purge messages on average every 100k iteration.
|
# We manually set the timeout to 30s, so we need to divide by 30 to
|
||||||
# Dramatiq defaults to 1s, so this means one purge every 28 hours.
|
# get the number of actual iterations.
|
||||||
if randint(0, 100_000): # nosec
|
iterations = Conf().task_purge_interval // 30
|
||||||
|
if randint(0, iterations): # nosec
|
||||||
return
|
return
|
||||||
self.logger.debug("Running garbage collector")
|
self.logger.debug("Running garbage collector")
|
||||||
count = self.query_set.filter(
|
count = self.query_set.filter(
|
||||||
state__in=(TaskState.DONE, TaskState.REJECTED),
|
state__in=(TaskState.DONE, TaskState.REJECTED),
|
||||||
# TODO: allow configuring this
|
mtime__lte=timezone.now() - timezone.timedelta(seconds=Conf().task_purge_interval),
|
||||||
mtime__lte=timezone.now() - timezone.timedelta(days=30),
|
|
||||||
result_expiry__lte=timezone.now(),
|
result_expiry__lte=timezone.now(),
|
||||||
).delete()
|
).delete()
|
||||||
self.logger.info(f"Purged {count} messages in all queues")
|
self.logger.info(f"Purged {count} messages in all queues")
|
||||||
|
@ -56,6 +56,16 @@ class Conf:
|
|||||||
def task_model(self) -> str:
|
def task_model(self) -> str:
|
||||||
return self.conf["task_model"]
|
return self.conf["task_model"]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def task_purge_interval(self) -> int:
|
||||||
|
# 24 hours
|
||||||
|
return self.conf.get("task_purge_interval", 24 * 60 * 60)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def task_expiration(self) -> int:
|
||||||
|
# 30 days
|
||||||
|
return self.conf.get("task_expiration", 60 * 60 * 24 * 30)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def autodiscovery(self) -> dict[str, Any]:
|
def autodiscovery(self) -> dict[str, Any]:
|
||||||
autodiscovery = {
|
autodiscovery = {
|
||||||
|
@ -49,7 +49,7 @@ def generate_local_config():
|
|||||||
"worker": {
|
"worker": {
|
||||||
"processes": 1,
|
"processes": 1,
|
||||||
"threads": 1,
|
"threads": 1,
|
||||||
"consumer_listen_timeout": 10,
|
"consumer_listen_timeout": "seconds=10",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user