diff --git a/tests/openid_conformance/conformance.py b/tests/openid_conformance/conformance.py index 2566fc86c7..8a0366610d 100644 --- a/tests/openid_conformance/conformance.py +++ b/tests/openid_conformance/conformance.py @@ -1,272 +1,193 @@ -#!/usr/bin/env python3 -# -# python wrapper for conformance suite API - - -import asyncio import json import os import re import time -import traceback import zipfile -import httpx - - -class RetryTransport(httpx.HTTPTransport): - def handle_request( - self, - request: httpx.Request, - ) -> httpx.Response: - retry = 0 - resp = None - while retry < 5: - retry += 1 - if retry > 2: - time.sleep(1) - try: - if resp is not None: - resp.close() - resp = super().handle_request(request) - except Exception as e: - print(f"httpx {request.url} exception {e} caught - retrying") - continue - if resp.status_code >= 500 and resp.status_code < 600: - print(f"httpx {request.url} 5xx response - retrying") - continue - content_type = resp.headers.get("Content-Type") - if content_type is not None: - mime_type, _, _ = content_type.partition(";") - if mime_type == "application/json": - try: - resp.read() - resp.json() - except Exception as e: - traceback.print_exc() - print( - f"httpx {request.url} response not decodable as json '{e}' - retrying" - ) - continue - break - return resp +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry class Conformance: + HTTP_OK = 200 + HTTP_CREATED = 201 + def __init__(self, api_url_base, api_token, verify_ssl): if not api_url_base.endswith("/"): api_url_base += "/" self.api_url_base = api_url_base - transport = RetryTransport(verify=verify_ssl) - self.httpclient = httpx.Client(verify=verify_ssl, transport=transport, timeout=20) - headers = {"Content-Type": "application/json"} + self.session = requests.Session() + self.session.verify = verify_ssl + retries = Retry( + total=5, + backoff_factor=1, + status_forcelist=[500, 502, 503, 504], + allowed_methods=["GET", "POST"], + ) + self.session.mount("https://", HTTPAdapter(max_retries=retries)) + self.session.mount("http://", HTTPAdapter(max_retries=retries)) + + self.session.headers.update({"Content-Type": "application/json"}) if api_token is not None: - headers["Authorization"] = f"Bearer {api_token}" - self.httpclient.headers = headers + self.session.headers.update({"Authorization": f"Bearer {api_token}"}) - async def get_all_test_modules(self): - """Returns an array containing a dictionary per test module""" - api_url = f"{self.api_url_base}api/runner/available" - response = self.httpclient.get(api_url) - - if response.status_code != 200: + def get_all_test_modules(self): + url = f"{self.api_url_base}api/runner/available" + response = self.session.get(url) + if response.status_code != Conformance.HTTP_OK: raise Exception( - f"get_all_test_modules failed - HTTP {response.status_code:d} {response.content}" + f"get_all_test_modules failed - HTTP {response.status_code} {response.content}" ) return response.json() - def get_test_status(self,module_id): - """Returns an array containing a dictionary per test module""" - api_url = f"{self.api_url_base}api/runner/{module_id}" - response = self.httpclient.get(api_url) - - if response.status_code != 200: + def get_test_status(self, module_id): + url = f"{self.api_url_base}api/runner/{module_id}" + response = self.session.get(url) + if response.status_code != Conformance.HTTP_OK: raise Exception( - f"get_test_status failed - HTTP {response.status_code:d} {response.content}" + f"get_test_status failed - HTTP {response.status_code} {response.content}" ) return response.json() - async def exporthtml(self, plan_id, path): - for i in range(5): - api_url = f"{self.api_url_base}api/plan/exporthtml/{plan_id}" + def exporthtml(self, plan_id, path): + for _ in range(5): + url = f"{self.api_url_base}api/plan/exporthtml/{plan_id}" try: - with self.httpclient.stream("GET", api_url) as response: - if response.status_code != 200: + with self.session.get(url, stream=True) as response: + if response.status_code != Conformance.HTTP_OK: raise Exception( - f"exporthtml failed - HTTP {response.status_code:d} {response.content}" + f"exporthtml failed - HTTP {response.status_code} {response.content}" ) - d = response.headers["content-disposition"] - local_filename = re.findall('filename="(.+)"', d)[0] + cd = response.headers.get("content-disposition", "") + local_filename = re.findall('filename="(.+)"', cd)[0] full_path = os.path.join(path, local_filename) with open(full_path, "wb") as f: - for chunk in response.iter_bytes(): + for chunk in response.iter_content(chunk_size=8192): f.write(chunk) zip_file = zipfile.ZipFile(full_path) ret = zip_file.testzip() if ret is not None: - raise Exception( - f"exporthtml for {plan_id} downloaded corrupt zip file {full_path} - {str(ret)}" - ) + raise Exception(f"exporthtml returned corrupt zip file: {ret}") return full_path except Exception as e: - print(f"httpx {api_url} exception {e} caught - retrying") - await asyncio.sleep(1) + print(f"requests {url} exception {e} caught - retrying") + time.sleep(1) raise Exception(f"exporthtml for {plan_id} failed even after retries") - async def create_certification_package( + def create_certification_package( self, plan_id, conformance_pdf_path, rp_logs_zip_path=None, output_zip_directory="./" ): - """ - Create a complete certification package zip file which is written - to the directory specified by the 'output_zip_directory' parameter. - Calling this function will additionally publish and mark the test plan as immutable. + with ( + open(conformance_pdf_path, "rb") as cert_pdf, + open(rp_logs_zip_path, "rb") if rp_logs_zip_path else open(os.devnull, "rb") as rp_logs, + ): + files = { + "certificationOfConformancePdf": cert_pdf, + "clientSideData": rp_logs, + } - :param plan_id: The plan id for which to create the package. - :conformance_pdf_path: The path to the signed Certification of Conformance PDF document. - :rp_logs_zip_path: Required for RP tests and is the path to the client logs zip file. - :output_zip_directory: The (already existing) directory to which the certification package zip file is written. - """ - certificationOfConformancePdf = open(conformance_pdf_path, "rb") - clientSideData = ( - open(rp_logs_zip_path, "rb") if rp_logs_zip_path is not None else open(os.devnull, "rb") - ) - files = { - "certificationOfConformancePdf": certificationOfConformancePdf, - "clientSideData": clientSideData, - } - try: - with httpx.Client() as multipartClient: - multipartClient.headers = self.httpclient.headers.copy() - multipartClient.headers.pop("content-type") - api_url = f"{self.api_url_base}api/plan/{plan_id}/certificationpackage" + headers = self.session.headers.copy() + headers.pop("Content-Type", None) - response = multipartClient.post(api_url, files=files) - if response.status_code != 200: - raise Exception( - f"certificationpackage failed - HTTP {response.status_code:d} {response.content}" - ) - - d = response.headers["content-disposition"] - local_filename = re.findall('filename="(.+)"', d)[0] - full_path = os.path.join(output_zip_directory, local_filename) - with open(full_path, "wb") as f: - for chunk in response.iter_bytes(): - f.write(chunk) - print( - f"Certification package zip for plan id {plan_id} written to {full_path}" + url = f"{self.api_url_base}api/plan/{plan_id}/certificationpackage" + response = self.session.post(url, files=files, headers=headers) + if response.status_code != Conformance.HTTP_OK: + raise Exception( + f"certificationpackage failed - HTTP {response.status_code} {response.content}" ) - finally: - certificationOfConformancePdf.close() - clientSideData.close() + + cd = response.headers.get("content-disposition", "") + local_filename = re.findall('filename="(.+)"', cd)[0] + full_path = os.path.join(output_zip_directory, local_filename) + with open(full_path, "wb") as f: + f.write(response.content) + print(f"Certification package zip for plan id {plan_id} written to {full_path}") def create_test_plan(self, name, configuration, variant=None): - api_url = f"{self.api_url_base}api/plan" + url = f"{self.api_url_base}api/plan" payload = {"planName": name} - if variant != None: + if variant is not None: payload["variant"] = json.dumps(variant) - response = self.httpclient.post(api_url, params=payload, data=configuration) - - if response.status_code != 201: + response = self.session.post(url, params=payload, data=configuration) + if response.status_code != Conformance.HTTP_CREATED: raise Exception( - f"create_test_plan failed - HTTP {response.status_code:d} {response.content}" + f"create_test_plan failed - HTTP {response.status_code} {response.content}" ) return response.json() def create_test(self, test_name, configuration): - api_url = f"{self.api_url_base}api/runner" + url = f"{self.api_url_base}api/runner" payload = {"test": test_name} - response = self.httpclient.post(api_url, params=payload, data=configuration) - - if response.status_code != 201: - raise Exception( - f"create_test failed - HTTP {response.status_code:d} {response.content}" - ) + response = self.session.post(url, params=payload, data=configuration) + if response.status_code != Conformance.HTTP_CREATED: + raise Exception(f"create_test failed - HTTP {response.status_code} {response.content}") return response.json() - async def create_test_from_plan(self, plan_id, test_name): - api_url = f"{self.api_url_base}api/runner" + def create_test_from_plan(self, plan_id, test_name): + url = f"{self.api_url_base}api/runner" payload = {"test": test_name, "plan": plan_id} - response = self.httpclient.post(api_url, params=payload) - - if response.status_code != 201: + response = self.session.post(url, params=payload) + if response.status_code != Conformance.HTTP_CREATED: raise Exception( - f"create_test_from_plan failed - HTTP {response.status_code:d} {response.content}" + f"create_test_from_plan failed - HTTP {response.status_code} {response.content}" ) return response.json() def create_test_from_plan_with_variant(self, plan_id, test_name, variant): - api_url = f"{self.api_url_base}api/runner" + url = f"{self.api_url_base}api/runner" payload = {"test": test_name, "plan": plan_id} - if variant != None: + if variant is not None: payload["variant"] = json.dumps(variant) - response = self.httpclient.post(api_url, params=payload) - - if response.status_code != 201: + response = self.session.post(url, params=payload) + if response.status_code != Conformance.HTTP_CREATED: raise Exception( - f"create_test_from_plan failed - HTTP {response.status_code:d} {response.content}" + "create_test_from_plan_with_variant failed - " + f"HTTP {response.status_code} {response.content}" ) return response.json() def get_module_info(self, module_id): - api_url = f"{self.api_url_base}api/info/{module_id}" - response = self.httpclient.get(api_url) - - if response.status_code != 200: + url = f"{self.api_url_base}api/info/{module_id}" + response = self.session.get(url) + if response.status_code != Conformance.HTTP_OK: raise Exception( - f"get_module_info failed - HTTP {response.status_code:d} {response.content}" + f"get_module_info failed - HTTP {response.status_code} {response.content}" ) return response.json() def get_test_log(self, module_id): - api_url = f"{self.api_url_base}api/log/{module_id}" - response = self.httpclient.get(api_url) - - if response.status_code != 200: - raise Exception( - f"get_test_log failed - HTTP {response.status_code:d} {response.content}" - ) + url = f"{self.api_url_base}api/log/{module_id}" + response = self.session.get(url) + if response.status_code != Conformance.HTTP_OK: + raise Exception(f"get_test_log failed - HTTP {response.status_code} {response.content}") return response.json() def upload_image(self, log_id, placeholder, data): - api_url = f"{self.api_url_base}api/log/{log_id}/images/{placeholder}" - response = self.httpclient.post(api_url, data=data, headers={ - "Content-Type": "text/plain" - }) - - if response.status_code != 200: - raise Exception( - f"upload_image failed - HTTP {response.status_code:d} {response.content}" - ) + url = f"{self.api_url_base}api/log/{log_id}/images/{placeholder}" + response = self.session.post(url, data=data, headers={"Content-Type": "text/plain"}) + if response.status_code != Conformance.HTTP_OK: + raise Exception(f"upload_image failed - HTTP {response.status_code} {response.content}") return response.json() - async def start_test(self, module_id): - api_url = f"{self.api_url_base}api/runner/{module_id}" - response = self.httpclient.post(api_url) - - if response.status_code != 200: - raise Exception( - f"start_test failed - HTTP {response.status_code:d} {response.content}" - ) + def start_test(self, module_id): + url = f"{self.api_url_base}api/runner/{module_id}" + response = self.session.post(url) + if response.status_code != Conformance.HTTP_OK: + raise Exception(f"start_test failed - HTTP {response.status_code} {response.content}") return response.json() def wait_for_state(self, module_id, required_states, timeout=240): timeout_at = time.time() + timeout - while True: - if time.time() > timeout_at: - raise Exception( - f"Timed out waiting for test module {module_id} to be in one of states: {required_states}" - ) - + while time.time() < timeout_at: info = self.get_module_info(module_id) - - status = info["status"] - print(f"module id {module_id} status is {status}") + status = info.get("status") if status in required_states: return status if status == "INTERRUPTED": raise Exception(f"Test module {module_id} has moved to INTERRUPTED") - time.sleep(1) - - async def close_client(self): - self.httpclient.close() + raise Exception( + f"Timed out waiting for test module {module_id} " + f"to be in one of states: {required_states}" + )