providers/proxy: add tests for proxy basic auth (#4357)
* add tests for proxy basic auth Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * stop bandit from complaining Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * add API tests Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * more tests Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
		@ -1,6 +1,7 @@
 | 
				
			|||||||
"""ProxyProvider API Views"""
 | 
					"""ProxyProvider API Views"""
 | 
				
			||||||
from typing import Any, Optional
 | 
					from typing import Any, Optional
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from django.utils.translation import gettext_lazy as _
 | 
				
			||||||
from drf_spectacular.utils import extend_schema_field
 | 
					from drf_spectacular.utils import extend_schema_field
 | 
				
			||||||
from rest_framework.exceptions import ValidationError
 | 
					from rest_framework.exceptions import ValidationError
 | 
				
			||||||
from rest_framework.fields import CharField, ListField, ReadOnlyField, SerializerMethodField
 | 
					from rest_framework.fields import CharField, ListField, ReadOnlyField, SerializerMethodField
 | 
				
			||||||
@ -39,22 +40,34 @@ class ProxyProviderSerializer(ProviderSerializer):
 | 
				
			|||||||
    redirect_uris = CharField(read_only=True)
 | 
					    redirect_uris = CharField(read_only=True)
 | 
				
			||||||
    outpost_set = ListField(child=CharField(), read_only=True, source="outpost_set.all")
 | 
					    outpost_set = ListField(child=CharField(), read_only=True, source="outpost_set.all")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def validate_basic_auth_enabled(self, value: bool) -> bool:
 | 
				
			||||||
 | 
					        """Ensure user and password attributes are set"""
 | 
				
			||||||
 | 
					        if value:
 | 
				
			||||||
 | 
					            if (
 | 
				
			||||||
 | 
					                self.initial_data.get("basic_auth_password_attribute", "") == ""
 | 
				
			||||||
 | 
					                or self.initial_data.get("basic_auth_user_attribute", "") == ""
 | 
				
			||||||
 | 
					            ):
 | 
				
			||||||
 | 
					                raise ValidationError(
 | 
				
			||||||
 | 
					                    _("User and password attributes must be set when basic auth is enabled.")
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					        return value
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def validate(self, attrs) -> dict[Any, str]:
 | 
					    def validate(self, attrs) -> dict[Any, str]:
 | 
				
			||||||
        """Check that internal_host is set when mode is Proxy"""
 | 
					        """Check that internal_host is set when mode is Proxy"""
 | 
				
			||||||
        if (
 | 
					        if (
 | 
				
			||||||
            attrs.get("mode", ProxyMode.PROXY) == ProxyMode.PROXY
 | 
					            attrs.get("mode", ProxyMode.PROXY) == ProxyMode.PROXY
 | 
				
			||||||
            and attrs.get("internal_host", "") == ""
 | 
					            and attrs.get("internal_host", "") == ""
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
            raise ValidationError("Internal host cannot be empty when forward auth is disabled.")
 | 
					            raise ValidationError(_("Internal host cannot be empty when forward auth is disabled."))
 | 
				
			||||||
        return attrs
 | 
					        return attrs
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create(self, validated_data):
 | 
					    def create(self, validated_data: dict):
 | 
				
			||||||
        instance: ProxyProvider = super().create(validated_data)
 | 
					        instance: ProxyProvider = super().create(validated_data)
 | 
				
			||||||
        instance.set_oauth_defaults()
 | 
					        instance.set_oauth_defaults()
 | 
				
			||||||
        instance.save()
 | 
					        instance.save()
 | 
				
			||||||
        return instance
 | 
					        return instance
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def update(self, instance: ProxyProvider, validated_data):
 | 
					    def update(self, instance: ProxyProvider, validated_data: dict):
 | 
				
			||||||
        instance = super().update(instance, validated_data)
 | 
					        instance = super().update(instance, validated_data)
 | 
				
			||||||
        instance.set_oauth_defaults()
 | 
					        instance.set_oauth_defaults()
 | 
				
			||||||
        instance.save()
 | 
					        instance.save()
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										122
									
								
								authentik/providers/proxy/tests.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										122
									
								
								authentik/providers/proxy/tests.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,122 @@
 | 
				
			|||||||
 | 
					"""proxy provider tests"""
 | 
				
			||||||
 | 
					from django.urls import reverse
 | 
				
			||||||
 | 
					from rest_framework.test import APITestCase
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 | 
				
			||||||
 | 
					from authentik.lib.generators import generate_id
 | 
				
			||||||
 | 
					from authentik.providers.oauth2.models import ClientTypes
 | 
				
			||||||
 | 
					from authentik.providers.proxy.models import ProxyMode, ProxyProvider
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class ProxyProviderTests(APITestCase):
 | 
				
			||||||
 | 
					    """proxy provider tests"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def setUp(self) -> None:
 | 
				
			||||||
 | 
					        self.user = create_test_admin_user()
 | 
				
			||||||
 | 
					        self.client.force_login(self.user)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_basic_auth(self):
 | 
				
			||||||
 | 
					        """Test basic_auth_enabled"""
 | 
				
			||||||
 | 
					        response = self.client.post(
 | 
				
			||||||
 | 
					            reverse("authentik_api:proxyprovider-list"),
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                "name": generate_id(),
 | 
				
			||||||
 | 
					                "mode": ProxyMode.PROXY,
 | 
				
			||||||
 | 
					                "authorization_flow": create_test_flow().pk.hex,
 | 
				
			||||||
 | 
					                "external_host": "http://localhost",
 | 
				
			||||||
 | 
					                "internal_host": "http://localhost",
 | 
				
			||||||
 | 
					                "basic_auth_enabled": True,
 | 
				
			||||||
 | 
					                "basic_auth_user_attribute": generate_id(),
 | 
				
			||||||
 | 
					                "basic_auth_password_attribute": generate_id(),
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, 201)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_basic_auth_invalid(self):
 | 
				
			||||||
 | 
					        """Test basic_auth_enabled"""
 | 
				
			||||||
 | 
					        response = self.client.post(
 | 
				
			||||||
 | 
					            reverse("authentik_api:proxyprovider-list"),
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                "name": generate_id(),
 | 
				
			||||||
 | 
					                "mode": ProxyMode.PROXY,
 | 
				
			||||||
 | 
					                "authorization_flow": create_test_flow().pk.hex,
 | 
				
			||||||
 | 
					                "external_host": "http://localhost",
 | 
				
			||||||
 | 
					                "internal_host": "http://localhost",
 | 
				
			||||||
 | 
					                "basic_auth_enabled": True,
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, 400)
 | 
				
			||||||
 | 
					        self.assertJSONEqual(
 | 
				
			||||||
 | 
					            response.content.decode(),
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                "basic_auth_enabled": [
 | 
				
			||||||
 | 
					                    "User and password attributes must be set when basic auth is enabled."
 | 
				
			||||||
 | 
					                ]
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_validate(self):
 | 
				
			||||||
 | 
					        """Test validate"""
 | 
				
			||||||
 | 
					        response = self.client.post(
 | 
				
			||||||
 | 
					            reverse("authentik_api:proxyprovider-list"),
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                "name": generate_id(),
 | 
				
			||||||
 | 
					                "mode": ProxyMode.PROXY,
 | 
				
			||||||
 | 
					                "authorization_flow": create_test_flow().pk.hex,
 | 
				
			||||||
 | 
					                "external_host": "http://localhost",
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, 400)
 | 
				
			||||||
 | 
					        self.assertJSONEqual(
 | 
				
			||||||
 | 
					            response.content.decode(),
 | 
				
			||||||
 | 
					            {"non_field_errors": ["Internal host cannot be empty when forward auth is disabled."]},
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_create_defaults(self):
 | 
				
			||||||
 | 
					        """Test create"""
 | 
				
			||||||
 | 
					        name = generate_id()
 | 
				
			||||||
 | 
					        response = self.client.post(
 | 
				
			||||||
 | 
					            reverse("authentik_api:proxyprovider-list"),
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                "name": name,
 | 
				
			||||||
 | 
					                "mode": ProxyMode.PROXY,
 | 
				
			||||||
 | 
					                "authorization_flow": create_test_flow().pk.hex,
 | 
				
			||||||
 | 
					                "external_host": "http://localhost",
 | 
				
			||||||
 | 
					                "internal_host": "http://localhost",
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, 201)
 | 
				
			||||||
 | 
					        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
 | 
				
			||||||
 | 
					        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_update_defaults(self):
 | 
				
			||||||
 | 
					        """Test create"""
 | 
				
			||||||
 | 
					        name = generate_id()
 | 
				
			||||||
 | 
					        response = self.client.post(
 | 
				
			||||||
 | 
					            reverse("authentik_api:proxyprovider-list"),
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                "name": name,
 | 
				
			||||||
 | 
					                "mode": ProxyMode.PROXY,
 | 
				
			||||||
 | 
					                "authorization_flow": create_test_flow().pk.hex,
 | 
				
			||||||
 | 
					                "external_host": "http://localhost",
 | 
				
			||||||
 | 
					                "internal_host": "http://localhost",
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, 201)
 | 
				
			||||||
 | 
					        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
 | 
				
			||||||
 | 
					        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
 | 
				
			||||||
 | 
					        provider.client_type = ClientTypes.PUBLIC
 | 
				
			||||||
 | 
					        provider.save()
 | 
				
			||||||
 | 
					        response = self.client.put(
 | 
				
			||||||
 | 
					            reverse("authentik_api:proxyprovider-detail", kwargs={"pk": provider.pk}),
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                "name": name,
 | 
				
			||||||
 | 
					                "mode": ProxyMode.PROXY,
 | 
				
			||||||
 | 
					                "authorization_flow": create_test_flow().pk.hex,
 | 
				
			||||||
 | 
					                "external_host": "http://localhost",
 | 
				
			||||||
 | 
					                "internal_host": "http://localhost",
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, 200)
 | 
				
			||||||
 | 
					        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
 | 
				
			||||||
 | 
					        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
 | 
				
			||||||
@ -47,7 +47,6 @@ class TestProviderLDAP(SeleniumTestCase):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def _prepare(self) -> User:
 | 
					    def _prepare(self) -> User:
 | 
				
			||||||
        """prepare user, provider, app and container"""
 | 
					        """prepare user, provider, app and container"""
 | 
				
			||||||
        # set additionalHeaders to test later
 | 
					 | 
				
			||||||
        self.user.attributes["extraAttribute"] = "bar"
 | 
					        self.user.attributes["extraAttribute"] = "bar"
 | 
				
			||||||
        self.user.save()
 | 
					        self.user.save()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -1,4 +1,5 @@
 | 
				
			|||||||
"""Proxy and Outpost e2e tests"""
 | 
					"""Proxy and Outpost e2e tests"""
 | 
				
			||||||
 | 
					from base64 import b64encode
 | 
				
			||||||
from dataclasses import asdict
 | 
					from dataclasses import asdict
 | 
				
			||||||
from sys import platform
 | 
					from sys import platform
 | 
				
			||||||
from time import sleep
 | 
					from time import sleep
 | 
				
			||||||
@ -14,6 +15,7 @@ from authentik import __version__
 | 
				
			|||||||
from authentik.blueprints.tests import apply_blueprint, reconcile_app
 | 
					from authentik.blueprints.tests import apply_blueprint, reconcile_app
 | 
				
			||||||
from authentik.core.models import Application
 | 
					from authentik.core.models import Application
 | 
				
			||||||
from authentik.flows.models import Flow
 | 
					from authentik.flows.models import Flow
 | 
				
			||||||
 | 
					from authentik.lib.generators import generate_id
 | 
				
			||||||
from authentik.outposts.models import DockerServiceConnection, Outpost, OutpostConfig, OutpostType
 | 
					from authentik.outposts.models import DockerServiceConnection, Outpost, OutpostConfig, OutpostType
 | 
				
			||||||
from authentik.outposts.tasks import outpost_local_connection
 | 
					from authentik.outposts.tasks import outpost_local_connection
 | 
				
			||||||
from authentik.providers.proxy.models import ProxyProvider
 | 
					from authentik.providers.proxy.models import ProxyProvider
 | 
				
			||||||
@ -119,6 +121,78 @@ class TestProviderProxy(SeleniumTestCase):
 | 
				
			|||||||
        full_body_text = self.driver.find_element(By.CSS_SELECTOR, ".pf-c-title.pf-m-3xl").text
 | 
					        full_body_text = self.driver.find_element(By.CSS_SELECTOR, ".pf-c-title.pf-m-3xl").text
 | 
				
			||||||
        self.assertIn("You've logged out of proxy.", full_body_text)
 | 
					        self.assertIn("You've logged out of proxy.", full_body_text)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @retry()
 | 
				
			||||||
 | 
					    @apply_blueprint(
 | 
				
			||||||
 | 
					        "default/10-flow-default-authentication-flow.yaml",
 | 
				
			||||||
 | 
					        "default/10-flow-default-invalidation-flow.yaml",
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    @apply_blueprint(
 | 
				
			||||||
 | 
					        "default/20-flow-default-provider-authorization-explicit-consent.yaml",
 | 
				
			||||||
 | 
					        "default/20-flow-default-provider-authorization-implicit-consent.yaml",
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    @apply_blueprint(
 | 
				
			||||||
 | 
					        "system/providers-oauth2.yaml",
 | 
				
			||||||
 | 
					        "system/providers-proxy.yaml",
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    @reconcile_app("authentik_crypto")
 | 
				
			||||||
 | 
					    def test_proxy_basic_auth(self):
 | 
				
			||||||
 | 
					        """Test simple outpost setup with single provider"""
 | 
				
			||||||
 | 
					        cred = generate_id()
 | 
				
			||||||
 | 
					        attr = "basic-password"  # nosec
 | 
				
			||||||
 | 
					        self.user.attributes["basic-username"] = cred
 | 
				
			||||||
 | 
					        self.user.attributes[attr] = cred
 | 
				
			||||||
 | 
					        self.user.save()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        proxy: ProxyProvider = ProxyProvider.objects.create(
 | 
				
			||||||
 | 
					            name="proxy_provider",
 | 
				
			||||||
 | 
					            authorization_flow=Flow.objects.get(
 | 
				
			||||||
 | 
					                slug="default-provider-authorization-implicit-consent"
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					            internal_host="http://localhost",
 | 
				
			||||||
 | 
					            external_host="http://localhost:9000",
 | 
				
			||||||
 | 
					            basic_auth_enabled=True,
 | 
				
			||||||
 | 
					            basic_auth_user_attribute="basic-username",
 | 
				
			||||||
 | 
					            basic_auth_password_attribute=attr,
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        # Ensure OAuth2 Params are set
 | 
				
			||||||
 | 
					        proxy.set_oauth_defaults()
 | 
				
			||||||
 | 
					        proxy.save()
 | 
				
			||||||
 | 
					        # we need to create an application to actually access the proxy
 | 
				
			||||||
 | 
					        Application.objects.create(name="proxy", slug="proxy", provider=proxy)
 | 
				
			||||||
 | 
					        outpost: Outpost = Outpost.objects.create(
 | 
				
			||||||
 | 
					            name="proxy_outpost",
 | 
				
			||||||
 | 
					            type=OutpostType.PROXY,
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        outpost.providers.add(proxy)
 | 
				
			||||||
 | 
					        outpost.build_user_permissions(outpost.user)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.proxy_container = self.start_proxy(outpost)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Wait until outpost healthcheck succeeds
 | 
				
			||||||
 | 
					        healthcheck_retries = 0
 | 
				
			||||||
 | 
					        while healthcheck_retries < 50:
 | 
				
			||||||
 | 
					            if len(outpost.state) > 0:
 | 
				
			||||||
 | 
					                state = outpost.state[0]
 | 
				
			||||||
 | 
					                if state.last_seen:
 | 
				
			||||||
 | 
					                    break
 | 
				
			||||||
 | 
					            healthcheck_retries += 1
 | 
				
			||||||
 | 
					            sleep(0.5)
 | 
				
			||||||
 | 
					        sleep(5)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.driver.get("http://localhost:9000")
 | 
				
			||||||
 | 
					        self.login()
 | 
				
			||||||
 | 
					        sleep(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        full_body_text = self.driver.find_element(By.CSS_SELECTOR, "pre").text
 | 
				
			||||||
 | 
					        self.assertIn(f"X-Authentik-Username: {self.user.username}", full_body_text)
 | 
				
			||||||
 | 
					        auth_header = b64encode(f"{cred}:{cred}".encode()).decode()
 | 
				
			||||||
 | 
					        self.assertIn(f"Authorization: Basic {auth_header}", full_body_text)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.driver.get("http://localhost:9000/outpost.goauthentik.io/sign_out")
 | 
				
			||||||
 | 
					        sleep(2)
 | 
				
			||||||
 | 
					        full_body_text = self.driver.find_element(By.CSS_SELECTOR, ".pf-c-title.pf-m-3xl").text
 | 
				
			||||||
 | 
					        self.assertIn("You've logged out of proxy.", full_body_text)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@skipUnless(platform.startswith("linux"), "requires local docker")
 | 
					@skipUnless(platform.startswith("linux"), "requires local docker")
 | 
				
			||||||
class TestProviderProxyConnect(ChannelsLiveServerTestCase):
 | 
					class TestProviderProxyConnect(ChannelsLiveServerTestCase):
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user