summary refs log tree commit diff stats
path: root/toolkit/components/telemetry/tests/marionette/harness/telemetry_harness/testcase.py
blob: d30fd67986532c77fbe51e572c65566ea1a9fc0f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import contextlib
import os
import re
import textwrap

from marionette_driver.addons import Addons
from marionette_driver.errors import MarionetteException
from marionette_driver.wait import Wait
from marionette_harness import MarionetteTestCase
from marionette_harness.runner.mixins.window_manager import WindowManagerMixin

from telemetry_harness.ping_server import PingServer

# Client ID that is used when Telemetry upload is disabled; a "real" client
# ID must never be equal to it (see TelemetryTestCase.assertIsValidUUID).
CANARY_CLIENT_ID = "c0ffeec0-ffee-c0ff-eec0-ffeec0ffeec0"
# Matches a whole string made of lower-case hexadecimal groups of
# 8-4-4-4-12 characters separated by dashes.
UUID_PATTERN = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
)


class TelemetryTestCase(WindowManagerMixin, MarionetteTestCase):
    def __init__(self, *args, **kwargs):
        """Initialize the test case.

        All arguments are forwarded unchanged to the parent class. The ping
        server is not created here; that happens in setUp().
        """
        super(TelemetryTestCase, self).__init__(*args, **kwargs)

    def setUp(self, *args, **kwargs):
        """Start the ping server and set up the test case.

        The ping server is created from the "server_root" and "server_url"
        test variables and started before the parent class' setUp() runs.
        Afterwards the list of installed addon IDs is reset and the content
        context is navigated to "about:about".

        If anything fails after the ping server has been started, the server
        is stopped again before the exception is re-raised.
        """

        self.ping_server = PingServer(
            self.testvars["server_root"], self.testvars["server_url"]
        )
        self.ping_server.start()

        try:
            super(TelemetryTestCase, self).setUp(*args, **kwargs)

            # Store IDs of addons installed via self.install_addon()
            self.addon_ids = []

            with self.marionette.using_context(self.marionette.CONTEXT_CONTENT):
                self.marionette.navigate("about:about")
        except BaseException:
            # unittest does not call tearDown() when setUp() raises, so the
            # already running ping server has to be stopped here to avoid
            # leaking it into the following tests.
            self.ping_server.stop()
            raise

    def disable_telemetry(self):
        """Switch off Firefox Data Collection and Use for the current browser.

        The upload pref is written to the profile's persistent preferences
        and also set through Marionette.
        """
        upload_pref = "datareporting.healthreport.uploadEnabled"
        profile = self.marionette.instance.profile

        profile.set_persistent_preferences({upload_pref: False})
        self.marionette.set_pref(upload_pref, False)

    def enable_telemetry(self):
        """Switch on Firefox Data Collection and Use for the current browser.

        The upload pref is written to the profile's persistent preferences
        and also set through Marionette.
        """
        upload_pref = "datareporting.healthreport.uploadEnabled"
        profile = self.marionette.instance.profile

        profile.set_persistent_preferences({upload_pref: True})
        self.marionette.set_pref(upload_pref, True)

    @contextlib.contextmanager
    def new_tab(self):
        """Context manager that runs its body in a freshly opened tab.

        A new tab is opened and selected in chrome context before the body of
        the ``with`` statement runs. When the body finishes - normally or by
        raising - the current window is closed and the window that was
        selected on entry is selected again.
        """

        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            start_tab = self.marionette.current_window_handle
            opened_tab = self.open_tab(focus=True)
            self.marionette.switch_to_window(opened_tab)

            try:
                yield
            finally:
                # Clean up even if the body raised, so a failing test does
                # not leave the extra tab open and selected.
                self.marionette.close()
                self.marionette.switch_to_window(start_tab)

    def navigate_in_new_tab(self, url):
        """Load ``url`` in a temporary tab created via new_tab().

        The navigation itself is performed in content context.
        """
        with self.new_tab(), self.marionette.using_context(
            self.marionette.CONTEXT_CONTENT
        ):
            self.marionette.navigate(url)

    def assertIsValidUUID(self, value):
        """Assert that ``value`` is a non-empty, non-canary, well-formed UUID."""

        self.assertIsNotNone(value)
        self.assertNotEqual(value, "")

        # The canary client ID is what gets used while Telemetry upload is
        # disabled, so it does not count as a valid ID.
        self.assertNotEqual(value, CANARY_CLIENT_ID, msg="UUID is CANARY CLIENT ID")

        pattern_match = UUID_PATTERN.match(value)
        self.assertIsNotNone(
            pattern_match, msg="UUID does not match regular expression"
        )

    def wait_for_pings(
        self, action_func, ping_filter, count, ping_server=None, timeout=60
    ):
        """Call the given action and wait for pings to come in and return
        the `count` number of pings, that match the given filter.

        Only pings that arrive after this method was called are considered;
        pings already stored on the server are ignored.

        :param action_func: Callable invoked without arguments to trigger
            the pings.
        :param ping_filter: Callable that receives a ping and returns a
            truthy value if the ping should be counted.
        :param count: Number of matching pings to wait for.
        :param ping_server: Server whose ``pings`` list is inspected.
            Defaults to ``self.ping_server``.
        :param timeout: Timeout passed to ``Wait``. Defaults to 60, the
            value that used to be hard-coded.
        :returns: List with the first ``count`` matching pings.

        If waiting raises (e.g. because of a timeout) the test is failed via
        ``self.fail()``.
        """

        if ping_server is None:
            ping_server = self.ping_server

        # Keep track of the current number of pings
        current_num_pings = len(ping_server.pings)

        # New list to store new pings that satisfy the filter
        filtered_pings = []

        def wait_func(*args, **kwargs):
            # Ignore existing pings in ping_server.pings
            new_pings = ping_server.pings[current_num_pings:]

            # Filter pings to make sure we wait for the correct ping type.
            # Slice assignment updates the list of the enclosing scope in
            # place, so the result is visible after the wait has finished.
            filtered_pings[:] = [p for p in new_pings if ping_filter(p)]

            return len(filtered_pings) >= count

        # Not every callable has a __name__ (e.g. functools.partial objects
        # or instances defining __call__), so fall back to its repr().
        action_name = getattr(action_func, "__name__", repr(action_func))
        self.logger.info(
            "wait_for_pings running action '{action}'.".format(action=action_name)
        )

        # Call given action and wait for a ping
        action_func()

        try:
            Wait(self.marionette, timeout).until(wait_func)
        except Exception as e:
            self.fail("Error waiting for ping: {}".format(e))

        return filtered_pings[:count]

    def wait_for_ping(self, action_func, ping_filter, ping_server=None):
        """Run ``action_func`` and return the single matching ping.

        Delegates to wait_for_pings() with a count of 1 and unpacks the
        one-element result.
        """
        matching_pings = self.wait_for_pings(
            action_func, ping_filter, 1, ping_server=ping_server
        )
        (ping,) = matching_pings
        return ping

    def restart_browser(self):
        """Restart the browser in-app without cleaning, so the profile is kept."""
        marionette = self.marionette
        return marionette.restart(in_app=True, clean=False)

    def start_browser(self):
        """Start a new Marionette session and return its result."""
        session = self.marionette.start_session()
        return session

    def quit_browser(self):
        """Quit the browser through Marionette and return the result."""
        quit_result = self.marionette.quit()
        return quit_result

    def install_addon(self):
        """Install the minimal "helloworld" addon with default settings."""
        self._install_addon("helloworld")

    def install_dynamic_addon(self):
        """Install the signed dynamic probe addon (not as a temporary addon).

        Source code of the addon:
        https://github.com/mozilla-extensions/dynamic-probe-telemetry-extension
        """
        self._install_addon(
            "dynamic_addon/dynamic-probe-telemetry-extension-signed.xpi",
            temp=False,
        )

    def _install_addon(self, addon_name, temp=True):
        """Install an addon from the "resources" directory next to this file.

        On success the ID returned by the installation is appended to
        ``self.addon_ids``. A MarionetteException raised along the way fails
        the test.

        :param addon_name: Path of the addon relative to the resources
            directory.
        :param temp: Passed through as ``temp`` to ``Addons.install()``.
        """
        harness_dir = os.path.dirname(__file__)
        addon_path = os.path.abspath(
            os.path.join(harness_dir, "resources", addon_name)
        )

        try:
            # Ensure the Environment has init'd so the installed addon
            # triggers an "environment-change" ping.
            script = """\
            let [resolve] = arguments;
            const { TelemetryEnvironment } = ChromeUtils.import(
              "resource://gre/modules/TelemetryEnvironment.jsm"
            );
            TelemetryEnvironment.onInitialized().then(resolve);
            """
            init_script = textwrap.dedent(script)

            with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
                self.marionette.execute_async_script(init_script)

            installer = Addons(self.marionette)
            installed_id = installer.install(addon_path, temp=temp)
        except MarionetteException as error:
            self.fail("{} - Error installing addon: {} - ".format(error.cause, error))
        else:
            self.addon_ids.append(installed_id)

    def set_persistent_profile_preferences(self, preferences):
        """Store ``preferences`` as persistent preferences of the profile."""
        profile = self.marionette.instance.profile
        return profile.set_persistent_preferences(preferences)

    def set_preferences(self, preferences):
        """Set ``preferences`` through Marionette's ``set_prefs()``."""
        marionette = self.marionette
        return marionette.set_prefs(preferences)

    @property
    def client_id(self):
        """Client ID as returned by ClientID.getCachedClientID() in the browser."""
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            script = """\
                const { ClientID } = ChromeUtils.import(
                  "resource://gre/modules/ClientID.jsm"
                );
                return ClientID.getCachedClientID();
                """
            return self.marionette.execute_script(script)

    @property
    def subsession_id(self):
        """Subsession ID taken from the browser's current ping data.

        Reads ``payload.info.subsessionId`` of the object returned by
        TelemetryController.getCurrentPingData(true).
        """
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            script = """\
                const { TelemetryController } = ChromeUtils.import(
                  "resource://gre/modules/TelemetryController.jsm"
                );
                return TelemetryController.getCurrentPingData(true);
                """
            current_ping = self.marionette.execute_script(script)
            session_info = current_ping["payload"]["info"]
            return session_info["subsessionId"]

    def tearDown(self, *args, **kwargs):
        """Tear down the testcase, stop the ping server and quit the browser.

        The steps run in the same order as before (parent tearDown(), ping
        server stop, forced clean browser quit), but each later step now also
        runs when an earlier one raised, so a failing tear down does not leak
        the ping server or a running browser into the next test.

        The positional and keyword arguments are accepted for symmetry with
        setUp() and are not forwarded.
        """
        try:
            super(TelemetryTestCase, self).tearDown()
        finally:
            try:
                self.ping_server.stop()
            finally:
                self.marionette.quit(in_app=False, clean=True)