diff options
Diffstat (limited to 'testing/web-platform/tests/tools/wave/network')
10 files changed, 1547 insertions, 0 deletions
diff --git a/testing/web-platform/tests/tools/wave/network/__init__.py b/testing/web-platform/tests/tools/wave/network/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/web-platform/tests/tools/wave/network/__init__.py diff --git a/testing/web-platform/tests/tools/wave/network/api/__init__.py b/testing/web-platform/tests/tools/wave/network/api/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/web-platform/tests/tools/wave/network/api/__init__.py diff --git a/testing/web-platform/tests/tools/wave/network/api/api_handler.py b/testing/web-platform/tests/tools/wave/network/api/api_handler.py new file mode 100644 index 0000000000..9c67e6c0cd --- /dev/null +++ b/testing/web-platform/tests/tools/wave/network/api/api_handler.py @@ -0,0 +1,99 @@ +# mypy: allow-untyped-defs + +import json +import sys +import traceback +import logging + +from urllib.parse import parse_qsl + +global logger +logger = logging.getLogger("wave-api-handler") + + +class ApiHandler: + def __init__(self, web_root): + self._web_root = web_root + + def set_headers(self, response, headers): + if not isinstance(response.headers, list): + response.headers = [] + for header in headers: + response.headers.append(header) + + def send_json(self, data, response, status=None): + if status is None: + status = 200 + json_string = json.dumps(data, indent=4) + response.content = json_string + self.set_headers(response, [("Content-Type", "application/json")]) + response.status = status + + def send_file(self, blob, file_name, response): + self.set_headers(response, + [("Content-Disposition", + "attachment;filename=" + file_name)]) + response.content = blob + + def send_zip(self, data, file_name, response): + response.headers = [("Content-Type", "application/x-compressed")] + self.send_file(data, file_name, response) + + def parse_uri(self, request): + path = request.url_parts.path + if self._web_root is not None: + path = path[len(self._web_root):] + + uri_parts = list(filter(None, path.split("/"))) + return uri_parts + + def parse_query_parameters(self, request): + return dict(parse_qsl(request.url_parts.query)) + + def handle_exception(self, message): + info = sys.exc_info() + traceback.print_tb(info[2]) + logger.error(f"{message}: {info[0].__name__}: {info[1].args[0]}") + + def create_hal_list(self, items, uris, index, count, total): + hal_list = {} + links = {} + if uris is not None: + for relation in uris: + if relation == "self": + continue + uri = uris[relation] + templated = "{" in uri + links[relation] = {"href": uri, "templated": templated} + + if "self" in uris: + self_uri = uris["self"] + self_uri += f"?index={index}&count={count}" + links["self"] = {"href": self_uri} + + first_uri = uris["self"] + first_uri += f"?index={0}&count={count}" + links["first"] = {"href": first_uri} + + last_uri = uris["self"] + last_uri += f"?index={total - (total % count)}&count={count}" + links["last"] = {"href": last_uri} + + if index + count <= total: + next_index = index + count + next_uri = uris["self"] + next_uri += f"?index={next_index}&count={count}" + links["next"] = {"href": next_uri} + + if index != 0: + previous_index = index - count + if previous_index < 0: + previous_index = 0 + previous_uri = uris["self"] + previous_uri += f"?index={previous_index}&count={count}" + links["previous"] = {"href": previous_uri} + + hal_list["_links"] = links + hal_list["items"] = items + + return hal_list diff --git a/testing/web-platform/tests/tools/wave/network/api/devices_api_handler.py b/testing/web-platform/tests/tools/wave/network/api/devices_api_handler.py new file mode 100644 index 0000000000..ecd9a96770 --- /dev/null +++ b/testing/web-platform/tests/tools/wave/network/api/devices_api_handler.py @@ -0,0 +1,202 @@ +# mypy: allow-untyped-defs + +import json +import threading + +from .api_handler import ApiHandler +from ...data.http_polling_event_listener import HttpPollingEventListener +from ...testing.event_dispatcher import DEVICES +from ...utils.serializer import serialize_device +from ...testing.devices_manager import DEVICE_TIMEOUT, RECONNECT_TIME +from ...data.exceptions.not_found_exception import NotFoundException + + +class DevicesApiHandler(ApiHandler): + def __init__(self, devices_manager, event_dispatcher, web_root): + super().__init__(web_root) + self._devices_manager = devices_manager + self._event_dispatcher = event_dispatcher + + def create_device(self, request, response): + try: + user_agent = request.headers[b"user-agent"].decode("utf-8") + + device = self._devices_manager.create_device(user_agent) + + self.send_json({"token": device.token}, response) + except Exception: + self.handle_exception("Failed to create device") + response.status = 500 + + def read_device(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + device = self._devices_manager.read_device(token) + + device_object = serialize_device(device) + + self.send_json(device_object, response) + except NotFoundException: + self.handle_exception("Failed to read device") + response.status = 404 + except Exception: + self.handle_exception("Failed to read device") + response.status = 500 + + def read_devices(self, request, response): + try: + devices = self._devices_manager.read_devices() + + device_objects = [] + for device in devices: + device_object = serialize_device(device) + device_objects.append(device_object) + + self.send_json(device_objects, response) + except Exception: + self.handle_exception("Failed to read devices") + response.status = 500 + + def register_event_listener(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + query = self.parse_query_parameters(request) + + if "device_token" in query: + self._devices_manager.refresh_device(query["device_token"]) + + event = threading.Event() + timer = threading.Timer( + (DEVICE_TIMEOUT - RECONNECT_TIME) / 1000, + event.set, + []) + timer.start() + http_polling_event_listener = HttpPollingEventListener(token, event) + event_listener_token = self._event_dispatcher.add_event_listener(http_polling_event_listener) + + event.wait() + + message = http_polling_event_listener.message + if message is not None: + self.send_json(data=message, response=response) + self._event_dispatcher.remove_event_listener(event_listener_token) + except Exception: + self.handle_exception("Failed to register event listener") + response.status = 500 + + def register_global_event_listener(self, request, response): + try: + query = self.parse_query_parameters(request) + + if "device_token" in query: + self._devices_manager.refresh_device(query["device_token"]) + + event = threading.Event() + timer = threading.Timer( + (DEVICE_TIMEOUT - RECONNECT_TIME) / 1000, + event.set, + []) + timer.start() + http_polling_event_listener = HttpPollingEventListener(DEVICES, event) + event_listener_token = self._event_dispatcher.add_event_listener(http_polling_event_listener) + + event.wait() + + message = http_polling_event_listener.message + if message is not None: + self.send_json(data=message, response=response) + self._event_dispatcher.remove_event_listener(event_listener_token) + except Exception: + self.handle_exception("Failed to register global event listener") + response.status = 500 + + def post_global_event(self, request, response): + try: + event = {} + body = request.body.decode("utf-8") + if body != "": + event = json.loads(body) + + query = self.parse_query_parameters(request) + if "device_token" in query: + self._devices_manager.refresh_device(query["device_token"]) + + event_type = None + if "type" in event: + event_type = event["type"] + data = None + if "data" in event: + data = event["data"] + self._devices_manager.post_global_event(event_type, data) + + except Exception: + self.handle_exception("Failed to post global event") + response.status = 500 + + def post_event(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + event = {} + body = request.body.decode("utf-8") + if body != "": + event = json.loads(body) + + query = self.parse_query_parameters(request) + if "device_token" in query: + self._devices_manager.refresh_device(query["device_token"]) + + event_type = None + if "type" in event: + event_type = event["type"] + data = None + if "data" in event: + data = event["data"] + self._devices_manager.post_event(token, event_type, data) + + except Exception: + self.handle_exception("Failed to post event") + response.status = 500 + + def handle_request(self, request, response): + method = request.method + uri_parts = self.parse_uri(request) + + # /api/devices + if len(uri_parts) == 2: + if method == "POST": + self.create_device(request, response) + return + if method == "GET": + self.read_devices(request, response) + return + + # /api/devices/<function> + if len(uri_parts) == 3: + function = uri_parts[2] + if method == "GET": + if function == "events": + self.register_global_event_listener(request, response) + return + self.read_device(request, response) + return + if method == "POST": + if function == "events": + self.post_global_event(request, response) + return + + # /api/devices/<token>/<function> + if len(uri_parts) == 4: + function = uri_parts[3] + if method == "GET": + if function == "events": + self.register_event_listener(request, response) + return + if method == "POST": + if function == "events": + self.post_event(request, response) + return diff --git a/testing/web-platform/tests/tools/wave/network/api/general_api_handler.py b/testing/web-platform/tests/tools/wave/network/api/general_api_handler.py new file mode 100644 index 0000000000..65883a9b75 --- /dev/null +++ b/testing/web-platform/tests/tools/wave/network/api/general_api_handler.py @@ -0,0 +1,76 @@ +# mypy: allow-untyped-defs + +from .api_handler import ApiHandler + +TOKEN_LENGTH = 36 + + +class GeneralApiHandler(ApiHandler): + def __init__( + self, + web_root, + read_sessions_enabled, + import_results_enabled, + reports_enabled, + version_string, + test_type_selection_enabled, + test_file_selection_enabled + ): + super().__init__(web_root) + self.read_sessions_enabled = read_sessions_enabled + self.import_results_enabled = import_results_enabled + self.reports_enabled = reports_enabled + self.version_string = version_string + self.test_type_selection_enabled = test_type_selection_enabled + self.test_file_selection_enabled = test_file_selection_enabled + + def read_status(self): + try: + return { + "format": "application/json", + "data": { + "version_string": self.version_string, + "read_sessions_enabled": self.read_sessions_enabled, + "import_results_enabled": self.import_results_enabled, + "reports_enabled": self.reports_enabled, + "test_type_selection_enabled": self.test_type_selection_enabled, + "test_file_selection_enabled": self.test_file_selection_enabled + } + } + except Exception: + self.handle_exception("Failed to read server configuration") + return {"status": 500} + + def handle_request(self, request, response): + method = request.method + uri_parts = self.parse_uri(request) + + result = None + # /api/<function> + if len(uri_parts) == 2: + function = uri_parts[1] + if method == "GET": + if function == "status": + result = self.read_status() + + if result is None: + response.status = 404 + return + + format = None + if "format" in result: + format = result["format"] + if format == "application/json": + data = None + if "data" in result: + data = result["data"] + status = 200 + if "status" in result: + status = result["status"] + self.send_json(data, response, status) + return + + status = 404 + if "status" in result: + status = result["status"] + response.status = status diff --git a/testing/web-platform/tests/tools/wave/network/api/results_api_handler.py b/testing/web-platform/tests/tools/wave/network/api/results_api_handler.py new file mode 100644 index 0000000000..a9da0df10f --- /dev/null +++ b/testing/web-platform/tests/tools/wave/network/api/results_api_handler.py @@ -0,0 +1,232 @@ +# mypy: allow-untyped-defs + +import json + +from .api_handler import ApiHandler +from ...data.exceptions.duplicate_exception import DuplicateException +from ...data.exceptions.invalid_data_exception import InvalidDataException + + +class ResultsApiHandler(ApiHandler): + def __init__(self, results_manager, session_manager, web_root): + super().__init__(web_root) + self._results_manager = results_manager + self._sessions_manager = session_manager + + def create_result(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + data = None + body = request.body.decode("utf-8") + if body != "": + data = json.loads(body) + + self._results_manager.create_result(token, data) + + except Exception: + self.handle_exception("Failed to create result") + response.status = 500 + + def read_results(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + session = self._sessions_manager.read_session(token) + if session is None: + response.status = 404 + return + + results = self._results_manager.read_results(token) + + self.send_json(response=response, data=results) + + except Exception: + self.handle_exception("Failed to read results") + response.status = 500 + + def read_results_compact(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + results = self._results_manager.read_flattened_results(token) + + self.send_json(response=response, data=results) + + except Exception: + self.handle_exception("Failed to read compact results") + response.status = 500 + + def read_results_api_wpt_report_url(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + api = uri_parts[3] + + uri = self._results_manager.read_results_wpt_report_uri(token, api) + self.send_json({"uri": uri}, response) + except Exception: + self.handle_exception("Failed to read results report url") + response.status = 500 + + def read_results_api_wpt_multi_report_uri(self, request, response): + try: + uri_parts = self.parse_uri(request) + api = uri_parts[2] + query = self.parse_query_parameters(request) + tokens = query["tokens"].split(",") + uri = self._results_manager.read_results_wpt_multi_report_uri( + tokens, + api + ) + self.send_json({"uri": uri}, response) + except Exception: + self.handle_exception("Failed to read results multi report url") + response.status = 500 + + def download_results_api_json(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + api = uri_parts[3] + blob = self._results_manager.export_results_api_json(token, api) + if blob is None: + response.status = 404 + return + file_path = self._results_manager.get_json_path(token, api) + file_name = "{}-{}-{}".format( + token.split("-")[0], + api, + file_path.split("/")[-1] + ) + self.send_zip(blob, file_name, response) + except Exception: + self.handle_exception("Failed to download api json") + response.status = 500 + + def import_results_api_json(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + api = uri_parts[3] + blob = request.body + + self._results_manager.import_results_api_json(token, api, blob) + + response.status = 200 + except Exception: + self.handle_exception("Failed to upload api json") + response.status = 500 + + def download_results_all_api_jsons(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + blob = self._results_manager.export_results_all_api_jsons(token) + file_name = token.split("-")[0] + "_results_json.zip" + self.send_zip(blob, file_name, response) + except Exception: + self.handle_exception("Failed to download all api jsons") + response.status = 500 + + def download_results(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + blob = self._results_manager.export_results(token) + if blob is None: + response.status = 404 + return + file_name = token + ".zip" + self.send_zip(blob, file_name, response) + except Exception: + self.handle_exception("Failed to download results") + response.status = 500 + + def download_results_overview(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + blob = self._results_manager.export_results_overview(token) + if blob is None: + response.status = 404 + return + file_name = token.split("-")[0] + "_results_html.zip" + self.send_zip(blob, file_name, response) + except Exception: + self.handle_exception("Failed to download results overview") + response.status = 500 + + def import_results(self, request, response): + try: + blob = request.body + token = self._results_manager.import_results(blob) + self.send_json({"token": token}, response) + except DuplicateException: + self.handle_exception("Failed to import results") + self.send_json({"error": "Session already exists!"}, response, 400) + return + except InvalidDataException: + self.handle_exception("Failed to import results") + self.send_json({"error": "Invalid input data!"}, response, 400) + return + except Exception: + self.handle_exception("Failed to import results") + response.status = 500 + + def handle_request(self, request, response): + method = request.method + uri_parts = self.parse_uri(request) + + # /api/results/<token> + if len(uri_parts) == 3: + if method == "POST": + if uri_parts[2] == "import": + self.import_results(request, response) + return + self.create_result(request, response) + return + + if method == "GET": + self.read_results(request, response) + return + + # /api/results/<token>/<function> + if len(uri_parts) == 4: + function = uri_parts[3] + if method == "GET": + if function == "compact": + self.read_results_compact(request, response) + return + if function == "reporturl": + return self.read_results_api_wpt_multi_report_uri(request, + response) + if function == "json": + self.download_results_all_api_jsons(request, response) + return + if function == "export": + self.download_results(request, response) + return + if function == "overview": + self.download_results_overview(request, response) + return + + # /api/results/<token>/<api>/<function> + if len(uri_parts) == 5: + function = uri_parts[4] + if method == "GET": + if function == "reporturl": + self.read_results_api_wpt_report_url(request, response) + return + if function == "json": + self.download_results_api_json(request, response) + return + if method == "POST": + if function == "json": + self.import_results_api_json(request, response) + return + + response.status = 404 diff --git a/testing/web-platform/tests/tools/wave/network/api/sessions_api_handler.py b/testing/web-platform/tests/tools/wave/network/api/sessions_api_handler.py new file mode 100644 index 0000000000..9eb896b807 --- /dev/null +++ b/testing/web-platform/tests/tools/wave/network/api/sessions_api_handler.py @@ -0,0 +1,458 @@ +# mypy: allow-untyped-defs + +import json +import threading + +from .api_handler import ApiHandler + +from ...utils.serializer import serialize_session +from ...data.exceptions.not_found_exception import NotFoundException +from ...data.exceptions.invalid_data_exception import InvalidDataException +from ...data.http_polling_event_listener import HttpPollingEventListener + +TOKEN_LENGTH = 36 + + +class SessionsApiHandler(ApiHandler): + def __init__( + self, + sessions_manager, + results_manager, + event_dispatcher, + web_root, + read_sessions_enabled + ): + super().__init__(web_root) + self._sessions_manager = sessions_manager + self._results_manager = results_manager + self._event_dispatcher = event_dispatcher + self._read_sessions_enabled = read_sessions_enabled + + def create_session(self, body, headers): + try: + config = {} + body = body.decode("utf-8") + if body != "": + config = json.loads(body) + tests = {} + if "tests" in config: + tests = config["tests"] + test_types = None + if "types" in config: + test_types = config["types"] + timeouts = {} + if "timeouts" in config: + timeouts = config["timeouts"] + reference_tokens = [] + if "reference_tokens" in config: + reference_tokens = config["reference_tokens"] + user_agent = headers[b"user-agent"].decode("utf-8") + labels = [] + if "labels" in config: + labels = config["labels"] + expiration_date = None + if "expiration_date" in config: + expiration_date = config["expiration_date"] + type = None + if "type" in config: + type = config["type"] + + session = self._sessions_manager.create_session( + tests, + test_types, + timeouts, + reference_tokens, + user_agent, + labels, + expiration_date, + type + ) + + return { + "format": "application/json", + "data": {"token": session.token} + } + + except InvalidDataException: + self.handle_exception("Failed to create session") + return { + "format": "application/json", + "data": {"error": "Invalid input data!"}, + "status": 400 + } + + except Exception: + self.handle_exception("Failed to create session") + return {"status": 500} + + def read_session(self, token): + try: + + session = self._sessions_manager.read_session(token) + if session is None: + return {"status": 404} + + data = serialize_session(session) + + return { + "format": "application/json", + "data": { + "token": data["token"], + "tests": data["tests"], + "types": data["types"], + "timeouts": data["timeouts"], + "reference_tokens": data["reference_tokens"], + "user_agent": data["user_agent"], + "browser": data["browser"], + "is_public": data["is_public"], + "date_created": data["date_created"], + "labels": data["labels"] + } + } + except Exception: + self.handle_exception("Failed to read session") + return {"status": 500} + + def read_sessions(self, query_parameters, uri_path): + try: + index = 0 + if "index" in query_parameters: + index = int(query_parameters["index"]) + count = 10 + if "count" in query_parameters: + count = int(query_parameters["count"]) + expand = [] + if "expand" in query_parameters: + expand = query_parameters["expand"].split(",") + + session_tokens = self._sessions_manager.read_sessions(index=index, count=count) + total_sessions = self._sessions_manager.get_total_sessions() + + embedded = {} + + for relation in expand: + if relation == "configuration": + configurations = [] + for token in session_tokens: + result = self.read_session(token) + if "status" in result and result["status"] != 200: + continue + configurations.append(result["data"]) + embedded["configuration"] = configurations + + if relation == "status": + statuses = [] + for token in session_tokens: + result = self.read_session_status(token) + if "status" in result and result["status"] != 200: + continue + statuses.append(result["data"]) + embedded["status"] = statuses + + uris = { + "self": uri_path, + "configuration": self._web_root + "api/sessions/{token}", + "status": self._web_root + "api/sessions/{token}/status" + } + + data = self.create_hal_list(session_tokens, uris, index, count, total=total_sessions) + + if len(embedded) > 0: + data["_embedded"] = embedded + + return { + "format": "application/json", + "data": data + } + except Exception: + self.handle_exception("Failed to read session") + return {"status": 500} + + def read_session_status(self, token): + try: + session = self._sessions_manager.read_session_status(token) + if session is None: + return {"status": 404} + + data = serialize_session(session) + + return { + "format": "application/json", + "data": { + "token": data["token"], + "status": data["status"], + "date_started": data["date_started"], + "date_finished": data["date_finished"], + "expiration_date": data["expiration_date"] + } + } + except Exception: + self.handle_exception("Failed to read session status") + return {"status": 500} + + def read_public_sessions(self, request, response): + try: + session_tokens = self._sessions_manager.read_public_sessions() + + self.send_json(session_tokens, response) + except Exception: + self.handle_exception("Failed to read public sessions") + response.status = 500 + + def update_session_configuration(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + config = {} + body = request.body.decode("utf-8") + if body != "": + config = json.loads(body) + + tests = {} + if "tests" in config: + tests = config["tests"] + test_types = None + if "types" in config: + test_types = config["types"] + timeouts = {} + if "timeouts" in config: + timeouts = config["timeouts"] + reference_tokens = [] + if "reference_tokens" in config: + reference_tokens = config["reference_tokens"] + type = None + if "type" in config: + type = config["type"] + + self._sessions_manager.update_session_configuration( + token, + tests, + test_types, + timeouts, + reference_tokens, + type + ) + except NotFoundException: + self.handle_exception("Failed to update session configuration") + response.status = 404 + except Exception: + self.handle_exception("Failed to update session configuration") + response.status = 500 + + def update_labels(self, request, response): + try: + uri_parts = self.parse_uri(request) + # convert unicode to ascii to get a text type, ignore special chars + token = uri_parts[2] + body = request.body.decode("utf-8") + labels = None + if body != "": + labels = json.loads(body) + if "labels" in labels: + labels = labels["labels"] + + self._sessions_manager.update_labels(token=token, labels=labels) + except Exception: + self.handle_exception("Failed to update labels") + response.status = 500 + + def delete_session(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + session = self._sessions_manager.read_session(token) + if session is None: + response.status = 404 + return + + self._sessions_manager.delete_session(token) + self._results_manager.delete_results(token) + except Exception: + self.handle_exception("Failed to delete session") + response.status = 500 + + def start_session(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + self._sessions_manager.start_session(token) + except Exception: + self.handle_exception("Failed to start session") + response.status = 500 + + def pause_session(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + self._sessions_manager.pause_session(token) + except Exception: + self.handle_exception("Failed to pause session") + response.status = 500 + + def stop_session(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + self._sessions_manager.stop_session(token) + except Exception: + self.handle_exception("Failed to stop session") + response.status = 500 + + def resume_session(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + resume_token = None + body = request.body.decode("utf-8") + if body != "": + resume_token = json.loads(body)["resume_token"] + + self._sessions_manager.resume_session(token, resume_token) + except Exception: + self.handle_exception("Failed to resume session") + response.status = 500 + + def find_session(self, request, response): + try: + uri_parts = self.parse_uri(request) + fragment = uri_parts[2] + token = self._sessions_manager.find_token(fragment) + if token is None: + response.status = 404 + return + self.send_json({"token": token}, response) + except Exception: + self.handle_exception("Failed to find session") + response.status = 500 + + def register_event_listener(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + query_parameters = self.parse_query_parameters(request) + last_event_number = None + if ("last_event" in query_parameters): + last_event_number = int(query_parameters["last_event"]) + + event = threading.Event() + http_polling_event_listener = HttpPollingEventListener(token, event) + event_listener_token = self._event_dispatcher.add_event_listener(http_polling_event_listener, last_event_number) + + event.wait() + + message = http_polling_event_listener.message + self.send_json(data=message, response=response) + self._event_dispatcher.remove_event_listener(event_listener_token) + except Exception: + self.handle_exception("Failed to register event listener") + response.status = 500 + + def push_event(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + message = None + body = request.body.decode("utf-8") + if body != "": + message = json.loads(body) + + self._event_dispatcher.dispatch_event( + token, + message["type"], + message["data"]) + except Exception: + self.handle_exception("Failed to push session event") + + def handle_request(self, request, response): + method = request.method + uri_parts = self.parse_uri(request) + body = request.body + headers = request.headers + query_parameters = self.parse_query_parameters(request) + uri_path = request.url_parts.path + + result = None + # /api/sessions + if len(uri_parts) == 2: + if method == "POST": + result = self.create_session(body, headers) + if method == "GET": + if self._read_sessions_enabled: + result = self.read_sessions(query_parameters, uri_path) + + # /api/sessions/<token> + if len(uri_parts) == 3: + function = uri_parts[2] + if method == "GET": + if function == "public": + self.read_public_sessions(request, response) + return + if len(function) != TOKEN_LENGTH: + self.find_session(request, response) + return + result = self.read_session(token=uri_parts[2]) + if method == "PUT": + self.update_session_configuration(request, response) + return + if method == "DELETE": + self.delete_session(request, response) + return + + # /api/sessions/<token>/<function> + if len(uri_parts) == 4: + function = uri_parts[3] + if method == "GET": + if function == "status": + result = self.read_session_status(token=uri_parts[2]) + if function == "events": + self.register_event_listener(request, response) + return + if method == "POST": + if function == "start": + self.start_session(request, response) + return + if function == "pause": + self.pause_session(request, response) + return + if function == "stop": + self.stop_session(request, response) + return + if function == "resume": + self.resume_session(request, response) + return + if function == "events": + self.push_event(request, response) + return + if method == "PUT": + if function == "labels": + self.update_labels(request, response) + return + + if result is None: + response.status = 404 + return + + format = None + if "format" in result: + format = result["format"] + if format == "application/json": + data = None + if "data" in result: + data = result["data"] + status = 200 + if "status" in result: + status = result["status"] + self.send_json(data, response, status) + return + + status = 404 + if "status" in result: + status = result["status"] + response.status = status diff --git a/testing/web-platform/tests/tools/wave/network/api/tests_api_handler.py b/testing/web-platform/tests/tools/wave/network/api/tests_api_handler.py new file mode 100644 index 0000000000..3803583771 --- /dev/null +++ b/testing/web-platform/tests/tools/wave/network/api/tests_api_handler.py @@ -0,0 +1,298 @@ +# mypy: allow-untyped-defs + +import json + +from urllib.parse import urlunsplit + +from .api_handler import ApiHandler +from ...utils.serializer import serialize_session +from ...data.session import PAUSED, COMPLETED, ABORTED, PENDING, RUNNING + +DEFAULT_LAST_COMPLETED_TESTS_COUNT = 5 +DEFAULT_LAST_COMPLETED_TESTS_STATUS = ["ALL"] + +EXECUTION_MODE_AUTO = "auto" +EXECUTION_MODE_MANUAL = "manual" +EXECUTION_MODE_PROGRAMMATIC = "programmatic" + + +class TestsApiHandler(ApiHandler): + def __init__( + self, + wpt_port, + wpt_ssl_port, + tests_manager, + sessions_manager, + hostname, + web_root, + test_loader + ): + super().__init__(web_root) + self._tests_manager = tests_manager + self._sessions_manager = sessions_manager + self._wpt_port = wpt_port + self._wpt_ssl_port = wpt_ssl_port + self._hostname = hostname + self._web_root = web_root + self._test_loader = test_loader + + def read_tests(self, response): + tests = self._tests_manager.read_tests() + self.send_json(tests, response) + + def read_session_tests(self, request, response): + uri_parts = self.parse_uri(request) + token = uri_parts[2] + session = self._sessions_manager.read_session(token) + + if session is None: + response.status = 404 + return + + data = serialize_session(session) + tests = { + "token": token, + "pending_tests": data["pending_tests"], + "running_tests": data["running_tests"] + } + self.send_json(tests, response) + + def read_next_test(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + hostname = self._hostname + + session = self._sessions_manager.read_session(token) + if session is None: + response.status = 404 + return + + if session.status == PAUSED: + url = self._generate_wave_url( + hostname=hostname, + uri="pause.html", + token=token + ) + self.send_json({"next_test": url}, response) + return + if session.status == COMPLETED or session.status == ABORTED: + url = self._generate_wave_url( + hostname=hostname, + uri="finish.html", + token=token + ) + self.send_json({"next_test": url}, response) + return + if session.status == PENDING: + url = self._generate_wave_url( + hostname=hostname, + uri="newsession.html", + token=token + ) + self.send_json({"next_test": url}, response) + return + + test = self._tests_manager.next_test(session) + + if test is None: + if session.status != RUNNING: + return + url = self._generate_wave_url( + hostname=hostname, + uri="finish.html", + token=token + ) + self.send_json({"next_test": url}, response) + self._sessions_manager.complete_session(token) + return + + test_timeout = self._tests_manager.get_test_timeout( + test=test, session=session) + + test = self._sessions_manager.get_test_path_with_query(test, session) + url = self._generate_test_url( + test=test, + token=token, + test_timeout=test_timeout, + hostname=hostname) + + self.send_json({ + "next_test": url + }, response) + except Exception: + self.handle_exception("Failed to read next test") + response.status = 500 + + def read_last_completed(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + query = self.parse_query_parameters(request) + count = None + if "count" in query: + count = query["count"] + else: + count = DEFAULT_LAST_COMPLETED_TESTS_COUNT + + status = None + if "status" in query: + status = query["status"].split(",") + else: + status = DEFAULT_LAST_COMPLETED_TESTS_STATUS + + completed_tests = self._tests_manager.read_last_completed_tests( + token, count) + tests = {} + for one_status in status: + one_status = one_status.lower() + if one_status == "pass": + tests["pass"] = completed_tests["pass"] + continue + if one_status == "fail": + tests["fail"] = completed_tests["fail"] + continue + if one_status == "timeout": + tests["timeout"] = completed_tests["timeout"] + continue + if one_status == "all": + tests["pass"] = completed_tests["pass"] + tests["fail"] = completed_tests["fail"] + tests["timeout"] = completed_tests["timeout"] + break + self.send_json(data=tests, response=response) + except Exception: + self.handle_exception("Failed to read last completed tests") + response.status = 500 + + def read_malfunctioning(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + tm = self._tests_manager + malfunctioning_tests = tm.read_malfunctioning_tests(token) + + self.send_json(data=malfunctioning_tests, response=response) + except Exception: + self.handle_exception("Failed to read malfunctioning tests") + response.status = 500 + + def update_malfunctioning(self, request, response): + try: + uri_parts = self.parse_uri(request) + token = uri_parts[2] + + data = None + body = request.body.decode("utf-8") + if body != "": + data = json.loads(body) + + self._tests_manager.update_malfunctioning_tests(token, data) + except Exception: + self.handle_exception("Failed to update malfunctioning tests") + response.status = 500 + + def read_available_apis(self, request, response): + try: + apis = self._test_loader.get_apis() + self.send_json(apis, response) + except Exception: + self.handle_exception("Failed to read available APIs") + response.status = 500 + + def handle_request(self, request, response): + method = request.method + uri_parts = self.parse_uri(request) + + # /api/tests + if len(uri_parts) == 2: + if method == "GET": + self.read_tests(response) + return + + # /api/tests/<token> + if len(uri_parts) == 3: + if method == "GET": + if uri_parts[2] == "apis": + self.read_available_apis(request, response) + return + self.read_session_tests(request, response) + return + + # /api/tests/<token>/<function> + if len(uri_parts) == 4: + function = uri_parts[3] + if method == "GET": + if function == "next": + self.read_next_test(request, response) + return + if function == "last_completed": + self.read_last_completed(request, response) + return + if function == "malfunctioning": + self.read_malfunctioning(request, response) + return + if method == "PUT": + if function == "malfunctioning": + self.update_malfunctioning(request, response) + return + + response.status = 404 + + def _generate_wave_url(self, hostname, uri, token): + if self._web_root is not None: + uri = self._web_root + uri + + return self._generate_url( + hostname=hostname, + uri=uri, + port=self._wpt_port, + query="token=" + token + ) + + def _generate_test_url(self, hostname, test, token, test_timeout): + protocol = "http" + port = self._wpt_port + + if "https" in test: + protocol = "https" + port = self._wpt_ssl_port + + test_query = "" + split = test.split("?") + if len(split) > 1: + test = split[0] + test_query = split[1] + + query = "token={}&timeout={}&https_port={}&web_root={}&{}".format( + token, + test_timeout, + self._wpt_ssl_port, + self._web_root, + test_query + ) + + return self._generate_url( + protocol=protocol, + hostname=hostname, + port=port, + uri=test, + query=query + ) + + def _generate_url(self, + hostname, + port=None, + uri=None, + query=None, + protocol=None): + if port is None: + port = 80 + if uri is None: + uri = "/" + if query is None: + query = "" + if protocol is None: + protocol = "http" + return urlunsplit([protocol, f"{hostname}:{port}", uri, query, '']) diff --git a/testing/web-platform/tests/tools/wave/network/http_handler.py b/testing/web-platform/tests/tools/wave/network/http_handler.py new file mode 100644 index 0000000000..b76f711cf1 --- /dev/null +++ b/testing/web-platform/tests/tools/wave/network/http_handler.py @@ -0,0 +1,122 @@ +# mypy: allow-untyped-defs + +import http.client as httplib +import sys +import logging +import traceback + + +global logger +logger = logging.getLogger("wave-api-handler") + +class HttpHandler: + def __init__( + self, + static_handler, + sessions_api_handler, + tests_api_handler, + results_api_handler, + devices_api_handler, + general_api_handler, + http_port, + web_root + ): + self.static_handler = static_handler + self.sessions_api_handler = sessions_api_handler + self.tests_api_handler = tests_api_handler + self.results_api_handler = results_api_handler + self.general_api_handler = general_api_handler + self.devices_api_handler = devices_api_handler + self._http_port = http_port + self._web_root = web_root + + def handle_request(self, request, response): + response.headers = [ + ("Access-Control-Allow-Origin", "*"), + ("Access-Control-Allow-Headers", "*"), + ("Access-Control-Allow-Methods", "*") + ] + if request.method == "OPTIONS": + return + + path = self._remove_web_root(request.request_path) + + is_api_call = False + for index, part in enumerate(path.split("/")): + if index > 2: + break + if part != "api": + continue + + is_api_call = True + + if (is_api_call): + if request.url_parts.scheme == "https": + self._proxy(request, response) + return + self.handle_api(request, response) + else: + self.handle_static_file(request, response) + + def handle_api(self, request, response): + path = self._remove_web_root(request.request_path) + path = path.split("?")[0] + api_name = path.split("/")[1] + + if api_name is None: + return + + if api_name == "sessions": + self.sessions_api_handler.handle_request(request, response) + return + if api_name == "tests": + self.tests_api_handler.handle_request(request, response) + return + if api_name == "results": + self.results_api_handler.handle_request(request, response) + return + if api_name == "devices": + self.devices_api_handler.handle_request(request, response) + return + + self.general_api_handler.handle_request(request, response) + + def handle_static_file(self, request, response): + self.static_handler.handle_request(request, response) + + def _remove_web_root(self, path): + if self._web_root is not None: + path = path[len(self._web_root):] + return path + + + def _proxy(self, request, response): + host = 'localhost' + port = int(self._http_port) + uri = request.url_parts.path + uri = uri + "?" + request.url_parts.query + content_length = request.headers.get('Content-Length') + data = "" + if content_length is not None: + data = request.raw_input.read(int(content_length)) + method = request.method + + headers = {} + for key in request.headers: + value = request.headers[key] + headers[key.decode("utf-8")] = value.decode("utf-8") + + try: + proxy_connection = httplib.HTTPConnection(host, port) + proxy_connection.request(method, uri, data, headers) + proxy_response = proxy_connection.getresponse() + response.content = proxy_response.read() + response.headers = proxy_response.getheaders() + response.status = proxy_response.status + + except OSError: + message = "Failed to perform proxy request" + info = sys.exc_info() + traceback.print_tb(info[2]) + logger.error(f"{message}: {info[0].__name__}: {info[1].args[0]}") + response.status = 500 diff --git a/testing/web-platform/tests/tools/wave/network/static_handler.py b/testing/web-platform/tests/tools/wave/network/static_handler.py new file mode 100644 index 0000000000..230af8da1a --- /dev/null +++ b/testing/web-platform/tests/tools/wave/network/static_handler.py @@ -0,0 +1,60 @@ +# mypy: allow-untyped-defs + +import os + + +class StaticHandler: + def __init__(self, web_root, http_port, https_port): + self.static_dir = os.path.join( + os.getcwd(), "tools/wave/www") + self._web_root = web_root + self._http_port = http_port + self._https_port = https_port + + def handle_request(self, request, response): + file_path = request.request_path + + if self._web_root is not None: + if not file_path.startswith(self._web_root): + response.status = 404 + return + file_path = file_path[len(self._web_root):] + + if file_path == "": + file_path = "index.html" + + file_path = file_path.split("?")[0] + file_path = os.path.join(self.static_dir, file_path) + + if not os.path.exists(file_path): + response.status = 404 + return + + headers = [] + + content_types = { + "html": "text/html", + "js": "text/javascript", + "css": "text/css", + "jpg": "image/jpeg", + "jpeg": "image/jpeg", + "ttf": "font/ttf", + "woff": "font/woff", + "woff2": "font/woff2" + } + + headers.append( + ("Content-Type", content_types[file_path.split(".")[-1]])) + + data = None + with open(file_path, "rb") as file: + data = file.read() + + if file_path.split("/")[-1] == "wave-service.js": + data = data.decode("UTF-8") + data = data.replace("{{WEB_ROOT}}", str(self._web_root)) + data = data.replace("{{HTTP_PORT}}", str(self._http_port)) + data = data.replace("{{HTTPS_PORT}}", str(self._https_port)) + + response.content = data + response.headers = headers |