Compare commits
	
		
			10 Commits
		
	
	
		
			providers/
			...
			enterprise
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 60000812fd | |||
| 929e70669a | |||
| 36114284bf | |||
| a65ea0de94 | |||
| 2056b0cbee | |||
| af85ddf60b | |||
| e8e42261e3 | |||
| 9fda4e91ad | |||
| a11f1258e1 | |||
| a97578ac62 | 
							
								
								
									
										2
									
								
								.github/workflows/ci-main.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/ci-main.yml
									
									
									
									
										vendored
									
									
								
							@ -168,6 +168,8 @@ jobs:
 | 
			
		||||
            glob: tests/e2e/test_provider_saml* tests/e2e/test_source_saml*
 | 
			
		||||
          - name: ldap
 | 
			
		||||
            glob: tests/e2e/test_provider_ldap* tests/e2e/test_source_ldap*
 | 
			
		||||
          - name: rac
 | 
			
		||||
            glob: tests/e2e/test_provider_rac*
 | 
			
		||||
          - name: radius
 | 
			
		||||
            glob: tests/e2e/test_provider_radius*
 | 
			
		||||
          - name: scim
 | 
			
		||||
 | 
			
		||||
@ -31,6 +31,8 @@ class PytestTestRunner(DiscoverRunner):  # pragma: no cover
 | 
			
		||||
 | 
			
		||||
        if kwargs.get("randomly_seed", None):
 | 
			
		||||
            self.args.append(f"--randomly-seed={kwargs['randomly_seed']}")
 | 
			
		||||
        if kwargs.get("no_capture"):
 | 
			
		||||
            self.args.append("-s")
 | 
			
		||||
 | 
			
		||||
        settings.TEST = True
 | 
			
		||||
        settings.CELERY["task_always_eager"] = True
 | 
			
		||||
@ -56,6 +58,10 @@ class PytestTestRunner(DiscoverRunner):  # pragma: no cover
 | 
			
		||||
    def add_arguments(cls, parser: ArgumentParser):
 | 
			
		||||
        """Add more pytest-specific arguments"""
 | 
			
		||||
        DiscoverRunner.add_arguments(parser)
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            "--no-capture",
 | 
			
		||||
            action="store_true",
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            "--randomly-seed",
 | 
			
		||||
            type=int,
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										44
									
								
								tests/e2e/_process.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										44
									
								
								tests/e2e/_process.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,44 @@
 | 
			
		||||
"""authentik e2e testing utilities"""
 | 
			
		||||
 | 
			
		||||
from datetime import timedelta
 | 
			
		||||
from time import mktime
 | 
			
		||||
from unittest.mock import MagicMock, patch
 | 
			
		||||
 | 
			
		||||
from daphne.testing import DaphneProcess
 | 
			
		||||
from django import setup as django_setup
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
from django.utils.timezone import now
 | 
			
		||||
 | 
			
		||||
from authentik.lib.config import CONFIG
 | 
			
		||||
from authentik.lib.generators import generate_id
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestDatabaseProcess(DaphneProcess):
 | 
			
		||||
    """Channels does not correctly switch to the test database by default.
 | 
			
		||||
    https://github.com/django/channels/issues/2048"""
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
        if not settings.configured:  # Fix For raise AppRegistryNotReady("Apps aren't loaded yet.")
 | 
			
		||||
            django_setup()  # Ensure Django is fully set up before using settings
 | 
			
		||||
        if not settings.DATABASES[list(settings.DATABASES.keys())[0]]["NAME"].startswith("test_"):
 | 
			
		||||
            for _, db_settings in settings.DATABASES.items():
 | 
			
		||||
                db_settings["NAME"] = f"test_{db_settings['NAME']}"
 | 
			
		||||
        settings.TEST = True
 | 
			
		||||
        from authentik.enterprise.license import LicenseKey
 | 
			
		||||
 | 
			
		||||
        with (
 | 
			
		||||
            patch(
 | 
			
		||||
                "authentik.enterprise.license.LicenseKey.validate",
 | 
			
		||||
                MagicMock(
 | 
			
		||||
                    return_value=LicenseKey(
 | 
			
		||||
                        aud="",
 | 
			
		||||
                        exp=int(mktime((now() + timedelta(days=3000)).timetuple())),
 | 
			
		||||
                        name=generate_id(),
 | 
			
		||||
                        internal_users=100,
 | 
			
		||||
                        external_users=100,
 | 
			
		||||
                    )
 | 
			
		||||
                ),
 | 
			
		||||
            ),
 | 
			
		||||
            CONFIG.patch("email.port", 1025),
 | 
			
		||||
        ):
 | 
			
		||||
            return super().run()
 | 
			
		||||
