From 7fe91339ada98ec0311fa90d398317e6ccd2e5d3 Mon Sep 17 00:00:00 2001 From: Marc 'risson' Schmitt Date: Mon, 24 Mar 2025 15:41:23 +0100 Subject: [PATCH] wip Signed-off-by: Marc 'risson' Schmitt --- authentik/admin/tests/test_tasks.py | 15 ++++++++++----- authentik/tasks/broker.py | 23 ++++++++--------------- authentik/tasks/tests.py | 4 ++++ 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/authentik/admin/tests/test_tasks.py b/authentik/admin/tests/test_tasks.py index 716168eb21..1ca53680cd 100644 --- a/authentik/admin/tests/test_tasks.py +++ b/authentik/admin/tests/test_tasks.py @@ -30,7 +30,8 @@ class TestAdminTasks(TaskTestCase): """Test Update checker with valid response""" with Mocker() as mocker, CONFIG.patch("disable_update_check", False): mocker.get("https://version.goauthentik.io/version.json", json=RESPONSE_VALID) - update_latest_version.delay().get() + update_latest_version.send() + self.tasks_join(update_latest_version.queue_name) self.assertEqual(cache.get(VERSION_CACHE_KEY), "99999999.9999999") self.assertTrue( Event.objects.filter( @@ -40,7 +41,8 @@ class TestAdminTasks(TaskTestCase): ).exists() ) # test that a consecutive check doesn't create a duplicate event - update_latest_version.delay().get() + update_latest_version.send() + self.tasks_join(update_latest_version.queue_name) self.assertEqual( len( Event.objects.filter( @@ -56,7 +58,8 @@ class TestAdminTasks(TaskTestCase): """Test Update checker with invalid response""" with Mocker() as mocker: mocker.get("https://version.goauthentik.io/version.json", status_code=400) - update_latest_version.delay().get() + update_latest_version.send() + self.tasks_join(update_latest_version.queue_name) self.assertEqual(cache.get(VERSION_CACHE_KEY), "0.0.0") self.assertFalse( Event.objects.filter( @@ -67,7 +70,8 @@ class TestAdminTasks(TaskTestCase): def test_version_disabled(self): """Test Update checker while its disabled""" with CONFIG.patch("disable_update_check", True): - update_latest_version.delay().get() + update_latest_version.send() + self.tasks_join(update_latest_version.queue_name) self.assertEqual(cache.get(VERSION_CACHE_KEY), "0.0.0") def test_clear_update_notifications(self): @@ -77,7 +81,8 @@ class TestAdminTasks(TaskTestCase): ) Event.objects.create(action=EventAction.UPDATE_AVAILABLE, context={"new_version": "1.1.1"}) Event.objects.create(action=EventAction.UPDATE_AVAILABLE, context={}) - clear_update_notifications() + clear_update_notifications.send() + self.tasks_join(clear_update_notifications.queue_name) self.assertFalse( Event.objects.filter( action=EventAction.UPDATE_AVAILABLE, context__new_version="1.1" diff --git a/authentik/tasks/broker.py b/authentik/tasks/broker.py index 5059c01586..e9d29f22a6 100644 --- a/authentik/tasks/broker.py +++ b/authentik/tasks/broker.py @@ -188,29 +188,22 @@ class PostgresBroker(Broker): def join( self, queue_name: str, - min_successes: int = 10, - idle_time: int = 100, + interval: int = 100, *, timeout: int | None = None, ): deadline = timeout and time.monotonic() + timeout / 1000 - successes = 0 - while successes < min_successes: + while True: if deadline and time.monotonic() >= deadline: raise QueueJoinTimeout(queue_name) - if ( - self.query_set.filter( - queue_name=queue_name, - state__in=(TaskState.QUEUED, TaskState.CONSUMED), - ) - == 0 - ): - successes += 1 - else: - successes = 0 + if self.query_set.filter( + queue_name=queue_name, + state__in=(TaskState.QUEUED, TaskState.CONSUMED), + ).exists(): + return - time.sleep(idle_time / 1000) + time.sleep(interval / 1000) class _PostgresConsumer(Consumer): diff --git a/authentik/tasks/tests.py b/authentik/tasks/tests.py index 4a82f623a8..cfa85d66c9 100644 --- a/authentik/tasks/tests.py +++ b/authentik/tasks/tests.py @@ -16,3 +16,7 @@ class TaskTestCase(TransactionTestCase): self.worker.stop() super()._post_teardown() + + def tasks_join(self, queue_name: str): + self.broker.join(queue_name) + self.worker.join()