make tests mostly work
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
		@ -249,7 +249,7 @@ class Conformance:
 | 
			
		||||
            )
 | 
			
		||||
        return response.json()
 | 
			
		||||
 | 
			
		||||
    async def wait_for_state(self, module_id, required_states, timeout=240):
 | 
			
		||||
    def wait_for_state(self, module_id, required_states, timeout=240):
 | 
			
		||||
        timeout_at = time.time() + timeout
 | 
			
		||||
        while True:
 | 
			
		||||
            if time.time() > timeout_at:
 | 
			
		||||
@ -257,7 +257,7 @@ class Conformance:
 | 
			
		||||
                    f"Timed out waiting for test module {module_id} to be in one of states: {required_states}"
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            info = await self.get_module_info(module_id)
 | 
			
		||||
            info = self.get_module_info(module_id)
 | 
			
		||||
 | 
			
		||||
            status = info["status"]
 | 
			
		||||
            print(f"module id {module_id} status is {status}")
 | 
			
		||||
@ -266,7 +266,7 @@ class Conformance:
 | 
			
		||||
            if status == "INTERRUPTED":
 | 
			
		||||
                raise Exception(f"Test module {module_id} has moved to INTERRUPTED")
 | 
			
		||||
 | 
			
		||||
            await asyncio.sleep(1)
 | 
			
		||||
            time.sleep(1)
 | 
			
		||||
 | 
			
		||||
    async def close_client(self):
 | 
			
		||||
        self.httpclient.close()
 | 
			
		||||
 | 
			
		||||
@ -10,6 +10,8 @@ from selenium.webdriver.support import expected_conditions as ec
 | 
			
		||||
 | 
			
		||||
