* remove pyright Signed-off-by: Jens Langhammer <jens@goauthentik.io> * remove pylint Signed-off-by: Jens Langhammer <jens@goauthentik.io> * replace pylint with ruff Signed-off-by: Jens Langhammer <jens@goauthentik.io> * ruff fix Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * fix UP038 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix DJ012 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix default arg Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix UP031 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * rename stage type to view Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix DJ008 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix remaining upgrade Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix PLR2004 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix B904 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix PLW2901 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix remaining issues Signed-off-by: Jens Langhammer <jens@goauthentik.io> * prevent ruff from breaking the code Signed-off-by: Jens Langhammer <jens@goauthentik.io> * stages/prompt: refactor field building Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * fix tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix lint Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fully remove isort Signed-off-by: Jens Langhammer <jens@goauthentik.io> --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io> Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> Co-authored-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
		
			
				
	
	
		
			122 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			122 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Radius e2e tests"""
 | 
						|
 | 
						|
from dataclasses import asdict
 | 
						|
from time import sleep
 | 
						|
 | 
						|
from docker.client import DockerClient, from_env
 | 
						|
from docker.models.containers import Container
 | 
						|
from pyrad.client import Client
 | 
						|
from pyrad.dictionary import Dictionary
 | 
						|
from pyrad.packet import AccessAccept, AccessReject, AccessRequest
 | 
						|
 | 
						|
from authentik.blueprints.tests import apply_blueprint
 | 
						|
from authentik.core.models import Application, User
 | 
						|
from authentik.flows.models import Flow
 | 
						|
from authentik.lib.generators import generate_id, generate_key
 | 
						|
from authentik.outposts.models import Outpost, OutpostConfig, OutpostType
 | 
						|
from authentik.providers.radius.models import RadiusProvider
 | 
						|
from tests.e2e.utils import SeleniumTestCase, retry
 | 
						|
 | 
						|
 | 
						|
class TestProviderRadius(SeleniumTestCase):
 | 
						|
    """Radius Outpost e2e tests"""
 | 
						|
 | 
						|
    radius_container: Container
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        super().setUp()
 | 
						|
        self.shared_secret = generate_key()
 | 
						|
 | 
						|
    def tearDown(self) -> None:
 | 
						|
        super().tearDown()
 | 
						|
        self.output_container_logs(self.radius_container)
 | 
						|
        self.radius_container.kill()
 | 
						|
 | 
						|
    def start_radius(self, outpost: Outpost) -> Container:
 | 
						|
        """Start radius container based on outpost created"""
 | 
						|
        client: DockerClient = from_env()
 | 
						|
        container = client.containers.run(
 | 
						|
            image=self.get_container_image("ghcr.io/goauthentik/dev-radius"),
 | 
						|
            detach=True,
 | 
						|
            ports={"1812/udp": "1812/udp"},
 | 
						|
            environment={
 | 
						|
                "AUTHENTIK_HOST": self.live_server_url,
 | 
						|
                "AUTHENTIK_TOKEN": outpost.token.key,
 | 
						|
            },
 | 
						|
        )
 | 
						|
        return container
 | 
						|
 | 
						|
    def _prepare(self) -> User:
 | 
						|
        """prepare user, provider, app and container"""
 | 
						|
        radius: RadiusProvider = RadiusProvider.objects.create(
 | 
						|
            name=generate_id(),
 | 
						|
            authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
 | 
						|
            shared_secret=self.shared_secret,
 | 
						|
        )
 | 
						|
        # we need to create an application to actually access radius
 | 
						|
        Application.objects.create(name="radius", slug=generate_id(), provider=radius)
 | 
						|
        outpost: Outpost = Outpost.objects.create(
 | 
						|
            name=generate_id(),
 | 
						|
            type=OutpostType.RADIUS,
 | 
						|
            _config=asdict(OutpostConfig(log_level="debug")),
 | 
						|
        )
 | 
						|
        outpost.providers.add(radius)
 | 
						|
 | 
						|
        self.radius_container = self.start_radius(outpost)
 | 
						|
 | 
						|
        # Wait until outpost healthcheck succeeds
 | 
						|
        healthcheck_retries = 0
 | 
						|
        while healthcheck_retries < 50:  # noqa: PLR2004
 | 
						|
            if len(outpost.state) > 0:
 | 
						|
                state = outpost.state[0]
 | 
						|
                if state.last_seen:
 | 
						|
                    break
 | 
						|
            healthcheck_retries += 1
 | 
						|
            sleep(0.5)
 | 
						|
        sleep(5)
 | 
						|
        return outpost
 | 
						|
 | 
						|
    @retry()
 | 
						|
    @apply_blueprint(
 | 
						|
        "default/flow-default-authentication-flow.yaml",
 | 
						|
        "default/flow-default-invalidation-flow.yaml",
 | 
						|
    )
 | 
						|
    def test_radius_bind_success(self):
 | 
						|
        """Test simple bind"""
 | 
						|
        self._prepare()
 | 
						|
        srv = Client(
 | 
						|
            server="localhost",
 | 
						|
            secret=self.shared_secret.encode(),
 | 
						|
            dict=Dictionary("tests/radius-dictionary"),
 | 
						|
        )
 | 
						|
 | 
						|
        req = srv.CreateAuthPacket(
 | 
						|
            code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
 | 
						|
        )
 | 
						|
        req["User-Password"] = req.PwCrypt(self.user.username)
 | 
						|
 | 
						|
        reply = srv.SendPacket(req)
 | 
						|
        self.assertEqual(reply.code, AccessAccept)
 | 
						|
 | 
						|
    @retry()
 | 
						|
    @apply_blueprint(
 | 
						|
        "default/flow-default-authentication-flow.yaml",
 | 
						|
        "default/flow-default-invalidation-flow.yaml",
 | 
						|
    )
 | 
						|
    def test_radius_bind_fail(self):
 | 
						|
        """Test simple bind (failed)"""
 | 
						|
        self._prepare()
 | 
						|
        srv = Client(
 | 
						|
            server="localhost",
 | 
						|
            secret=self.shared_secret.encode(),
 | 
						|
            dict=Dictionary("tests/radius-dictionary"),
 | 
						|
        )
 | 
						|
 | 
						|
        req = srv.CreateAuthPacket(
 | 
						|
            code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
 | 
						|
        )
 | 
						|
        req["User-Password"] = req.PwCrypt(self.user.username + "foo")
 | 
						|
 | 
						|
        reply = srv.SendPacket(req)
 | 
						|
        self.assertEqual(reply.code, AccessReject)
 |