summaryrefslogtreecommitdiffstats
path: root/testing/marionette/harness/marionette_harness/tests/unit/test_marionette.py
blob: 790a802975c218cbfe6a4ec148714bbab4f97614 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import os
import socket
import time

from marionette_driver import errors
from marionette_driver.marionette import Marionette
from marionette_harness import MarionetteTestCase, run_if_manage_instance


class TestMarionette(MarionetteTestCase):
    def test_correct_test_name(self):
        """Test that the correct test name gets set."""
        expected_test_name = "{module}.py {cls}.{func}".format(
            module=__name__,
            cls=self.__class__.__name__,
            func=self.test_correct_test_name.__name__,
        )

        self.assertIn(expected_test_name, self.marionette.test_name)

    @run_if_manage_instance("Only runnable if Marionette manages the instance")
    def test_raise_for_port_non_existing_process(self):
        """Test that raise_for_port doesn't run into a timeout if instance is not running."""
        self.marionette.quit()
        self.assertIsNotNone(self.marionette.instance.runner.returncode)
        start_time = time.time()
        self.assertRaises(socket.timeout, self.marionette.raise_for_port, timeout=5)
        self.assertLess(time.time() - start_time, 5)

    @run_if_manage_instance("Only runnable if Marionette manages the instance")
    def test_marionette_active_port_file(self):
        active_port_file = os.path.join(
            self.marionette.instance.profile.profile, "MarionetteActivePort"
        )
        self.assertTrue(
            os.path.exists(active_port_file), "MarionetteActivePort file written"
        )
        with open(active_port_file, "r") as fp:
            lines = fp.readlines()
        self.assertEqual(len(lines), 1, "MarionetteActivePort file contains two lines")
        self.assertEqual(
            int(lines[0]),
            self.marionette.port,
            "MarionetteActivePort file contains port",
        )

        self.marionette.quit()
        self.assertFalse(
            os.path.exists(active_port_file), "MarionetteActivePort file removed"
        )

    def test_single_active_session(self):
        self.assertEqual(1, self.marionette.execute_script("return 1"))

        # Use a new Marionette instance for the connection attempt, while there is
        # still an active session present.
        marionette = Marionette(host=self.marionette.host, port=self.marionette.port)
        self.assertRaises(socket.timeout, marionette.raise_for_port, timeout=1.0)

    def test_disable_enable_new_connections(self):
        # Do not re-create socket if it already exists
        self.marionette._send_message("Marionette:AcceptConnections", {"value": True})

        try:
            # Disabling new connections does not affect the existing one.
            self.marionette._send_message(
                "Marionette:AcceptConnections", {"value": False}
            )
            self.assertEqual(1, self.marionette.execute_script("return 1"))

            # Delete the current active session to allow new connection attempts.
            self.marionette.delete_session()

            # Use a new Marionette instance for the connection attempt, that doesn't
            # handle an instance of the application to prevent a connection lost error.
            marionette = Marionette(
                host=self.marionette.host, port=self.marionette.port
            )
            self.assertRaises(socket.timeout, marionette.raise_for_port, timeout=1.0)

        finally:
            self.marionette.quit(in_app=False)

    def test_client_socket_uses_expected_socket_timeout(self):
        current_socket_timeout = self.marionette.socket_timeout

        self.assertEqual(current_socket_timeout, self.marionette.client.socket_timeout)
        self.assertEqual(
            current_socket_timeout,
            self.marionette.client._socket_context._sock.gettimeout(),
        )

    def test_application_update_disabled(self):
        # Updates of the application should always be disabled by default
        with self.marionette.using_context("chrome"):
            update_allowed = self.marionette.execute_script(
                """
              let aus = Cc['@mozilla.org/updates/update-service;1']
                        .getService(Ci.nsIApplicationUpdateService);
              return aus.canCheckForUpdates;
            """
            )

        self.assertFalse(update_allowed)


class TestContext(MarionetteTestCase):
    def setUp(self):
        MarionetteTestCase.setUp(self)
        self.marionette.set_context(self.marionette.CONTEXT_CONTENT)

    def get_context(self):
        return self.marionette._send_message("Marionette:GetContext", key="value")

    def set_context(self, value):
        return self.marionette._send_message("Marionette:SetContext", {"value": value})

    def test_set_context(self):
        self.assertEqual(self.set_context("content"), {"value": None})
        self.assertEqual(self.set_context("chrome"), {"value": None})

        for typ in [True, 42, [], {}, None]:
            with self.assertRaises(errors.InvalidArgumentException):
                self.set_context(typ)

        with self.assertRaises(errors.MarionetteException):
            self.set_context("foo")

    def test_get_context(self):
        self.assertEqual(self.get_context(), "content")
        self.set_context("chrome")
        self.assertEqual(self.get_context(), "chrome")
        self.set_context("content")
        self.assertEqual(self.get_context(), "content")