@ -1,7 +1,6 @@
 | 
			
		||||
"""LDAP and Outpost e2e tests"""
 | 
			
		||||
 | 
			
		||||
from dataclasses import asdict
 | 
			
		||||
from time import sleep
 | 
			
		||||
 | 
			
		||||
from guardian.shortcuts import assign_perm
 | 
			
		||||
from ldap3 import ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, SUBTREE, Connection, Server
 | 
			
		||||
@ -56,17 +55,7 @@ class TestProviderLDAP(SeleniumTestCase):
 | 
			
		||||
        outpost.providers.add(ldap)
 | 
			
		||||
 | 
			
		||||
        self.start_ldap(outpost)
 | 
			
		||||
 | 
			
		||||
        # Wait until outpost healthcheck succeeds
 | 
			
		||||
        healthcheck_retries = 0
 | 
			
		||||
        while healthcheck_retries < 50:  # noqa: PLR2004
 | 
			
		||||
            if len(outpost.state) > 0:
 | 
			
		||||
                state = outpost.state[0]
 | 
			
		||||
                if state.last_seen:
 | 
			
		||||
                    break
 | 
			
		||||
            healthcheck_retries += 1
 | 
			
		||||
            sleep(0.5)
 | 
			
		||||
        sleep(5)
 | 
			
		||||
        self.wait_for_outpost(outpost)
 | 
			
		||||
        return outpost
 | 
			
		||||
 | 
			
		||||
    @retry()
 | 
			
		||||
 | 
			
		||||
@ -7,7 +7,6 @@ from sys import platform
 | 
			
		||||
from time import sleep
 | 
			
		||||
from unittest.case import skip, skipUnless
 | 
			
		||||
 | 
			
		||||
from channels.testing import ChannelsLiveServerTestCase
 | 
			
		||||
from jwt import decode
 | 
			
		||||
from selenium.webdriver.common.by import By
 | 
			
		||||
 | 
			
		||||
@ -87,17 +86,7 @@ class TestProviderProxy(SeleniumTestCase):
 | 
			
		||||
        outpost.build_user_permissions(outpost.user)
 | 
			
		||||
 | 
			
		||||
        self.start_proxy(outpost)
 | 
			
		||||
 | 
			
		||||
        # Wait until outpost healthcheck succeeds
 | 
			
		||||
        healthcheck_retries = 0
 | 
			
		||||
        while healthcheck_retries < 50:  # noqa: PLR2004
 | 
			
		||||
            if len(outpost.state) > 0:
 | 
			
		||||
                state = outpost.state[0]
 | 
			
		||||
                if state.last_seen:
 | 
			
		||||
                    break
 | 
			
		||||
            healthcheck_retries += 1
 | 
			
		||||
            sleep(0.5)
 | 
			
		||||
        sleep(5)
 | 
			
		||||
        self.wait_for_outpost(outpost)
 | 
			
		||||
 | 
			
		||||
        self.driver.get("http://localhost:9000/api")
 | 
			
		||||
        self.login()
 | 
			
		||||
@ -168,17 +157,7 @@ class TestProviderProxy(SeleniumTestCase):
 | 
			
		||||
        outpost.build_user_permissions(outpost.user)
 | 
			
		||||
 | 
			
		||||
        self.start_proxy(outpost)
 | 
			
		||||
 | 
			
		||||
        # Wait until outpost healthcheck succeeds
 | 
			
		||||
        healthcheck_retries = 0
 | 
			
		||||
        while healthcheck_retries < 50:  # noqa: PLR2004
 | 
			
		||||
            if len(outpost.state) > 0:
 | 
			
		||||
                state = outpost.state[0]
 | 
			
		||||
                if state.last_seen:
 | 
			
		||||
                    break
 | 
			
		||||
            healthcheck_retries += 1
 | 
			
		||||
            sleep(0.5)
 | 
			
		||||
        sleep(5)
 | 
			
		||||
        self.wait_for_outpost(outpost)
 | 
			
		||||
 | 
			
		||||
        self.driver.get("http://localhost:9000/api")
 | 
			
		||||
        self.login()
 | 
			
		||||
