diff --git a/tests/openid_conformance/conformance.py b/tests/openid_conformance/conformance.py index 8a0366610d..b83d579ee9 100644 --- a/tests/openid_conformance/conformance.py +++ b/tests/openid_conformance/conformance.py @@ -12,6 +12,7 @@ from urllib3.util.retry import Retry class Conformance: HTTP_OK = 200 HTTP_CREATED = 201 + HTTP_NO_CONTENT = 204 def __init__(self, api_url_base, api_token, verify_ssl): if not api_url_base.endswith("/"): @@ -116,6 +117,15 @@ class Conformance: ) return response.json() + def delete_test_plan(self, plan_id: str): + url = f"{self.api_url_base}api/plan/{plan_id}" + response = self.session.delete(url) + if response.status_code != Conformance.HTTP_NO_CONTENT: + raise Exception( + f"create_test_plan failed - HTTP {response.status_code} {response.content}" + ) + return response.json() + def create_test(self, test_name, configuration): url = f"{self.api_url_base}api/runner" payload = {"test": test_name} diff --git a/tests/openid_conformance/generator.py b/tests/openid_conformance/generator.py new file mode 100644 index 0000000000..c5da90b2bd --- /dev/null +++ b/tests/openid_conformance/generator.py @@ -0,0 +1,163 @@ +from functools import partial +from json import dumps +from pathlib import Path +from time import sleep + +from selenium.webdriver.common.by import By +from selenium.webdriver.support import expected_conditions as ec + +from authentik.blueprints.tests import apply_blueprint, reconcile_app +from authentik.providers.oauth2.models import OAuth2Provider +from tests.e2e.utils import SeleniumTestCase, get_local_ip, retry +from tests.openid_conformance.conformance import Conformance + + +class BaseOpenIDConformance(SeleniumTestCase): + + conformance: Conformance + + def run_test(self, module_id: str): + """Process instructions for a single test, navigate to browser URLs and take screenshots""" + tested_browser_url = 0 + uploaded_image = 0 + cleared_cookies = False + while True: + # Fetch all info + test_status = self.conformance.get_test_status(module_id) + test_log = self.conformance.get_test_log(module_id) + test_info = self.conformance.get_module_info(module_id) + # Check if we need to clear cookies - tests only indicates this in their written summary + # so this check is a bit brittle + if "cookies" in test_info["summary"] and not cleared_cookies: + # Navigate to our origin to delete cookies in the right context + self.driver.get(self.url("authentik_api:user-me") + "?format=json") + self.driver.delete_all_cookies() + cleared_cookies = True + # Check if we need deal with any browser URLs + browser_urls = test_status.get("browser", {}).get("urls", []) + if len(browser_urls) > tested_browser_url: + self.do_browser(browser_urls[tested_browser_url]) + tested_browser_url += 1 + continue + # Check if we need to upload any items + upload_items = [x for x in test_log if "upload" in x] + if len(upload_items) > uploaded_image: + screenshot = self.get_screenshot() + self.conformance.upload_image( + module_id, upload_items[uploaded_image]["upload"], screenshot + ) + sleep(3) + uploaded_image += 1 + continue + if test_info["status"] in ["INTERRUPTED", "FINISHED"]: + return + sleep(0.1) + + def get_screenshot(self): + """Get a screenshot, but resize the window first so we don't exceed 500kb""" + self.driver.set_window_size(800, 600) + screenshot = f"data:image/jpeg;base64,{self.driver.get_screenshot_as_base64()}" + self.driver.maximize_window() + return screenshot + + def do_browser(self, url): + """For any specific OpenID Conformance test, execute the operations required""" + self.driver.get(url) + if "if/flow/default-authentication-flow" in self.driver.current_url: + self.login() + self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "#complete"))) + + +def generate(plan_name: str): + + test_plan_name = "oidcc-basic-certification-test-plan" + provider_a = OAuth2Provider.objects.get(client_id="4054d882aff59755f2f279968b97ce8806a926e1") + provider_b = OAuth2Provider.objects.get(client_id="ad64aeaf1efe388ecf4d28fcc537e8de08bcae26") + test_plan_config = { + "alias": "authentik", + "description": "authentik", + "server": {}, + "client": { + "client_id": "4054d882aff59755f2f279968b97ce8806a926e1", + "client_secret": provider_a.client_secret, + }, + "client_secret_post": { + "client_id": "4054d882aff59755f2f279968b97ce8806a926e1", + "client_secret": provider_a.client_secret, + }, + "client2": { + "client_id": "ad64aeaf1efe388ecf4d28fcc537e8de08bcae26", + "client_secret": provider_b.client_secret, + }, + "consent": {}, + } + + # Create a Conformance instance... + conformance = Conformance(f"https://{get_local_ip()}:8443/", None, verify_ssl=False) + + test_plan = conformance.create_test_plan( + test_plan_name, + dumps(test_plan_config), + { + "server_metadata": "discovery", + "client_registration": "static_client", + }, + ) + + @retry() + @apply_blueprint( + "default/flow-default-authentication-flow.yaml", + "default/flow-default-invalidation-flow.yaml", + ) + @apply_blueprint( + "default/flow-default-provider-authorization-implicit-consent.yaml", + "default/flow-default-provider-invalidation.yaml", + ) + @apply_blueprint("system/providers-oauth2.yaml") + @reconcile_app("authentik_crypto") + @apply_blueprint("testing/oidc-conformance.yaml") + def tester_func(self: BaseOpenIDConformance, test: dict): + # Fetch name and variant of the next test to run + module_name = test["testModule"] + variant = test["variant"] + module_instance = self.conformance.create_test_from_plan_with_variant( + self.plan_id, module_name, variant + ) + module_id = module_instance["id"] + self.run_test(module_id) + self.conformance.wait_for_state(module_id, ["FINISHED"], timeout=self.wait_timeout) + sleep(2) + + class test_cls(BaseOpenIDConformance): + + def setUp(self): + test_plan = conformance.create_test_plan( + test_plan_name, + dumps(test_plan_config), + { + "server_metadata": "discovery", + "client_registration": "static_client", + }, + ) + test_plan_config["server"] = { + "discoveryUrl": self.url( + "authentik_providers_oauth2:provider-info", application_slug="conformance" + ), + } + self.plan_id = test_plan["id"] + return super().setUp() + + def tearDown(self): + self.conformance.exporthtml(self.plan_id, Path(__file__).parent / "exports") + return super().tearDown() + + for test in test_plan["modules"]: + setattr( + test_cls, + f"test_{test["testModule"]}_{dumps(["variant"])}", + partial(tester_func, test=test), + ) + + conformance.delete_test_plan(test_plan["id"]) + + return test_cls diff --git a/tests/openid_conformance/test_conformance.py b/tests/openid_conformance/test_conformance.py index 0b7c794053..5927c06279 100644 --- a/tests/openid_conformance/test_conformance.py +++ b/tests/openid_conformance/test_conformance.py @@ -84,6 +84,74 @@ class TestOpenIDConformance(SeleniumTestCase): sleep(2) self.conformance.exporthtml(plan_id, Path(__file__).parent / "exports") + @retry() + @apply_blueprint( + "default/flow-default-authentication-flow.yaml", + "default/flow-default-invalidation-flow.yaml", + ) + @apply_blueprint( + "default/flow-default-provider-authorization-implicit-consent.yaml", + "default/flow-default-provider-invalidation.yaml", + ) + @apply_blueprint("system/providers-oauth2.yaml") + @reconcile_app("authentik_crypto") + @apply_blueprint("testing/oidc-conformance.yaml") + def test_oidcc_implicit_certification_test(self): + test_plan_name = "oidcc-implicit-certification-test-plan" + provider_a = OAuth2Provider.objects.get( + client_id="4054d882aff59755f2f279968b97ce8806a926e1" + ) + provider_b = OAuth2Provider.objects.get( + client_id="ad64aeaf1efe388ecf4d28fcc537e8de08bcae26" + ) + test_plan_config = { + "alias": "authentik", + "description": "authentik", + "server": { + "discoveryUrl": self.url( + "authentik_providers_oauth2:provider-info", application_slug="conformance" + ), + }, + "client": { + "client_id": "4054d882aff59755f2f279968b97ce8806a926e1", + "client_secret": provider_a.client_secret, + }, + "client_secret_post": { + "client_id": "4054d882aff59755f2f279968b97ce8806a926e1", + "client_secret": provider_a.client_secret, + }, + "client2": { + "client_id": "ad64aeaf1efe388ecf4d28fcc537e8de08bcae26", + "client_secret": provider_b.client_secret, + }, + "consent": {}, + } + + # Create a Conformance instance... + self.conformance = Conformance(f"https://{self.host}:8443/", None, verify_ssl=False) + + test_plan = self.conformance.create_test_plan( + test_plan_name, + dumps(test_plan_config), + { + "server_metadata": "discovery", + "client_registration": "static_client", + }, + ) + plan_id = test_plan["id"] + for test in test_plan["modules"]: + with self.subTest(test["testModule"], **test["variant"]): + # Fetch name and variant of the next test to run + module_name = test["testModule"] + variant = test["variant"] + module_instance = self.conformance.create_test_from_plan_with_variant( + plan_id, module_name, variant + ) + module_id = module_instance["id"] + self.run_test(module_id) + self.conformance.wait_for_state(module_id, ["FINISHED"], timeout=self.wait_timeout) + sleep(2) + def run_test(self, module_id: str): """Process instructions for a single test, navigate to browser URLs and take screenshots""" tested_browser_url = 0