 b225b0200e
			
		
	
	b225b0200e
	
	
	
		
			
			* remove pyright Signed-off-by: Jens Langhammer <jens@goauthentik.io> * remove pylint Signed-off-by: Jens Langhammer <jens@goauthentik.io> * replace pylint with ruff Signed-off-by: Jens Langhammer <jens@goauthentik.io> * ruff fix Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * fix UP038 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix DJ012 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix default arg Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix UP031 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * rename stage type to view Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix DJ008 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix remaining upgrade Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix PLR2004 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix B904 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix PLW2901 Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix remaining issues Signed-off-by: Jens Langhammer <jens@goauthentik.io> * prevent ruff from breaking the code Signed-off-by: Jens Langhammer <jens@goauthentik.io> * stages/prompt: refactor field building Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * fix tests Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fix lint Signed-off-by: Jens Langhammer <jens@goauthentik.io> * fully remove isort Signed-off-by: Jens Langhammer <jens@goauthentik.io> --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io> Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> Co-authored-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
		
			
				
	
	
		
			122 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			122 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Radius e2e tests"""
 | |
| 
 | |
| from dataclasses import asdict
 | |
| from time import sleep
 | |
| 
 | |
| from docker.client import DockerClient, from_env
 | |
| from docker.models.containers import Container
 | |
| from pyrad.client import Client
 | |
| from pyrad.dictionary import Dictionary
 | |
| from pyrad.packet import AccessAccept, AccessReject, AccessRequest
 | |
| 
 | |
| from authentik.blueprints.tests import apply_blueprint
 | |
| from authentik.core.models import Application, User
 | |
| from authentik.flows.models import Flow
 | |
| from authentik.lib.generators import generate_id, generate_key
 | |
| from authentik.outposts.models import Outpost, OutpostConfig, OutpostType
 | |
| from authentik.providers.radius.models import RadiusProvider
 | |
| from tests.e2e.utils import SeleniumTestCase, retry
 | |
| 
 | |
| 
 | |
| class TestProviderRadius(SeleniumTestCase):
 | |
|     """Radius Outpost e2e tests"""
 | |
| 
 | |
|     radius_container: Container
 | |
| 
 | |
|     def setUp(self):
 | |
|         super().setUp()
 | |
|         self.shared_secret = generate_key()
 | |
| 
 | |
|     def tearDown(self) -> None:
 | |
|         super().tearDown()
 | |
|         self.output_container_logs(self.radius_container)
 | |
|         self.radius_container.kill()
 | |
| 
 | |
|     def start_radius(self, outpost: Outpost) -> Container:
 | |
|         """Start radius container based on outpost created"""
 | |
|         client: DockerClient = from_env()
 | |
|         container = client.containers.run(
 | |
|             image=self.get_container_image("ghcr.io/goauthentik/dev-radius"),
 | |
|             detach=True,
 | |
|             ports={"1812/udp": "1812/udp"},
 | |
|             environment={
 | |
|                 "AUTHENTIK_HOST": self.live_server_url,
 | |
|                 "AUTHENTIK_TOKEN": outpost.token.key,
 | |
|             },
 | |
|         )
 | |
|         return container
 | |
| 
 | |
|     def _prepare(self) -> User:
 | |
|         """prepare user, provider, app and container"""
 | |
|         radius: RadiusProvider = RadiusProvider.objects.create(
 | |
|             name=generate_id(),
 | |
|             authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
 | |
|             shared_secret=self.shared_secret,
 | |
|         )
 | |
|         # we need to create an application to actually access radius
 | |
|         Application.objects.create(name="radius", slug=generate_id(), provider=radius)
 | |
|         outpost: Outpost = Outpost.objects.create(
 | |
|             name=generate_id(),
 | |
|             type=OutpostType.RADIUS,
 | |
|             _config=asdict(OutpostConfig(log_level="debug")),
 | |
|         )
 | |
|         outpost.providers.add(radius)
 | |
| 
 | |
|         self.radius_container = self.start_radius(outpost)
 | |
| 
 | |
|         # Wait until outpost healthcheck succeeds
 | |
|         healthcheck_retries = 0
 | |
|         while healthcheck_retries < 50:  # noqa: PLR2004
 | |
|             if len(outpost.state) > 0:
 | |
|                 state = outpost.state[0]
 | |
|                 if state.last_seen:
 | |
|                     break
 | |
|             healthcheck_retries += 1
 | |
|             sleep(0.5)
 | |
|         sleep(5)
 | |
|         return outpost
 | |
| 
 | |
|     @retry()
 | |
|     @apply_blueprint(
 | |
|         "default/flow-default-authentication-flow.yaml",
 | |
|         "default/flow-default-invalidation-flow.yaml",
 | |
|     )
 | |
|     def test_radius_bind_success(self):
 | |
|         """Test simple bind"""
 | |
|         self._prepare()
 | |
|         srv = Client(
 | |
|             server="localhost",
 | |
|             secret=self.shared_secret.encode(),
 | |
|             dict=Dictionary("tests/radius-dictionary"),
 | |
|         )
 | |
| 
 | |
|         req = srv.CreateAuthPacket(
 | |
|             code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
 | |
|         )
 | |
|         req["User-Password"] = req.PwCrypt(self.user.username)
 | |
| 
 | |
|         reply = srv.SendPacket(req)
 | |
|         self.assertEqual(reply.code, AccessAccept)
 | |
| 
 | |
|     @retry()
 | |
|     @apply_blueprint(
 | |
|         "default/flow-default-authentication-flow.yaml",
 | |
|         "default/flow-default-invalidation-flow.yaml",
 | |
|     )
 | |
|     def test_radius_bind_fail(self):
 | |
|         """Test simple bind (failed)"""
 | |
|         self._prepare()
 | |
|         srv = Client(
 | |
|             server="localhost",
 | |
|             secret=self.shared_secret.encode(),
 | |
|             dict=Dictionary("tests/radius-dictionary"),
 | |
|         )
 | |
| 
 | |
|         req = srv.CreateAuthPacket(
 | |
|             code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
 | |
|         )
 | |
|         req["User-Password"] = req.PwCrypt(self.user.username + "foo")
 | |
| 
 | |
|         reply = srv.SendPacket(req)
 | |
|         self.assertEqual(reply.code, AccessReject)
 |