@ -202,7 +181,7 @@ class TestProviderProxy(SeleniumTestCase):
 | 
			
		||||
# TODO: Fix flaky test
 | 
			
		||||
@skip("Flaky test")
 | 
			
		||||
@skipUnless(platform.startswith("linux"), "requires local docker")
 | 
			
		||||
class TestProviderProxyConnect(ChannelsLiveServerTestCase):
 | 
			
		||||
class TestProviderProxyConnect(SeleniumTestCase):
 | 
			
		||||
    """Test Proxy connectivity over websockets"""
 | 
			
		||||
 | 
			
		||||
    @retry(exceptions=[AssertionError])
 | 
			
		||||
@ -239,16 +218,7 @@ class TestProviderProxyConnect(ChannelsLiveServerTestCase):
 | 
			
		||||
        )
 | 
			
		||||
        outpost.providers.add(proxy)
 | 
			
		||||
        outpost.build_user_permissions(outpost.user)
 | 
			
		||||
 | 
			
		||||
        # Wait until outpost healthcheck succeeds
 | 
			
		||||
        healthcheck_retries = 0
 | 
			
		||||
        while healthcheck_retries < 50:  # noqa: PLR2004
 | 
			
		||||
            if len(outpost.state) > 0:
 | 
			
		||||
                state = outpost.state[0]
 | 
			
		||||
                if state.last_seen and state.version:
 | 
			
		||||
                    break
 | 
			
		||||
            healthcheck_retries += 1
 | 
			
		||||
            sleep(0.5)
 | 
			
		||||
        self.wait_for_outpost(outpost)
 | 
			
		||||
 | 
			
		||||
        state = outpost.state
 | 
			
		||||
        self.assertGreaterEqual(len(state), 1)
 | 
			
		||||
 | 
			
		||||
@ -76,17 +76,7 @@ class TestProviderProxyForward(SeleniumTestCase):
 | 
			
		||||
        outpost.build_user_permissions(outpost.user)
 | 
			
		||||
 | 
			
		||||
        self.start_outpost(outpost)
 | 
			
		||||
 | 
			
		||||
        # Wait until outpost healthcheck succeeds
 | 
			
		||||
        healthcheck_retries = 0
 | 
			
		||||
        while healthcheck_retries < 50:  # noqa: PLR2004
 | 
			
		||||
            if len(outpost.state) > 0:
 | 
			
		||||
                state = outpost.state[0]
 | 
			
		||||
                if state.last_seen:
 | 
			
		||||
                    break
 | 
			
		||||
            healthcheck_retries += 1
 | 
			
		||||
            sleep(0.5)
 | 
			
		||||
        sleep(5)
 | 
			
		||||
        self.wait_for_outpost(outpost)
 | 
			
		||||
 | 
			
		||||
    @retry()
 | 
			
		||||
    def test_traefik(self):
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										126
									
								
								tests/e2e/test_provider_rac.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										126
									
								
								tests/e2e/test_provider_rac.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,126 @@
 | 
			
		||||
"""RAC e2e tests"""
 | 
			
		||||
 | 
			
		||||
from datetime import timedelta
 | 
			
		||||
from time import mktime, sleep
 | 
			
		||||
from unittest.mock import MagicMock, patch
 | 
			
		||||
 | 
			
		||||
from django.utils.timezone import now
 | 
			
		||||
from selenium.webdriver.common.by import By
 | 
			
		||||
from selenium.webdriver.common.keys import Keys
 | 
			
		||||
 | 
			
		||||
from authentik.blueprints.tests import apply_blueprint, reconcile_app
 | 
			
		||||
from authentik.core.models import Application
 | 
			
		||||
from authentik.enterprise.license import LicenseKey
 | 
			
		||||
from authentik.enterprise.models import License
 | 
			
		||||
from authentik.enterprise.providers.rac.models import Endpoint, Protocols, RACProvider
 | 
			
		||||
