diff --git a/authentik/lib/config.py b/authentik/lib/config.py index 54bacc7f09..eed711b657 100644 --- a/authentik/lib/config.py +++ b/authentik/lib/config.py @@ -41,6 +41,7 @@ REDIS_ENV_KEYS = [ # Old key -> new key DEPRECATIONS = { "geoip": "events.context_processors.geoip", + "worker.concurrency": "worker.processes", "redis.broker_url": "broker.url", "redis.broker_transport_options": "broker.transport_options", "redis.cache_timeout": "cache.timeout", diff --git a/authentik/lib/default.yml b/authentik/lib/default.yml index 52a79972c6..751cf55f0b 100644 --- a/authentik/lib/default.yml +++ b/authentik/lib/default.yml @@ -8,9 +8,9 @@ # make gen-dev-config # ``` # -# You may edit the generated file to override the configuration below. +# You may edit the generated file to override the configuration below. # -# When making modifying the default configuration file, +# When making modifying the default configuration file, # ensure that the corresponding documentation is updated to match. # # @see {@link ../../website/docs/install-config/configuration/configuration.mdx Configuration documentation} for more information. @@ -157,8 +157,8 @@ web: path: / worker: - embedded: false - concurrency: 2 + processes: 2 + threads: 1 storage: media: diff --git a/authentik/root/settings.py b/authentik/root/settings.py index bb0cf8a568..2accd91ebc 100644 --- a/authentik/root/settings.py +++ b/authentik/root/settings.py @@ -357,6 +357,15 @@ DRAMATIQ = { "broker_class": "authentik.tasks.broker.Broker", "channel_prefix": "authentik", "task_class": "authentik.tasks.models.Task", + "autodiscovery": { + "enabled": True, + "setup_module": "authentik.tasks.setup", + "apps_prefix": "authentik", + }, + "worker": { + "processes": CONFIG.get_int("worker.processes", 2), + "threads": CONFIG.get_int("worker.threads", 1), + }, "middlewares": ( # TODO: fixme # ("dramatiq.middleware.prometheus.Prometheus", {}), diff --git a/authentik/tasks/management/commands/worker.py b/authentik/tasks/management/commands/worker.py deleted file mode 100644 index 71441db7eb..0000000000 --- a/authentik/tasks/management/commands/worker.py +++ /dev/null @@ -1,106 +0,0 @@ -import os -import sys - -from django.core.management.base import BaseCommand -from django.utils.module_loading import module_has_submodule - -from authentik.lib.utils.reflection import get_apps - - -class Command(BaseCommand): - """Run worker""" - - def add_arguments(self, parser): - parser.add_argument( - "--pid-file", - action="store", - default=None, - dest="pid_file", - help="PID file", - ) - parser.add_argument( - "--reload", - action="store_true", - dest="use_watcher", - help="Enable autoreload", - ) - parser.add_argument( - "--reload-use-polling", - action="store_true", - dest="use_polling_watcher", - help="Use a poll-based file watcher for autoreload", - ) - parser.add_argument( - "--use-gevent", - action="store_true", - help="Use gevent for worker concurrency", - ) - parser.add_argument( - "--processes", - "-p", - default=1, - type=int, - help="The number of processes to run", - ) - parser.add_argument( - "--threads", - "-t", - default=1, - type=int, - help="The number of threads per process to use", - ) - - def handle( - self, - pid_file, - use_watcher, - use_polling_watcher, - use_gevent, - processes, - threads, - verbosity, - **options, - ): - executable_name = "dramatiq-gevent" if use_gevent else "dramatiq" - executable_path = self._resolve_executable(executable_name) - watch_args = ["--watch", "authentik"] if use_watcher else [] - if watch_args and use_polling_watcher: - watch_args.append("--watch-use-polling") - - pid_file_args = [] - if pid_file is not None: - pid_file_args = ["--pid-file", pid_file] - - verbosity_args = ["-v"] * (verbosity - 1) - - tasks_modules = self._discover_tasks_modules() - process_args = [ - executable_name, - "--path", - ".", - "--processes", - str(processes), - "--threads", - str(threads), - *watch_args, - *pid_file_args, - *verbosity_args, - *tasks_modules, - ] - - os.execvp(executable_path, process_args) # nosec - - def _resolve_executable(self, exec_name: str): - bin_dir = os.path.dirname(sys.executable) - if bin_dir: - for d in [bin_dir, os.path.join(bin_dir, "Scripts")]: - exec_path = os.path.join(d, exec_name) - if os.path.isfile(exec_path): - return exec_path - return exec_name - - def _discover_tasks_modules(self) -> list[str]: - # Does not support a tasks directory - return ["authentik.tasks.setup"] + [ - f"{app.name}.tasks" for app in get_apps() if module_has_submodule(app.module, "tasks") - ] diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py index 55146d079b..68408913b8 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py @@ -303,7 +303,6 @@ class _PostgresConsumer(Consumer): def _poll_for_notify(self): with self.listen_connection.cursor() as cursor: - self.logger.debug(f"timeout is {self.timeout}") notifies = list(cursor.connection.notifies(timeout=self.timeout, stop_after=1)) self.logger.debug( f"Received {len(notifies)} postgres notifies on channel {self.postgres_channel}" diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/conf.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/conf.py index 3e1014f8bb..515ce82b86 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/conf.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/conf.py @@ -7,12 +7,16 @@ from django.core.exceptions import ImproperlyConfigured class Conf: def __init__(self): try: - self.conf = settings.DRAMATIQ + _ = settings.DRAMATIQ except AttributeError as exc: raise ImproperlyConfigured("Setting DRAMATIQ not set.") from exc if "task_class" not in self.conf: raise ImproperlyConfigured("DRAMATIQ.task_class not defined") + @property + def conf(self) -> dict[str, Any]: + return settings.DRAMATIQ + @property def encoder_class(self) -> str: return self.conf.get("encoder_class", "dramatiq.encoder.PickleEncoder") @@ -52,6 +56,34 @@ class Conf: def task_class(self) -> str: return self.conf["task_class"] + @property + def autodiscovery(self) -> dict[str, Any]: + autodiscovery = { + "enabled": False, + "setup_module": "django_dramatiq_postgres.setup", + "apps_prefix": None, + "actors_module_name": "tasks", + "modules_callback": None, + **self.conf.get("autodiscovery", {}), + } + if not autodiscovery["enabled"] and not autodiscovery["modules_callback"]: + raise ImproperlyConfigured( + "One of DRAMATIQ.autodiscovery.enabled or " + "DRAMATIQ.autodiscovery.modules_callback must be configured." + ) + return autodiscovery + + @property + def worker(self) -> dict[str, Any]: + return { + "use_gevent": False, + "watch": settings.DEBUG, + "watch_use_polling": False, + "processes": None, + "threads": None, + **self.conf.get("worker", {}), + } + @property def test(self) -> bool: return self.conf.get("test", False) diff --git a/authentik/tasks/management/__init__.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/management/__init__.py similarity index 100% rename from authentik/tasks/management/__init__.py rename to packages/django-dramatiq-postgres/django_dramatiq_postgres/management/__init__.py diff --git a/authentik/tasks/management/commands/__init__.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/management/commands/__init__.py similarity index 100% rename from authentik/tasks/management/commands/__init__.py rename to packages/django-dramatiq-postgres/django_dramatiq_postgres/management/commands/__init__.py diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/management/commands/worker.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/management/commands/worker.py new file mode 100644 index 0000000000..dafb3f6b10 --- /dev/null +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/management/commands/worker.py @@ -0,0 +1,92 @@ +import os +import sys + +from django.apps.registry import apps +from django.core.management.base import BaseCommand +from django.utils.module_loading import import_string, module_has_submodule + +from django_dramatiq_postgres.conf import Conf + + +class Command(BaseCommand): + """Run worker""" + + def add_arguments(self, parser): + parser.add_argument( + "--pid-file", + action="store", + default=None, + dest="pid_file", + help="PID file", + ) + + def handle( + self, + pid_file, + verbosity, + **options, + ): + worker = Conf().worker + executable_name = "dramatiq-gevent" if worker["use_gevent"] else "dramatiq" + executable_path = self._resolve_executable(executable_name) + watch_args = ["--watch", "."] if worker["watch"] else [] + if watch_args and worker["watch_use_polling"]: + watch_args.append("--watch-use-polling") + + parallel_args = [] + if processes := worker["processes"]: + parallel_args.extend(["--processes", str(processes)]) + if threads := worker["threads"]: + parallel_args.extend(["--threads", str(threads)]) + + pid_file_args = [] + if pid_file is not None: + pid_file_args = ["--pid-file", pid_file] + + verbosity_args = ["-v"] * (verbosity - 1) + + tasks_modules = self._discover_tasks_modules() + process_args = [ + executable_name, + "--path", + ".", + *parallel_args, + *watch_args, + *pid_file_args, + *verbosity_args, + *tasks_modules, + ] + + os.execvp(executable_path, process_args) # nosec + + def _resolve_executable(self, exec_name: str): + bin_dir = os.path.dirname(sys.executable) + if bin_dir: + for d in [bin_dir, os.path.join(bin_dir, "Scripts")]: + exec_path = os.path.join(d, exec_name) + if os.path.isfile(exec_path): + return exec_path + return exec_name + + def _discover_tasks_modules(self) -> list[str]: + # Does not support a tasks directory + autodiscovery = Conf().autodiscovery + modules = [autodiscovery["setup_module"]] + + if autodiscovery["enabled"]: + for app in apps.get_app_configs(): + if autodiscovery["apps_prefix"] and not app.name.startswith( + autodiscovery["apps_prefix"] + ): + continue + if module_has_submodule(app.module, autodiscovery["actors_module_name"]): + modules.append(f"{app.name}.{autodiscovery['actors_module_name']}") + else: + modules_callback = autodiscovery["modules_callback"] + callback = ( + modules_callback + if not isinstance(modules_callback, str) + else import_string(modules_callback) + ) + modules.extend(callback()) + return modules diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/setup.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/setup.py new file mode 100644 index 0000000000..8fb58f6e12 --- /dev/null +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/setup.py @@ -0,0 +1,3 @@ +import django + +django.setup() diff --git a/scripts/generate_config.py b/scripts/generate_config.py index 6cb49e0aea..255622d439 100755 --- a/scripts/generate_config.py +++ b/scripts/generate_config.py @@ -47,7 +47,8 @@ def generate_local_config(): "api_key": generate_id(), }, "worker": { - "embedded": True, + "processes": 1, + "threads": 1, }, }