class TestOpenIDConformance(SeleniumTestCase):
 | 
			
		||||
 | 
			
		||||
    conformance: Conformance
 | 
			
		||||
 | 
			
		||||
    @retry()
 | 
			
		||||
    @apply_blueprint(
 | 
			
		||||
        "default/flow-default-authentication-flow.yaml",
 | 
			
		||||
@ -50,49 +52,67 @@ class TestOpenIDConformance(SeleniumTestCase):
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        # Create a Conformance instance...
 | 
			
		||||
        conformance = Conformance(f"https://{self.host}:8443/", None, verify_ssl=False)
 | 
			
		||||
        self.conformance = Conformance(f"https://{self.host}:8443/", None, verify_ssl=False)
 | 
			
		||||
 | 
			
		||||
        # Create a test plan instance and print the id of it
 | 
			
		||||
        test_plan = conformance.create_test_plan(
 | 
			
		||||
        test_plan = self.conformance.create_test_plan(
 | 
			
		||||
            test_plan_name, dumps(test_plan_config), test_variant_config
 | 
			
		||||
        )
 | 
			
		||||
        plan_id = test_plan["id"]
 | 
			
		||||
        n = 0
 | 
			
		||||
        for test in test_plan["modules"]:
 | 
			
		||||
            with self.subTest(test["testModule"]):
 | 
			
		||||
            with self.subTest(test["testModule"], **test["variant"]):
 | 
			
		||||
                # Fetch name and variant of the next test to run
 | 
			
		||||
                module_name = test["testModule"]
 | 
			
		||||
                variant = test["variant"]
 | 
			
		||||
                print(f"Module name: {module_name}")
 | 
			
		||||
                print(f"Variant: {dumps(variant)}")
 | 
			
		||||
 | 
			
		||||
                # Create an instance of that test
 | 
			
		||||
                module_instance = conformance.create_test_from_plan_with_variant(
 | 
			
		||||
                module_instance = self.conformance.create_test_from_plan_with_variant(
 | 
			
		||||
                    plan_id, module_name, variant
 | 
			
		||||
                )
 | 
			
		||||
                module_id = module_instance["id"]
 | 
			
		||||
                while True:
 | 
			
		||||
                    test_status = conformance.get_test_status(module_id)
 | 
			
		||||
                    print(test_status)
 | 
			
		||||
                    browser_urls = test_status.get("browser", {}).get("urls", [])
 | 
			
		||||
                    print(browser_urls)
 | 
			
		||||
                    if len(browser_urls) < 1:
 | 
			
		||||
                        continue
 | 
			
		||||
                    self.do_browser(browser_urls[0])
 | 
			
		||||
                    # Check if we need to upload any items
 | 
			
		||||
                    test_log = conformance.get_test_log(module_id)
 | 
			
		||||
                    upload_items = [x for x in test_log if "upload" in x]
 | 
			
		||||
                    if len(upload_items) > 0:
 | 
			
		||||
                        sleep(10)
 | 
			
		||||
                        for item in upload_items:
 | 
			
		||||
                            conformance.upload_image(
 | 
			
		||||
                                module_id, item, self.driver.get_screenshot_as_base64()
 | 
			
		||||
                            )
 | 
			
		||||
                    # Close tab we've opened earlier
 | 
			
		||||
                    # self.driver.close()
 | 
			
		||||
                    break
 | 
			
		||||
                self.run_test(module_id)
 | 
			
		||||
                self.conformance.wait_for_state(module_id, ["FINISHED"], timeout=self.wait_timeout)
 | 
			
		||||
            sleep(2)
 | 
			
		||||
            n += 1
 | 
			
		||||
 | 
			
		||||
    def run_test(self, module_id: str):
 | 
			
		||||
        tested_browser_url = 0
 | 
			
		||||
        uploaded_image = 0
 | 
			
		||||
        while True:
 | 
			
		||||
            # Fetch all info
 | 
			
		||||
            test_status = self.conformance.get_test_status(module_id)
 | 
			
		||||
            test_log = self.conformance.get_test_log(module_id)
 | 
			
		||||
            test_info = self.conformance.get_module_info(module_id)
 | 
			
		||||
            # Check if we need to clear cookies - tests only indicates this in their written summary
 | 
			
		||||
            # so this check is a bit brittle
 | 
			
		||||
            if "cookies" in test_info["summary"]:
 | 
			
		||||
                self.driver.delete_all_cookies()
 | 
			
		||||
            # print("test status", module_id, test_status)
 | 
			
		||||
            # print("test log", module_id, test_log)
 | 
			
		||||
            # Check if we need deal with any browser URLs
 | 
			
		||||
            browser_urls = test_status.get("browser", {}).get("urls", [])
 | 
			
		||||
            print("browser urls", len(browser_urls), tested_browser_url)
 | 
			
		||||
            if len(browser_urls) > tested_browser_url:
 | 
			
		||||
                print("browser url: Opening", tested_browser_url)
 | 
			
		||||
                self.do_browser(browser_urls[tested_browser_url])
 | 
			
		||||
                tested_browser_url += 1
 | 
			
		||||
            # Check if we need to upload any items
 | 
			
		||||
            upload_items = [x for x in test_log if "upload" in x]
 | 
			
		||||
            print("upload items", len(upload_items), uploaded_image)
 | 
			
		||||
            if len(upload_items) > uploaded_image:
 | 
			
		||||
                print("uploading image", uploaded_image)
 | 
			
		||||
                screenshot = self.get_screenshot()
 | 
			
		||||
                self.conformance.upload_image(
 | 
			
		||||
                    module_id, upload_items[uploaded_image]["upload"], screenshot
 | 
			
		||||
                )
 | 
			
		||||
                sleep(3)
 | 
			
		||||
                uploaded_image += 1
 | 
			
		||||
            if test_info["status"] in ["INTERRUPTED", "FINISHED"]:
 | 
			
		||||
                return
 | 
			
		||||
            # return
 | 
			
		||||
 | 
			
		||||
    def get_screenshot(self):
 | 
			
		||||
        """Get a screenshot, but resize the window first so we don't exceed 500kb"""
 | 
			
		||||
        self.driver.set_window_size(1280, 720)
 | 
			
		||||
        screenshot = f"data:image/jpeg;base64,{self.driver.get_screenshot_as_base64()}"
 | 
			
		||||
        self.driver.maximize_window()
 | 
			
		||||
        return screenshot
 | 
			
		||||
 | 
			
		||||
    def do_browser(self, url):
 | 
			
		||||
        """For any specific OpenID Conformance test, execute the operations required"""
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user