from authentik.flows.models import Flow
 | 
			
		||||
from authentik.lib.generators import generate_id
 | 
			
		||||
from authentik.outposts.models import Outpost, OutpostType
 | 
			
		||||
from tests.e2e.utils import SeleniumTestCase, retry
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestProviderRAC(SeleniumTestCase):
 | 
			
		||||
    """RAC e2e tests"""
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super().setUp()
 | 
			
		||||
        self.password = generate_id()
 | 
			
		||||
 | 
			
		||||
    def start_rac(self, outpost: Outpost):
 | 
			
		||||
        """Start rac container based on outpost created"""
 | 
			
		||||
        self.run_container(
 | 
			
		||||
            image=self.get_container_image("ghcr.io/goauthentik/dev-rac"),
 | 
			
		||||
            environment={
 | 
			
		||||
                "AUTHENTIK_TOKEN": outpost.token.key,
 | 
			
		||||
            },
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @patch(
 | 
			
		||||
        "authentik.enterprise.license.LicenseKey.validate",
 | 
			
		||||
        MagicMock(
 | 
			
		||||
            return_value=LicenseKey(
 | 
			
		||||
                aud="",
 | 
			
		||||
                exp=int(mktime((now() + timedelta(days=3000)).timetuple())),
 | 
			
		||||
                name=generate_id(),
 | 
			
		||||
                internal_users=100,
 | 
			
		||||
                external_users=100,
 | 
			
		||||
            )
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
    @retry()
 | 
			
		||||
    @apply_blueprint(
 | 
			
		||||
        "default/flow-default-authentication-flow.yaml",
 | 
			
		||||
        "default/flow-default-invalidation-flow.yaml",
 | 
			
		||||
    )
 | 
			
		||||
    @apply_blueprint(
 | 
			
		||||
        "default/flow-default-provider-authorization-implicit-consent.yaml",
 | 
			
		||||
        "default/flow-default-provider-invalidation.yaml",
 | 
			
		||||
    )
 | 
			
		||||
    @apply_blueprint(
 | 
			
		||||
        "system/providers-rac.yaml",
 | 
			
		||||
    )
 | 
			
		||||
    @reconcile_app("authentik_crypto")
 | 
			
		||||
    def test_rac_ssh(self):
 | 
			
		||||
        """Test SSH RAC"""
 | 
			
		||||
        License.objects.create(key=generate_id())
 | 
			
		||||
 | 
			
		||||
        test_ssh = self.run_container(
 | 
			
		||||
            image="lscr.io/linuxserver/openssh-server:latest",
 | 
			
		||||
            ports={
 | 
			
		||||
                "2222": "2222",
 | 
			
		||||
            },
 | 
			
		||||
            environment={
 | 
			
		||||
                "USER_NAME": "authentik",
 | 
			
		||||
                "USER_PASSWORD": self.password,
 | 
			
		||||
                "PASSWORD_ACCESS": "true",
 | 
			
		||||
                "SUDO_ACCESS": "true",
 | 
			
		||||
            },
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        rac: RACProvider = RACProvider.objects.create(
 | 
			
		||||
            name=generate_id(),
 | 
			
		||||
            authorization_flow=Flow.objects.get(
 | 
			
		||||
                slug="default-provider-authorization-implicit-consent"
 | 
			
		||||
            ),
 | 
			
		||||
        )
 | 
			
		||||
        endpoint = Endpoint.objects.create(
 | 
			
		||||
            name=generate_id(),
 | 
			
		||||
            protocol=Protocols.SSH,
 | 
			
		||||
            host=f"{self.host}:2222",
 | 
			
		||||
            settings={
 | 
			
		||||
                "username": "authentik",
 | 
			
		||||
                "password": self.password,
 | 
			
		||||
            },
 | 
			
		||||
            provider=rac,
 | 
			
		||||
        )
 | 
			
		||||
        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=rac)
 | 
			
		||||
        outpost: Outpost = Outpost.objects.create(
 | 
			
		||||
            name=generate_id(),
 | 
			
		||||
            type=OutpostType.RAC,
 | 
			
		||||
        )
 | 
			
		||||
        outpost.providers.add(rac)
 | 
			
		||||
        outpost.build_user_permissions(outpost.user)
 | 
			
		||||
 | 
			
		||||
        self.start_rac(outpost)
 | 
			
		||||
        self.wait_for_outpost(outpost)
 | 
			
		||||
 | 
			
		||||
        self.driver.get(
 | 
			
		||||
            self.url("authentik_providers_rac:start", app=app.slug, endpoint=endpoint.pk)
 | 
			
		||||
        )
 | 
			
		||||
        self.login()
 | 
			
		||||
        sleep(1)
 | 
			
		||||
 | 
			
		||||
        iface = self.driver.find_element(By.CSS_SELECTOR, "ak-rac")
 | 
			
		||||
        sleep(5)
 | 
			
		||||
        state = self.driver.execute_script("return arguments[0].clientState", iface)
 | 
			
		||||
        self.assertEqual(state, 3)
 | 
			
		||||
 | 
			
		||||
        uid = generate_id()
 | 
			
		||||
        self.driver.find_element(By.CSS_SELECTOR, "body").send_keys(
 | 
			
		||||
            f'echo "{uid}" > /tmp/test' + Keys.ENTER
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        sleep(2)
 | 
			
		||||
 | 
			
		||||
        _, output = test_ssh.exec_run("cat /tmp/test")
 | 
			
		||||
        self.assertEqual(output, f"{uid}\n".encode())
 | 
			
		||||
@ -1,7 +1,6 @@
 | 
			
		||||
"""Radius e2e tests"""
 | 
			
		||||
 | 
			
		||||
from dataclasses import asdict
 | 
			
		||||
from time import sleep
 | 
			
		||||
 | 
			
		||||
from pyrad.client import Client
 | 
			
		||||
from pyrad.dictionary import Dictionary
 | 
			
		||||
@ -50,17 +49,7 @@ class TestProviderRadius(SeleniumTestCase):
 | 
			
		||||
        outpost.providers.add(radius)
 | 
			
		||||
 | 
			
		||||
        self.start_radius(outpost)
 | 
			
		||||
 | 
			
		||||
        # Wait until outpost healthcheck succeeds
 | 
			
		||||
        healthcheck_retries = 0
 | 
			
		||||
        while healthcheck_retries < 50:  # noqa: PLR2004
 | 
			
		||||
            if len(outpost.state) > 0:
 | 
			
		||||
                state = outpost.state[0]
 | 
			
		||||
                if state.last_seen:
 | 
			
		||||
                    break
 | 
			
		||||
            healthcheck_retries += 1
 | 
			
		||||
            sleep(0.5)
 | 
			
		||||
        sleep(5)
 | 
			
		||||
        self.wait_for_outpost(outpost)
 | 
			
		||||
        return outpost
 | 
			
		||||
 | 
			
		||||
    @retry()
 | 
			
		||||
 | 
			
		||||
@ -419,7 +419,6 @@ class TestSourceSAML(SeleniumTestCase):
 | 
			
		||||
        # Wait until we're logged in
 | 
			
		||||
        self.wait_for_url(self.if_user_url())
 | 
			
		||||
 | 
			
		||||
        # sleep(999999)
 | 
			
		||||
        self.assert_user(
 | 
			
		||||
            User.objects.exclude(username="akadmin")
 | 
			
		||||
            .exclude(username__startswith="ak-outpost")
 | 
			
		||||
 | 
			
		||||
@ -12,8 +12,8 @@ from typing import Any
 | 
			
		||||
from unittest.case import TestCase
 | 
			
		||||
from urllib.parse import urlencode
 | 
			
		||||
 | 
			
		||||
from channels.testing import ChannelsLiveServerTestCase
 | 
			
		||||
from django.apps import apps
 | 
			
		||||
from django.contrib.staticfiles.testing import StaticLiveServerTestCase
 | 
			
		||||
from django.db import connection
 | 
			
		||||
from django.db.migrations.loader import MigrationLoader
 | 
			
		||||
from django.test.testcases import TransactionTestCase
 | 
			
		||||
@ -35,6 +35,8 @@ from authentik.core.api.users import UserSerializer
 | 
			
		||||
from authentik.core.models import User
 | 
			
		||||
from authentik.core.tests.utils import create_test_admin_user
 | 
			
		||||
from authentik.lib.generators import generate_id
 | 
			
		||||
from authentik.outposts.models import Outpost
 | 
			
		||||
from tests.e2e._process import TestDatabaseProcess
 | 
			
		||||
 | 
			
		||||
RETRIES = int(environ.get("RETRIES", "3"))
 | 
			
		||||
IS_CI = "CI" in environ
 | 
			
		||||
@ -58,10 +60,13 @@ def get_local_ip() -> str:
 | 
			
		||||
    return ip_addr
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ContainerException(Exception): ...
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DockerTestCase(TestCase):
 | 
			
		||||
    """Mixin for dealing with containers"""
 | 
			
		||||
 | 
			
		||||
    max_healthcheck_attempts = 30
 | 
			
		||||
    max_healthcheck_attempts = 50
 | 
			
		||||
 | 
			
		||||
    __client: DockerClient
 | 
			
		||||
    __network: Network
 | 
			
		||||
@ -95,7 +100,7 @@ class DockerTestCase(TestCase):
 | 
			
		||||
            sleep(1)
 | 
			
		||||
            attempt += 1
 | 
			
		||||
            if attempt >= self.max_healthcheck_attempts:
 | 
			
		||||
                self.failureException("Container failed to start")
 | 
			
		||||
                raise ContainerException("Container failed to start")
 | 
			
		||||
 | 
			
		||||
    def get_container_image(self, base: str) -> str:
 | 
			
		||||
        """Try to pull docker image based on git branch, fallback to main if not found."""
 | 
			
		||||
@ -152,13 +157,17 @@ class DockerTestCase(TestCase):
 | 
			
		||||
        self.__network.remove()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SeleniumTestCase(DockerTestCase, StaticLiveServerTestCase):
 | 
			
		||||
    """StaticLiveServerTestCase which automatically creates a Webdriver instance"""
 | 
			
		||||
class SeleniumTestCase(DockerTestCase, ChannelsLiveServerTestCase):
 | 
			
		||||
    """ChannelsLiveServerTestCase which automatically creates a Webdriver instance"""
 | 
			
		||||
 | 
			
		||||
    ProtocolServerProcess = TestDatabaseProcess
 | 
			
		||||
 | 
			
		||||
    host = get_local_ip()
 | 
			
		||||
    wait_timeout: int
 | 
			
		||||
    user: User
 | 
			
		||||
 | 
			
		||||
    serve_static = True
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        if IS_CI:
 | 
			
		||||
            print("::group::authentik Logs", file=stderr)
 | 
			
		||||
@ -265,6 +274,18 @@ class SeleniumTestCase(DockerTestCase, StaticLiveServerTestCase):
 | 
			
		||||
        self.assertEqual(user["name"].value, expected_user.name)
 | 
			
		||||
        self.assertEqual(user["email"].value, expected_user.email)
 | 
			
		||||
 | 
			
		||||
    def wait_for_outpost(self, outpost: Outpost, tries=50):
 | 
			
		||||
        """Wait until outpost healthcheck succeeds"""
 | 
			
		||||
        healthcheck_retries = 0
 | 
			
		||||
        while healthcheck_retries < tries:
 | 
			
		||||
            if len(outpost.state) > 0:
 | 
			
		||||
                state = outpost.state[0]
 | 
			
		||||
                if state.last_seen:
 | 
			
		||||
                    return
 | 
			
		||||
            healthcheck_retries += 1
 | 
			
		||||
            sleep(0.5)
 | 
			
		||||
        raise ContainerException("Outpost failed to become healthy")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@lru_cache
 | 
			
		||||
def get_loader():
 | 
			
		||||
@ -277,7 +298,12 @@ def retry(max_retires=RETRIES, exceptions=None):
 | 
			
		||||
    """Retry test multiple times. Default to catching Selenium Timeout Exception"""
 | 
			
		||||
 | 
			
		||||
    if not exceptions:
 | 
			
		||||
        exceptions = [WebDriverException, TimeoutException, NoSuchElementException]
 | 
			
		||||
        exceptions = [
 | 
			
		||||
            WebDriverException,
 | 
			
		||||
            TimeoutException,
 | 
			
		||||
            NoSuchElementException,
 | 
			
		||||
            ContainerException,
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
    logger = get_logger()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user