summaryrefslogtreecommitdiffstats
path: root/accessible/tests/browser/python_runner_wsh.py
blob: 2686967cfb7eefadd7d7dc053b0bcf97c03eefb9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""A pywebsocket3 handler which runs arbitrary Python code and returns the
result.
This is used to test OS specific accessibility APIs which can't be tested in JS.
It is intended to be called from JS browser tests.
"""

import json
import math
import os
import sys
import traceback

from mod_pywebsocket import msgutil


def web_socket_do_extra_handshake(request):
    pass


def web_socket_transfer_data(request):
    def send(*args):
        """Send a response to the client as a JSON array."""
        msgutil.send_message(request, json.dumps(args))

    cleanNamespace = {}
    testDir = None
    if sys.platform == "win32":
        testDir = "windows"
    elif sys.platform == "linux":
        testDir = "atk"
    if testDir:
        sys.path.append(
            os.path.join(
                os.getcwd(), "browser", "accessible", "tests", "browser", testDir
            )
        )
        try:
            import a11y_setup

            cleanNamespace = a11y_setup.__dict__
            setupExc = None
        except Exception:
            setupExc = traceback.format_exc()
        sys.path.pop()

    def info(message):
        """Log an info message."""
        send("info", str(message))

    cleanNamespace["info"] = info
    namespace = cleanNamespace.copy()

    # Keep handling messages until the WebSocket is closed.
    while True:
        code = msgutil.receive_message(request)
        if not code:
            return
        if code == "__reset__":
            namespace = cleanNamespace.copy()
            continue

        if setupExc:
            # a11y_setup failed. Report an exception immediately.
            send("exception", setupExc)
            continue

        # Wrap the code in a function called run(). This allows the code to
        # return a result by simply using the return statement.
        if "\n" not in code and not code.lstrip().startswith("return "):
            # Single line without return. Assume this is an expression. We use
            # a lambda to return the result.
            code = f"run = lambda: {code}"
        else:
            lines = ["def run():"]
            # Indent each line inside the function.
            lines.extend(f"  {line}" for line in code.splitlines())
            code = "\n".join(lines)
        try:
            # Execute this Python code, which will define the run() function.
            exec(code, namespace)
            # Run the function we just defined.
            ret = namespace["run"]()
            if isinstance(ret, float) and math.isnan(ret):
                # NaN can't be serialized by JSON.
                ret = None
            send("return", ret)
        except Exception:
            send("exception", traceback.format_exc())