summaryrefslogtreecommitdiffstats
path: root/browser/components/sessionstore/test/marionette/session_store_test_case.py
blob: 3bcbcd3f5621e6619cc62f3cf03cf0459bd87d05 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from urllib.parse import quote

from marionette_driver import Wait, errors
from marionette_driver.keys import Keys
from marionette_harness import MarionetteTestCase, WindowManagerMixin


def inline(doc):
    """Return a ``data:`` URL that embeds *doc* as UTF-8 HTML.

    The document text is percent-encoded with ``urllib.parse.quote`` so it
    can be placed directly after the media-type prefix.
    """
    encoded = quote(doc)
    return f"data:text/html;charset=utf-8,{encoded}"


# Every member of this set describes one browser window: a tuple holding
# the testing URL that is loaded in each of that window's tabs.
DEFAULT_WINDOWS = {
    # Window 1: a single tab. The trailing comma after the inline() call
    # is what makes this a 1-item tuple rather than a parenthesised string.
    (inline("""<div">Lorem</div>"""),),
    # Window 2: two tabs.
    (
        inline("""<div">ipsum</div>"""),
        inline("""<div">dolor</div>"""),
    ),
    # Window 3: two tabs.
    (
        inline("""<div">sit</div>"""),
        inline("""<div">amet</div>"""),
    ),
}


