summaryrefslogtreecommitdiffstats
path: root/testing/condprofile/condprof/tests/fakegeckodriver.py
blob: ab40401aafc18c97dc921da40d5f95486d953438 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
import argparse
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from uuid import uuid4

_SESSIONS = {}


class Window:
    def __init__(self, handle, title="about:blank"):
        self.handle = handle
        self.title = title

    def visit_url(self, url):
        print("Visiting %s" % url)
        # XXX todo, load the URL for real
        self.url = url


class Session:
    def __init__(self, uuid):
        self.session_id = uuid
        self.autoinc = 0
        self.windows = {}
        self.active_handle = self.new_window()

    def visit(self, url):
        self.windows[self.active_handle].visit_url(url)

    def new_window(self):
        w = Window(self.autoinc)
        self.windows[w.handle] = w
        self.autoinc += 1
        return w.handle


class RequestHandler(BaseHTTPRequestHandler):
    def _set_headers(self, status=200):
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.end_headers()

    def _send_response(self, status=200, data=None):
        if data is None:
            data = {}
        data = json.dumps(data).encode("utf8")
        self._set_headers(status)
        self.wfile.write(data)

    def _parse_path(self):
        path = self.path.lstrip("/")
        sections = path.split("/")
        session = None
        action = []
        if len(sections) > 1:
            session_id = sections[1]
            if session_id in _SESSIONS:
                session = _SESSIONS[session_id]
                action = sections[2:]
        return session, action

    def do_GET(self):
        print("GET " + self.path)
        if self.path == "/status":
            return self._send_response(data={"ready": "OK"})

        session, action = self._parse_path()
        if action == ["window", "handles"]:
            data = {"value": list(session.windows.keys())}
            return self._send_response(data=data)

        if action == ["moz", "context"]:
            data = {"value": "chrome"}
            return self._send_response(data=data)

        return self._send_response(status=404)

    def do_POST(self):
        print("POST " + self.path)
        content_length = int(self.headers["Content-Length"])
        post_data = json.loads(self.rfile.read(content_length))

        # new session
        if self.path == "/session":
            uuid = str(uuid4())
            _SESSIONS[uuid] = Session(uuid)
            return self._send_response(data={"sessionId": uuid})

        session, action = self._parse_path()
        if action == ["url"]:
            session.visit(post_data["url"])
            return self._send_response()

        if action == ["window", "new"]:
            if session is None:
                return self._send_response(404)
            handle = session.new_window()
            return self._send_response(data={"handle": handle, "type": "tab"})

        if action == ["timeouts"]:
            return self._send_response()

        if action == ["execute", "async"]:
            return self._send_response(data={"logs": []})

        # other commands not supported yet, we just return 200s
        return self._send_response()

    def do_DELETE(self):
        return self._send_response()
        session, action = self._parse_path()
        if session is not None:
            del _SESSIONS[session.session_id]
            return self._send_response()
        return self._send_response(status=404)


VERSION = """\
geckodriver 0.24.0 ( 2019-01-28)

The source code of this program is available from
testing/geckodriver in https://hg.mozilla.org/mozilla-central.

This program is subject to the terms of the Mozilla Public License 2.0.
You can obtain a copy of the license at https://mozilla.org/MPL/2.0/.\
"""


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="FakeGeckodriver")
    parser.add_argument("--log", type=str, default=None)
    parser.add_argument("--port", type=int, default=4444)
    parser.add_argument("--marionette-port", type=int, default=2828)
    parser.add_argument("--version", action="store_true", default=False)
    parser.add_argument("--verbose", "-v", action="count")
    args = parser.parse_args()

    if args.version:
        print(VERSION)
    else:
        HTTPServer.allow_reuse_address = True
        server = HTTPServer(("127.0.0.1", args.port), RequestHandler)
        server.serve_forever()