summary refs log tree commit diff stats
path: root/dom/quota/test/marionette/quota_test_case.py
blob: c31edcaaf76957b0262414e0b4d16f10df52aa56 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import os
from contextlib import contextmanager

from marionette_driver import Wait
from marionette_harness import MarionetteTestCase


class QuotaTestCase(MarionetteTestCase):
    """Marionette test case base class with helpers for quota manager tests."""

    def ensureInvariantHolds(self, op):
        """Wait up to 60 seconds for ``op`` to succeed.

        ``op`` is handed unchanged to ``Wait(...).until``; if the wait times
        out, the failure carries the message built below.
        """
        maxWaitTime = 60
        Wait(self.marionette, timeout=maxWaitTime).until(
            op,
            message=f"operation did not yield success even after waiting {maxWaitTime}s time",
        )

    def findDirObj(self, path, pattern, isFile):
        """Return the path of the first matching entry directly inside ``path``.

        An entry matches when its full path ends with ``pattern`` and its
        "is a regular file" status equals ``isFile`` (so ``isFile=False``
        selects non-file entries such as directories).

        Returns ``None`` when no entry matches.
        """
        # Use scandir as a context manager so the directory handle is released
        # even though we return from the middle of the iteration.
        with os.scandir(path) as entries:
            for obj in entries:
                if obj.path.endswith(pattern) and (obj.is_file() == isFile):
                    return obj.path
        return None

    def getFullOriginMetadata(self, persistenceType, origin):
        """Fetch the full origin metadata for ``origin`` from the quota manager.

        Runs in chrome context and calls
        ``Services.qms.getFullOriginMetadata(persistenceType, principal)`` with
        a content principal created from ``origin``.

        Returns the request result.

        Raises ``AssertionError`` when the script resolved with ``null``, which
        it does whenever the request's result code is not ``NS_OK``.
        """
        with self.marionette.using_context("chrome"):
            res = self.marionette.execute_async_script(
                """
                    const [persistenceType, origin, resolve] = arguments;

                    const principal = Services.scriptSecurityManager.
                        createContentPrincipalFromOrigin(origin);

                    const request = Services.qms.getFullOriginMetadata(
                        persistenceType, principal);

                    request.callback = function() {
                      if (request.resultCode != Cr.NS_OK) {
                        resolve(null);
                      } else {
                        resolve(request.result);
                      }
                    }
                """,
                script_args=(persistenceType, origin),
                new_sandbox=False,
            )

            # Raise explicitly (rather than a bare assert) so the check still
            # runs under ``python -O`` and the failure explains itself.
            if res is None:
                raise AssertionError(
                    f"getFullOriginMetadata returned no result for origin {origin!r} "
                    f"(persistence type {persistenceType!r})"
                )
            return res

    def getStoragePath(self, profilePath, origin, persistenceType, client):
        """Build the on-disk storage directory path for a quota client.

        The directory name is the ``storageOrigin`` field of the origin's full
        metadata with every ``:`` and ``/`` replaced by ``+``. The result is
        ``<profilePath>/storage/<persistenceType>/<sanitized origin>/<client>``.
        """
        fullOriginMetadata = self.getFullOriginMetadata(persistenceType, origin)

        storageOrigin = fullOriginMetadata["storageOrigin"]
        sanitizedStorageOrigin = storageOrigin.replace(":", "+").replace("/", "+")

        return os.path.join(
            profilePath, "storage", persistenceType, sanitizedStorageOrigin, client
        )

    def resetStoragesForPrincipal(self, origin, persistenceType, client):
        """Reset the storages of ``client`` for the principal of ``origin``.

        Runs ``Services.qms.resetStoragesForPrincipal`` in chrome context.

        Returns ``True`` when the request finished with ``NS_OK`` and ``False``
        otherwise.
        """
        # This method is used to force sqlite to write journal file contents to
        # main sqlite database file

        # The values are handed over through ``script_args`` instead of being
        # spliced into the script text, so quotes or backslashes in them cannot
        # break (or inject into) the JavaScript source. The resolve callback is
        # the last entry of ``arguments``, as in getFullOriginMetadata.
        script = """
            const [origin, persistenceType, client, resolve] = arguments;

            let principal = Services.scriptSecurityManager.
                                createContentPrincipalFromOrigin(origin);

            let req = Services.qms.resetStoragesForPrincipal(principal, persistenceType, client);
            req.callback = () => {
                resolve(req.resultCode == Cr.NS_OK);
            }
            """

        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            return self.marionette.execute_async_script(
                script,
                script_args=(origin, persistenceType, client),
            )

    @contextmanager
    def using_new_window(self, path, private=False):
        """
        This helper method is created to ensure that a temporary
        window required inside the test scope is lifetime'd properly

        Opens a new (optionally private) window, switches to it and navigates
        to ``path`` resolved through ``marionette.absolute_url``. Yields a
        ``(origin, persistenceType)`` tuple: ``origin`` is the server base URL
        without its trailing character, with ``^privateBrowsingId=1`` appended
        for private windows; ``persistenceType`` is ``"private"`` or
        ``"default"``. On exit the current window is closed and the previously
        active window is selected again.
        """

        oldWindow = self.marionette.current_window_handle

        # Open and select the new window *before* entering the try block: if
        # either step fails, the cleanup below must not run, because it would
        # close the caller's original window instead of a temporary one.
        newWindow = self.marionette.open(type="window", private=private)
        self.marionette.switch_to_window(newWindow["handle"])

        try:
            self.marionette.navigate(self.marionette.absolute_url(path))
            origin = self.marionette.absolute_url("")[:-1]
            if private:
                origin += "^privateBrowsingId=1"

            yield (origin, "private" if private else "default")

        finally:
            self.marionette.close()
            self.marionette.switch_to_window(oldWindow)