class SessionStoreTestCase(WindowManagerMixin, MarionetteTestCase):
    def setUp(
        self,
        startup_page=1,
        include_private=True,
        restore_on_demand=False,
        no_auto_updates=True,
        win_register_restart=False,
        test_windows=DEFAULT_WINDOWS,
    ):
        """Prepare the browser for a session store test.

        Switches to the chrome context, enforces the session-related
        preferences, and opens the requested test windows.

        @param startup_page (int, optional)
               Value for the "browser.startup.page" pref.
        @param include_private (boolean, optional)
               Whether the private browsing windows are opened as well.
        @param restore_on_demand (boolean, optional)
               Value for the "browser.sessionstore.restore_on_demand" pref.
        @param no_auto_updates (boolean, optional)
               Value for the "browser.sessionstore.debug.no_auto_updates"
               pref.
        @param win_register_restart (boolean, optional)
               Value for the "toolkit.winRegisterApplicationRestart" pref.
        @param test_windows (set, optional)
               Set of URL tuples, one tuple per non-private window to open.
        """
        super().setUp()
        self.marionette.set_context("chrome")

        # The accelerator key is Meta on macOS and Control everywhere else.
        on_mac = self.marionette.session_capabilities["platformName"] == "mac"
        self.accelKey = Keys.META if on_mac else Keys.CONTROL

        self.test_windows = test_windows
        self.private_windows = {
            (
                inline("""<div">consectetur</div>"""),
                inline("""<div">ipsum</div>"""),
            ),
            (
                inline("""<div">adipiscing</div>"""),
                inline("""<div">consectetur</div>"""),
            ),
        }

        gecko_prefs = {
            # Restore-previous-session behaviour, as required by the test.
            "browser.startup.page": startup_page,
            # Load content right away rather than waiting for the user
            # to click on the background tabs.
            "browser.sessionstore.restore_on_demand": restore_on_demand,
            # Avoid race conditions: the content process never sends
            # session updates unless the parent explicitly asks for them
            # via the TabStateFlusher.
            "browser.sessionstore.debug.no_auto_updates": no_auto_updates,
            # Whether the register application restart mechanism is enabled.
            "toolkit.winRegisterApplicationRestart": win_register_restart,
        }
        self.marionette.enforce_gecko_prefs(gecko_prefs)

        # Track everything that gets opened; start from a copy so that the
        # set passed in by the caller is not modified below.
        self.all_windows = self.test_windows.copy()
        self.open_windows(self.test_windows)

        if not include_private:
            return

        self.all_windows.update(self.private_windows)
        self.open_windows(self.private_windows, is_private=True)

    def tearDown(self):
        """Restart the browser with a clean profile, then run the base teardown.

        The base-class teardown runs even when the restart raises.
        """
        try:
            # Subsequent tests should start from a fresh profile.
            self.marionette.restart(clean=True, in_app=False)
        finally:
            super().tearDown()

    def open_windows(self, window_sets, is_private=False):
        """Open a set of windows with tabs pointing at some URLs.

        @param window_sets (set)
               A set of URL tuples. Each tuple within window_sets
               represents a window, and each URL in a tuple
               represents what will be loaded in a tab.

               If is_private is False, the first tuple that is iterated
               is opened in the current window and subsequent tuples are
               opened in new windows. If is_private is True, a new
               private window is opened for the first tuple as well.

               Example:

               {
                   (self.marionette.absolute_url('layout/mozilla_1.html'),
                    self.marionette.absolute_url('layout/mozilla_2.html')),

                   (self.marionette.absolute_url('layout/mozilla_3.html'),
                    self.marionette.absolute_url('layout/mozilla_4.html')),
               }

               With is_private False, one tuple is loaded into the
               currently open window and the other into a newly
               opened second window.
        @param is_private (boolean, optional)
               Whether or not any new windows should be private browsing
               windows.
        """

        def open_and_focus():
            # Open a window of the requested privacy mode and switch to it.
            handle = self.open_window(private=is_private)
            self.marionette.switch_to_window(handle)
            return handle

        if is_private:
            current = open_and_focus()
        else:
            current = self.marionette.current_chrome_window_handle

        for position, tab_urls in enumerate(window_sets):
            # Only the first tuple reuses the window selected above.
            if position:
                current = open_and_focus()
            self.open_tabs(current, tab_urls)

    def open_tabs(self, win, urls):
        """Open a set of URLs inside a window in new tabs.

        @param win (browser window)
               The browser window to load the tabs in. Not used by this
               method itself; navigation happens in whichever window is
               currently selected.
        @param urls (tuple or str)
               The URLs to load. A single string is treated as one URL.
               The first URL is loaded in the currently selected
               browser tab. Subsequent URLs are loaded in new tabs.
        """
        with self.marionette.using_context("content"):
            # Treat a bare string as a one-element sequence so that a
            # single loop covers both accepted argument forms.
            targets = (urls,) if isinstance(urls, str) else urls
            for position, target in enumerate(targets):
                if position:
                    # Every URL after the first gets a tab of its own.
                    self.marionette.switch_to_window(self.open_tab())
                self.marionette.navigate(target)

    def wait_for_windows(self, expected_windows, message, timeout=5):
        """Wait until the open windows match *expected_windows*.

        @param expected_windows (set)
               Set of URL tuples, compared against the result of
               convert_open_windows_to_set().
        @param message (str)
               Message passed to the wait; it is extended with the expected
               and most recently seen windows if the wait times out.
        @param timeout (number, optional)
               Timeout handed to the Wait helper.

        Raises errors.TimeoutException if the windows never match.
        """
        # One-element list so the polling callback can publish the most
        # recently observed windows to the error handler below.
        last_seen = [None]

        def windows_match(_):
            last_seen[0] = self.convert_open_windows_to_set()
            return last_seen[0] == expected_windows

        poller = Wait(self.marionette, timeout=timeout, interval=0.1)
        try:
            poller.until(windows_match, message=message)
        except errors.TimeoutException as e:
            # Re-raise with the most recent set of windows in the message.
            raise errors.TimeoutException(
                f"{e.message}. Expected {expected_windows}, got {last_seen[0]}."
            )

    def get_urls_for_window(self, win):
        """Return the current URI spec of every tab in window *win*.

        Switches to *win* in the chrome context to run the query, and
        always switches back to the chrome window that was current
        beforehand.
        """
        tab_urls_script = """
                  return gBrowser.tabs.map(tab => {
                    return tab.linkedBrowser.currentURI.spec;
                  });
                """
        previous_handle = self.marionette.current_chrome_window_handle

        try:
            with self.marionette.using_context("chrome"):
                self.marionette.switch_to_window(win)
                tab_urls = self.marionette.execute_script(tab_urls_script)
            return tab_urls
        finally:
            self.marionette.switch_to_window(previous_handle)

    def convert_open_windows_to_set(self):
        """Return the open windows as a set of per-window URL tuples.

        Marionette gives no guarantee that the chrome window handles come
        back in the same order as our window list. Turning each window's
        URLs into a tuple and collecting the tuples in a set allows a
        direct comparison that ignores window order.
        """
        return {
            tuple(self.get_urls_for_window(handle))
            for handle in self.marionette.chrome_window_handles
        }

    def _close_tab_shortcut(self):
        """Perform the accelerator-key + "w" keyboard shortcut."""
        keyboard = self.marionette.actions.sequence("key", "keyboard_id")
        # Press the accelerator and "w", then release them in reverse order.
        pressed = keyboard.key_down(self.accelKey).key_down("w")
        released = pressed.key_up("w").key_up(self.accelKey)
        released.perform()

    def close_all_tabs_and_restart(self):
        """Close all tabs, quit via the close-tab shortcut, start a new session."""
        self.close_all_tabs()
        # The keyboard shortcut is handed to quit() as its callback.
        quit_callback = self._close_tab_shortcut
        self.marionette.quit(callback=quit_callback)
        self.marionette.start_session()

    def simulate_os_shutdown(self):
        """Simulate an OS shutdown of the browser process.

        Only implemented for Windows, where the process is shut down
        through the Windows Restart Manager.

        :raises: Exception: if not supported on the current platform
        :raises: WindowsError: if a Windows API call failed
        """
        platform = self.marionette.session_capabilities["platformName"]
        if platform == "windows":
            self._shutdown_with_windows_restart_manager(self.marionette.process_id)
            return

        raise Exception("Unsupported platform for simulate_os_shutdown")

    def _shutdown_with_windows_restart_manager(self, pid):
        """Shut down a process using the Windows Restart Manager.

        When Windows shuts down, it uses a protocol including the
        WM_QUERYENDSESSION and WM_ENDSESSION messages to give
        applications a chance to shut down safely. The best way to
        simulate this is via the Restart Manager, which allows a process
        (such as an installer) to use the same mechanism to shut down
        any other processes which are using registered resources.

        This function starts a Restart Manager session, registers the
        process as a resource, and shuts down the process.

        :param pid: The process id (int) of the process to shutdown

        :raises: WindowsError: if a Windows API call fails
        """
        import ctypes
        from ctypes import POINTER, WINFUNCTYPE, Structure, WinError, pointer, windll
        from ctypes.wintypes import BOOL, DWORD, HANDLE, LPCWSTR, UINT, ULONG, WCHAR

        # set up Windows SDK types
        OpenProcess = windll.kernel32.OpenProcess
        OpenProcess.restype = HANDLE
        OpenProcess.argtypes = [
            DWORD,  # dwDesiredAccess
            BOOL,  # bInheritHandle
            DWORD,
        ]  # dwProcessId
        PROCESS_QUERY_INFORMATION = 0x0400

        CloseHandle = windll.kernel32.CloseHandle
        CloseHandle.restype = BOOL
        CloseHandle.argtypes = [HANDLE]  # hObject

        class FILETIME(Structure):
            _fields_ = [("dwLowDateTime", DWORD), ("dwHighDateTime", DWORD)]

        LPFILETIME = POINTER(FILETIME)

        GetProcessTimes = windll.kernel32.GetProcessTimes
        GetProcessTimes.restype = BOOL
        GetProcessTimes.argtypes = [
            HANDLE,  # hProcess
            LPFILETIME,  # lpCreationTime
            LPFILETIME,  # lpExitTime
            LPFILETIME,  # lpKernelTime
            LPFILETIME,
        ]  # lpUserTime

        ERROR_SUCCESS = 0

        class RM_UNIQUE_PROCESS(Structure):
            _fields_ = [("dwProcessId", DWORD), ("ProcessStartTime", FILETIME)]

        RmStartSession = windll.rstrtmgr.RmStartSession
        RmStartSession.restype = DWORD
        RmStartSession.argtypes = [
            POINTER(DWORD),  # pSessionHandle
            DWORD,  # dwSessionFlags
            POINTER(WCHAR),
        ]  # strSessionKey

        class GUID(ctypes.Structure):
            _fields_ = [
                ("Data1", ctypes.c_ulong),
                ("Data2", ctypes.c_ushort),
                ("Data3", ctypes.c_ushort),
                ("Data4", ctypes.c_ubyte * 8),
            ]

        # Number of characters in a session key, derived from the GUID size.
        CCH_RM_SESSION_KEY = ctypes.sizeof(GUID) * 2

        RmRegisterResources = windll.rstrtmgr.RmRegisterResources
        RmRegisterResources.restype = DWORD
        RmRegisterResources.argtypes = [
            DWORD,  # dwSessionHandle
            UINT,  # nFiles
            POINTER(LPCWSTR),  # rgsFilenames
            UINT,  # nApplications
            POINTER(RM_UNIQUE_PROCESS),  # rgApplications
            UINT,  # nServices
            POINTER(LPCWSTR),
        ]  # rgsServiceNames

        RM_WRITE_STATUS_CALLBACK = WINFUNCTYPE(None, UINT)
        RmShutdown = windll.rstrtmgr.RmShutdown
        RmShutdown.restype = DWORD
        RmShutdown.argtypes = [
            DWORD,  # dwSessionHandle
            ULONG,  # lActionFlags
            RM_WRITE_STATUS_CALLBACK,
        ]  # fnStatus

        RmEndSession = windll.rstrtmgr.RmEndSession
        RmEndSession.restype = DWORD
        RmEndSession.argtypes = [DWORD]  # dwSessionHandle

        # Get the info needed to uniquely identify the process
        hProc = OpenProcess(PROCESS_QUERY_INFORMATION, False, pid)
        if not hProc:
            raise WinError()

        creationTime = FILETIME()
        exitTime = FILETIME()
        kernelTime = FILETIME()
        userTime = FILETIME()
        try:
            if not GetProcessTimes(
                hProc,
                pointer(creationTime),
                pointer(exitTime),
                pointer(kernelTime),
                pointer(userTime),
            ):
                raise WinError()
        finally:
            # The handle was only needed to read the process times; close
            # it so that it is not leaked.
            CloseHandle(hProc)

        # The Restart Manager functions report failure through their
        # return value, so that code is passed to WinError explicitly
        # rather than relying on GetLastError().

        # Start the Restart Manager Session
        dwSessionHandle = DWORD()
        # One extra character leaves room for the terminating null.
        sessionKeyType = WCHAR * (CCH_RM_SESSION_KEY + 1)
        sessionKey = sessionKeyType()
        rc = RmStartSession(pointer(dwSessionHandle), 0, sessionKey)
        if rc != ERROR_SUCCESS:
            raise WinError(rc)

        try:
            UProcs_count = 1
            UProcsArrayType = RM_UNIQUE_PROCESS * UProcs_count
            UProcs = UProcsArrayType(RM_UNIQUE_PROCESS(pid, creationTime))

            # Register the process as a resource
            rc = RmRegisterResources(
                dwSessionHandle, 0, None, UProcs_count, UProcs, 0, None
            )
            if rc != ERROR_SUCCESS:
                raise WinError(rc)

            # Shut down all processes using registered resources; no
            # status callback is supplied (null function pointer).
            rc = RmShutdown(
                dwSessionHandle, 0, ctypes.cast(None, RM_WRITE_STATUS_CALLBACK)
            )
            if rc != ERROR_SUCCESS:
                raise WinError(rc)

        finally:
            # Always end the session, even if registration or shutdown failed.
            RmEndSession(dwSessionHandle)

    def windows_shutdown_with_variety(self, restart_by_os, expect_restore):
        """Test restoring windows after Windows shutdown.

        Checks that all requested windows, both standard and private, are
        open, shuts down the browser with the Windows Restart Manager and
        then starts a new session.

        This specifically exercises the Windows synchronous shutdown mechanism,
        which terminates the process in response to the Restart Manager's
        WM_ENDSESSION message.

        If restart_by_os is True, the -os-restarted arg is passed when restarting,
        simulating being automatically restarted by the Restart Manager.

        If expect_restore is True, this ensures that the standard tabs have been
        restored, and that the private ones have not. Otherwise it ensures that
        no tabs and windows have been restored.
        """
        opened_before_quit = self.convert_open_windows_to_set()
        self.assertEqual(
            opened_before_quit,
            self.all_windows,
            msg=(
                "Not all requested windows have been opened. "
                f"Expected {self.all_windows}, got {opened_before_quit}."
            ),
        )

        # Quit by simulating an OS shutdown instead of a regular quit.
        self.marionette.quit(callback=self.simulate_os_shutdown)

        # Temporarily override the application arguments for the restart
        # and put the originals back afterwards, whatever happens.
        original_args = self.marionette.instance.app_args
        try:
            if restart_by_os:
                self.marionette.instance.app_args = ["-os-restarted"]

            self.marionette.start_session()
            self.marionette.set_context("chrome")
        finally:
            self.marionette.instance.app_args = original_args

        if expect_restore:
            self.wait_for_windows(
                self.test_windows,
                "Non private browsing windows should have been restored",
            )
            return

        # Nothing should have been restored: one window holding one tab.
        self.assertEqual(
            len(self.marionette.chrome_window_handles),
            1,
            msg="Windows from last session shouldn`t have been restored.",
        )
        self.assertEqual(
            len(self.marionette.window_handles),
            1,
            msg="Tabs from last session shouldn`t have been restored.",
        )