#!/usr/bin/env python3
"""Fake geckodriver.

A tiny HTTP server that answers a handful of WebDriver-style endpoints with
canned JSON so that clients can be exercised without a real browser.  Nothing
is actually loaded or executed: sessions and windows are plain in-memory
objects.
"""
import argparse
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from uuid import uuid4

# Live sessions, keyed by the session id string returned from POST /session.
_SESSIONS = {}


class Window:
    """An in-memory stand-in for a browser window."""

    def __init__(self, handle, title="about:blank"):
        self.handle = handle
        self.title = title
        # Defined up front so reading ``url`` before the first visit does not
        # raise AttributeError.
        self.url = None

    def visit_url(self, url):
        """Record *url* as the window's current location (nothing is loaded)."""
        print("Visiting %s" % url)
        # XXX todo, load the URL for real
        self.url = url


class Session:
    """A fake WebDriver session owning a set of windows."""

    def __init__(self, uuid):
        self.session_id = uuid
        # Next window handle to hand out; handles are consecutive integers.
        self.autoinc = 0
        self.windows = {}
        self.active_handle = self.new_window()

    def visit(self, url):
        """Point the active window at *url*."""
        self.windows[self.active_handle].visit_url(url)

    def new_window(self):
        """Create a window, register it and return its handle."""
        w = Window(self.autoinc)
        self.windows[w.handle] = w
        self.autoinc += 1
        return w.handle


class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler implementing the fake WebDriver endpoints."""

    def _set_headers(self, status=200):
        """Send the status line and the JSON content-type header."""
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.end_headers()

    def _send_response(self, status=200, data=None):
        """Send *data* (default ``{}``) as a JSON body with *status*."""
        if data is None:
            data = {}
        payload = json.dumps(data).encode("utf8")
        self._set_headers(status)
        self.wfile.write(payload)

    def _parse_path(self):
        """Split ``self.path`` into ``(session, action)``.

        For ``/<prefix>/<session id>/<a>/<b>...`` the second segment is looked
        up in ``_SESSIONS`` and the remaining segments become ``action``.  The
        first segment is not validated.

        Returns:
            A ``(session, action)`` tuple.  ``session`` is ``None`` when the id
            is unknown or the path has fewer than two segments; ``action`` is a
            (possibly empty) list of path segments.
        """
        sections = self.path.lstrip("/").split("/")
        session = None
        action = []
        if len(sections) > 1:
            session = _SESSIONS.get(sections[1])
            action = sections[2:]
        return session, action

    def _read_json_body(self):
        """Return the decoded JSON request body.

        A missing, empty or zero ``Content-Length`` yields ``{}`` instead of
        failing, so body-less POSTs are tolerated.

        Raises:
            ValueError: if ``Content-Length`` is not an integer or the body is
                not valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
        """
        raw_length = self.headers.get("Content-Length")
        length = int(raw_length) if raw_length else 0
        if length <= 0:
            return {}
        return json.loads(self.rfile.read(length))

    def do_GET(self):
        """Handle ``/status``, ``window/handles`` and ``moz/context``."""
        print("GET " + self.path)
        if self.path == "/status":
            return self._send_response(data={"ready": "OK"})

        session, action = self._parse_path()
        if action == ["window", "handles"]:
            # An unknown session id must yield a 404, not an AttributeError.
            if session is None:
                return self._send_response(status=404)
            data = {"value": list(session.windows.keys())}
            return self._send_response(data=data)

        if action == ["moz", "context"]:
            data = {"value": "chrome"}
            return self._send_response(data=data)

        return self._send_response(status=404)

    def do_POST(self):
        """Handle session creation and the per-session POST commands.

        Responds 400 for an unreadable body or a ``url`` command without a
        ``"url"`` key, 404 when a command needs a session that does not exist,
        and 200 for everything else, including unsupported commands.
        """
        print("POST " + self.path)
        try:
            post_data = self._read_json_body()
        except ValueError:
            return self._send_response(status=400)

        # new session
        if self.path == "/session":
            uuid = str(uuid4())
            _SESSIONS[uuid] = Session(uuid)
            return self._send_response(data={"sessionId": uuid})

        session, action = self._parse_path()
        if action == ["url"]:
            if session is None:
                return self._send_response(status=404)
            if not isinstance(post_data, dict) or "url" not in post_data:
                return self._send_response(status=400)
            session.visit(post_data["url"])
            return self._send_response()

        if action == ["window", "new"]:
            if session is None:
                return self._send_response(status=404)
            handle = session.new_window()
            return self._send_response(data={"handle": handle, "type": "tab"})

        if action == ["timeouts"]:
            return self._send_response()

        if action == ["execute", "async"]:
            return self._send_response(data={"logs": []})

        # other commands not supported yet, we just return 200s
        return self._send_response()

    def do_DELETE(self):
        """Forget the session on ``DELETE /<prefix>/<id>``; always answer 200.

        Only a path that names the session itself (no trailing action) removes
        it, so deleting a sub-resource such as ``.../window`` leaves the
        session alive.  The response stays an unconditional 200, as before.
        """
        session, action = self._parse_path()
        if session is not None and not action:
            _SESSIONS.pop(session.session_id, None)
        return self._send_response()


VERSION = """\
geckodriver 0.24.0 ( 2019-01-28)

The source code of this program is available from
testing/geckodriver in https://hg.mozilla.org/mozilla-central.

This program is subject to the terms of the Mozilla Public License 2.0.
You can obtain a copy of the license at https://mozilla.org/MPL/2.0/.\
"""


def main(argv=None):
    """Parse the command line, then print the version or serve forever.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="FakeGeckodriver")
    parser.add_argument("--log", type=str, default=None)
    parser.add_argument("--port", type=int, default=4444)
    parser.add_argument("--marionette-port", type=int, default=2828)
    parser.add_argument("--version", action="store_true", default=False)
    parser.add_argument("--verbose", "-v", action="count")
    args = parser.parse_args(argv)

    if args.version:
        print(VERSION)
        return

    HTTPServer.allow_reuse_address = True
    server = HTTPServer(("127.0.0.1", args.port), RequestHandler)
    try:
        server.serve_forever()
    finally:
        # Release the listening socket even when interrupted.
        server.server_close()


if __name__ == "__main__":
    main()