diff options
Diffstat (limited to 'testing/condprofile/condprof/tests/fakegeckodriver.py')
-rwxr-xr-x | testing/condprofile/condprof/tests/fakegeckodriver.py | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/testing/condprofile/condprof/tests/fakegeckodriver.py b/testing/condprofile/condprof/tests/fakegeckodriver.py new file mode 100755 index 0000000000..ab40401aaf --- /dev/null +++ b/testing/condprofile/condprof/tests/fakegeckodriver.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +import argparse +import json +from http.server import BaseHTTPRequestHandler, HTTPServer +from uuid import uuid4 + +_SESSIONS = {} + + +class Window: + def __init__(self, handle, title="about:blank"): + self.handle = handle + self.title = title + + def visit_url(self, url): + print("Visiting %s" % url) + # XXX todo, load the URL for real + self.url = url + + +class Session: + def __init__(self, uuid): + self.session_id = uuid + self.autoinc = 0 + self.windows = {} + self.active_handle = self.new_window() + + def visit(self, url): + self.windows[self.active_handle].visit_url(url) + + def new_window(self): + w = Window(self.autoinc) + self.windows[w.handle] = w + self.autoinc += 1 + return w.handle + + +class RequestHandler(BaseHTTPRequestHandler): + def _set_headers(self, status=200): + self.send_response(status) + self.send_header("Content-type", "application/json") + self.end_headers() + + def _send_response(self, status=200, data=None): + if data is None: + data = {} + data = json.dumps(data).encode("utf8") + self._set_headers(status) + self.wfile.write(data) + + def _parse_path(self): + path = self.path.lstrip("/") + sections = path.split("/") + session = None + action = [] + if len(sections) > 1: + session_id = sections[1] + if session_id in _SESSIONS: + session = _SESSIONS[session_id] + action = sections[2:] + return session, action + + def do_GET(self): + print("GET " + self.path) + if self.path == "/status": + return self._send_response(data={"ready": "OK"}) + + session, action = self._parse_path() + if action == ["window", "handles"]: + data = {"value": list(session.windows.keys())} + return self._send_response(data=data) + + if action == ["moz", "context"]: + data = {"value": "chrome"} + return self._send_response(data=data) + + return self._send_response(status=404) + + def do_POST(self): + print("POST " + self.path) + content_length = int(self.headers["Content-Length"]) + post_data = json.loads(self.rfile.read(content_length)) + + # new session + if self.path == "/session": + uuid = str(uuid4()) + _SESSIONS[uuid] = Session(uuid) + return self._send_response(data={"sessionId": uuid}) + + session, action = self._parse_path() + if action == ["url"]: + session.visit(post_data["url"]) + return self._send_response() + + if action == ["window", "new"]: + if session is None: + return self._send_response(404) + handle = session.new_window() + return self._send_response(data={"handle": handle, "type": "tab"}) + + if action == ["timeouts"]: + return self._send_response() + + if action == ["execute", "async"]: + return self._send_response(data={"logs": []}) + + # other commands not supported yet, we just return 200s + return self._send_response() + + def do_DELETE(self): + return self._send_response() + session, action = self._parse_path() + if session is not None: + del _SESSIONS[session.session_id] + return self._send_response() + return self._send_response(status=404) + + +VERSION = """\ +geckodriver 0.24.0 ( 2019-01-28) + +The source code of this program is available from +testing/geckodriver in https://hg.mozilla.org/mozilla-central. + +This program is subject to the terms of the Mozilla Public License 2.0. +You can obtain a copy of the license at https://mozilla.org/MPL/2.0/.\ +""" + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="FakeGeckodriver") + parser.add_argument("--log", type=str, default=None) + parser.add_argument("--port", type=int, default=4444) + parser.add_argument("--marionette-port", type=int, default=2828) + parser.add_argument("--version", action="store_true", default=False) + parser.add_argument("--verbose", "-v", action="count") + args = parser.parse_args() + + if args.version: + print(VERSION) + else: + HTTPServer.allow_reuse_address = True + server = HTTPServer(("127.0.0.1", args.port), RequestHandler) + server.serve_forever() |