1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
#!/usr/bin/env python3
from __future__ import absolute_import
from uuid import uuid4
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
import argparse
# Registry of live sessions: maps a session id string (a stringified UUID4
# generated when a client POSTs to /session) to its Session object.
_SESSIONS = {}
class Window:
    """A fake browser window tracked by a session.

    Attributes:
        handle: identifier of the window, as supplied by the creator.
        title: window title; defaults to ``"about:blank"``.
        url: last URL passed to :meth:`visit_url`; ``"about:blank"`` until
            the first visit.
    """

    def __init__(self, handle, title="about:blank"):
        self.handle = handle
        self.title = title
        # Defined up front so that reading ``url`` before the first visit
        # does not raise AttributeError.
        self.url = "about:blank"

    def visit_url(self, url):
        """Record *url* as the window's current location.

        Nothing is actually loaded; the URL is only logged and stored.
        """
        # Wrap in a 1-tuple so a tuple-valued ``url`` is formatted as a whole
        # instead of being unpacked as %-format arguments.
        print("Visiting %s" % (url,))
        # XXX todo, load the URL for real
        self.url = url
class Session:
    """One client session: a set of windows plus the currently active one.

    A session is created with a single window already open, and that window
    is the active one.
    """

    def __init__(self, uuid):
        self.session_id = uuid
        # Next handle to hand out; handles are consecutive integers from 0.
        self.autoinc = 0
        # Maps window handle -> Window.
        self.windows = {}
        self.active_handle = self.new_window()

    def visit(self, url):
        """Point the active window at *url*."""
        active_window = self.windows[self.active_handle]
        active_window.visit_url(url)

    def new_window(self):
        """Open a new window, register it, and return its handle."""
        window = Window(self.autoinc)
        self.autoinc += 1
        self.windows[window.handle] = window
        return window.handle
class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler answering a small set of WebDriver-style routes.

    Every response carries a JSON body. Routes that are not explicitly
    handled get an empty JSON object: with status 200 for POST and DELETE,
    and with status 404 for GET.
    """

    def _set_headers(self, status=200):
        """Send the status line and the JSON content-type header."""
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.end_headers()

    def _send_response(self, status=200, data=None):
        """Send *data* (an empty object when omitted) as the JSON body."""
        if data is None:
            data = {}
        data = json.dumps(data).encode("utf8")
        self._set_headers(status)
        self.wfile.write(data)

    def _parse_path(self):
        """Split the request path into ``(session, action)``.

        The second path component is looked up as a session id and the
        remaining components form the action list. ``session`` is ``None``
        when the id is unknown; ``action`` is ``[]`` when the path has fewer
        than two components.
        """
        path = self.path.lstrip("/")
        sections = path.split("/")
        session = None
        action = []
        if len(sections) > 1:
            session = _SESSIONS.get(sections[1])
            action = sections[2:]
        return session, action

    def _read_json_body(self):
        """Return the decoded JSON request body, or ``{}`` when there is none.

        Raises:
            ValueError: if Content-Length is not an integer or the body is
                not valid JSON.
        """
        raw_length = self.headers.get("Content-Length")
        length = int(raw_length) if raw_length else 0
        if length <= 0:
            return {}
        return json.loads(self.rfile.read(length))

    def do_GET(self):
        """Serve ``/status``, ``window/handles`` and ``moz/context``."""
        print("GET " + self.path)
        if self.path == "/status":
            return self._send_response(data={"ready": "OK"})
        session, action = self._parse_path()
        if action == ["window", "handles"]:
            # An unknown session id has no windows to list.
            if session is None:
                return self._send_response(status=404)
            data = {"value": list(session.windows)}
            return self._send_response(data=data)
        if action == ["moz", "context"]:
            data = {"value": "chrome"}
            return self._send_response(data=data)
        return self._send_response(status=404)

    def do_POST(self):
        """Create sessions and handle the supported session commands."""
        print("POST " + self.path)
        try:
            post_data = self._read_json_body()
        except ValueError:
            # Malformed Content-Length or JSON: answer 400 instead of letting
            # the handler die without sending any response.
            return self._send_response(status=400)
        # new session
        if self.path == "/session":
            uuid = str(uuid4())
            _SESSIONS[uuid] = Session(uuid)
            return self._send_response(data={"sessionId": uuid})
        session, action = self._parse_path()
        if action == ["url"]:
            if session is None:
                return self._send_response(status=404)
            url = post_data.get("url") if isinstance(post_data, dict) else None
            if url is None:
                return self._send_response(status=400)
            session.visit(url)
            return self._send_response()
        if action == ["window", "new"]:
            if session is None:
                return self._send_response(status=404)
            handle = session.new_window()
            return self._send_response(data={"handle": handle, "type": "tab"})
        if action == ["timeouts"]:
            return self._send_response()
        if action == ["execute", "async"]:
            return self._send_response(data={"logs": []})
        # other commands not supported yet, we just return 200s
        return self._send_response()

    def do_DELETE(self):
        """Drop the session when the path names exactly one; always reply 200.

        Deletes of sub-resources (a non-empty action) and of unknown sessions
        are acknowledged without changing any state.
        """
        session, action = self._parse_path()
        if session is not None and not action:
            _SESSIONS.pop(session.session_id, None)
        return self._send_response()
# Banner printed verbatim when the script is invoked with --version.
# The trailing backslashes keep the text free of a leading and a trailing
# newline.
VERSION = """\
geckodriver 0.24.0 ( 2019-01-28)
The source code of this program is available from
testing/geckodriver in https://hg.mozilla.org/mozilla-central.
This program is subject to the terms of the Mozilla Public License 2.0.
You can obtain a copy of the license at https://mozilla.org/MPL/2.0/.\
"""
if __name__ == "__main__":
    # Command-line entry point: either print the version banner or serve
    # HTTP on 127.0.0.1 until the process is stopped.
    parser = argparse.ArgumentParser(description="FakeGeckodriver")
    # (flags, add_argument keyword options), registered in this order.
    # Only --port and --version are read below; the other options are
    # accepted and otherwise unused in this block.
    option_specs = (
        (("--log",), {"type": str, "default": None}),
        (("--port",), {"type": int, "default": 4444}),
        (("--marionette-port",), {"type": int, "default": 2828}),
        (("--version",), {"action": "store_true", "default": False}),
        (("--verbose", "-v"), {"action": "count"}),
    )
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)
    args = parser.parse_args()

    if not args.version:
        HTTPServer.allow_reuse_address = True
        bind_address = ("127.0.0.1", args.port)
        server = HTTPServer(bind_address, RequestHandler)
        server.serve_forever()
    else:
        print(VERSION)
|