From c62e1d54576dedb2ab4fc8e41863d0cc4ae0efc8 Mon Sep 17 00:00:00 2001 From: Marc 'risson' Schmitt Date: Fri, 13 Jun 2025 15:53:57 +0200 Subject: [PATCH] backport broker changes Signed-off-by: Marc 'risson' Schmitt --- .../django_dramatiq_postgres/broker.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py index 426bd301a8..3592855c40 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py @@ -240,6 +240,10 @@ class _PostgresConsumer(Consumer): self._listen_connection: DatabaseWrapper | None = None self.postgres_channel = channel_name(self.queue_name, ChannelIdentifier.ENQUEUE) + # Override because dramatiq doesn't allow us setting this manually + # TODO: turn it into a setting + self.timeout = 30000 // 1000 + @property def connection(self) -> DatabaseWrapper: return connections[self.db_alias] @@ -313,13 +317,12 @@ class _PostgresConsumer(Consumer): ) .values_list("message_id", flat=True) ) - channel = channel_name(self.queue_name, ChannelIdentifier.ENQUEUE) - return [Notify(pid=0, channel=channel, payload=item) for item in notifies] + return [Notify(pid=0, channel=self.postgres_channel, payload=item) for item in notifies] def _poll_for_notify(self): with self.listen_connection.cursor() as cursor: self.logger.debug(f"timeout is {self.timeout}") - notifies = list(cursor.connection.notifies(timeout=self.timeout)) + notifies = list(cursor.connection.notifies(timeout=self.timeout, stop_after=1)) self.logger.debug( f"Received {len(notifies)} postgres notifies on channel {self.postgres_channel}" )