144 lines
4.3 KiB
Python
Executable file
144 lines
4.3 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import argparse
|
|
import json
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from uuid import uuid4
|
|
|
|
_SESSIONS = {}
|
|
|
|
|
|
class Window:
|
|
def __init__(self, handle, title="about:blank"):
|
|
self.handle = handle
|
|
self.title = title
|
|
|
|
def visit_url(self, url):
|
|
print("Visiting %s" % url)
|
|
# XXX todo, load the URL for real
|
|
self.url = url
|
|
|
|
|
|
class Session:
|
|
def __init__(self, uuid):
|
|
self.session_id = uuid
|
|
self.autoinc = 0
|
|
self.windows = {}
|
|
self.active_handle = self.new_window()
|
|
|
|
def visit(self, url):
|
|
self.windows[self.active_handle].visit_url(url)
|
|
|
|
def new_window(self):
|
|
w = Window(self.autoinc)
|
|
self.windows[w.handle] = w
|
|
self.autoinc += 1
|
|
return w.handle
|
|
|
|
|
|
class RequestHandler(BaseHTTPRequestHandler):
|
|
def _set_headers(self, status=200):
|
|
self.send_response(status)
|
|
self.send_header("Content-type", "application/json")
|
|
self.end_headers()
|
|
|
|
def _send_response(self, status=200, data=None):
|
|
if data is None:
|
|
data = {}
|
|
data = json.dumps(data).encode("utf8")
|
|
self._set_headers(status)
|
|
self.wfile.write(data)
|
|
|
|
def _parse_path(self):
|
|
path = self.path.lstrip("/")
|
|
sections = path.split("/")
|
|
session = None
|
|
action = []
|
|
if len(sections) > 1:
|
|
session_id = sections[1]
|
|
if session_id in _SESSIONS:
|
|
session = _SESSIONS[session_id]
|
|
action = sections[2:]
|
|
return session, action
|
|
|
|
def do_GET(self):
|
|
print("GET " + self.path)
|
|
if self.path == "/status":
|
|
return self._send_response(data={"ready": "OK"})
|
|
|
|
session, action = self._parse_path()
|
|
if action == ["window", "handles"]:
|
|
data = {"value": list(session.windows.keys())}
|
|
return self._send_response(data=data)
|
|
|
|
if action == ["moz", "context"]:
|
|
data = {"value": "chrome"}
|
|
return self._send_response(data=data)
|
|
|
|
return self._send_response(status=404)
|
|
|
|
def do_POST(self):
|
|
print("POST " + self.path)
|
|
content_length = int(self.headers["Content-Length"])
|
|
post_data = json.loads(self.rfile.read(content_length))
|
|
|
|
# new session
|
|
if self.path == "/session":
|
|
uuid = str(uuid4())
|
|
_SESSIONS[uuid] = Session(uuid)
|
|
return self._send_response(data={"sessionId": uuid})
|
|
|
|
session, action = self._parse_path()
|
|
if action == ["url"]:
|
|
session.visit(post_data["url"])
|
|
return self._send_response()
|
|
|
|
if action == ["window", "new"]:
|
|
if session is None:
|
|
return self._send_response(404)
|
|
handle = session.new_window()
|
|
return self._send_response(data={"handle": handle, "type": "tab"})
|
|
|
|
if action == ["timeouts"]:
|
|
return self._send_response()
|
|
|
|
if action == ["execute", "async"]:
|
|
return self._send_response(data={"logs": []})
|
|
|
|
# other commands not supported yet, we just return 200s
|
|
return self._send_response()
|
|
|
|
def do_DELETE(self):
|
|
return self._send_response()
|
|
session, action = self._parse_path()
|
|
if session is not None:
|
|
del _SESSIONS[session.session_id]
|
|
return self._send_response()
|
|
return self._send_response(status=404)
|
|
|
|
|
|
VERSION = """\
|
|
geckodriver 0.24.0 ( 2019-01-28)
|
|
|
|
The source code of this program is available from
|
|
testing/geckodriver in https://hg.mozilla.org/mozilla-central.
|
|
|
|
This program is subject to the terms of the Mozilla Public License 2.0.
|
|
You can obtain a copy of the license at https://mozilla.org/MPL/2.0/.\
|
|
"""
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="FakeGeckodriver")
|
|
parser.add_argument("--log", type=str, default=None)
|
|
parser.add_argument("--port", type=int, default=4444)
|
|
parser.add_argument("--marionette-port", type=int, default=2828)
|
|
parser.add_argument("--version", action="store_true", default=False)
|
|
parser.add_argument("--verbose", "-v", action="count")
|
|
args = parser.parse_args()
|
|
|
|
if args.version:
|
|
print(VERSION)
|
|
else:
|
|
HTTPServer.allow_reuse_address = True
|
|
server = HTTPServer(("127.0.0.1", args.port), RequestHandler)
|
|
server.serve_forever()
|