summaryrefslogtreecommitdiffstats
path: root/testing/mozharness/test
diff options
context:
space:
mode:
Diffstat (limited to 'testing/mozharness/test')
-rw-r--r--testing/mozharness/test/README2
-rw-r--r--testing/mozharness/test/helper_files/.noserc2
-rw-r--r--testing/mozharness/test/helper_files/archives/archive-double-link.tarbin0 -> 10240 bytes
-rw-r--r--testing/mozharness/test/helper_files/archives/archive-escape.tarbin0 -> 10240 bytes
-rw-r--r--testing/mozharness/test/helper_files/archives/archive-link-abs.tarbin0 -> 10240 bytes
-rw-r--r--testing/mozharness/test/helper_files/archives/archive-link.tarbin0 -> 10240 bytes
-rw-r--r--testing/mozharness/test/helper_files/archives/archive-setuid.tarbin0 -> 10240 bytes
-rw-r--r--testing/mozharness/test/helper_files/archives/archive.tarbin0 -> 10240 bytes
-rw-r--r--testing/mozharness/test/helper_files/archives/archive.tar.bz2bin0 -> 256 bytes
-rw-r--r--testing/mozharness/test/helper_files/archives/archive.tar.gzbin0 -> 260 bytes
-rw-r--r--testing/mozharness/test/helper_files/archives/archive.zipbin0 -> 517 bytes
-rw-r--r--testing/mozharness/test/helper_files/archives/archive_invalid_filename.zipbin0 -> 166 bytes
-rwxr-xr-xtesting/mozharness/test/helper_files/archives/reference/bin/script.sh3
-rw-r--r--testing/mozharness/test/helper_files/archives/reference/lorem.txt1
-rwxr-xr-xtesting/mozharness/test/helper_files/create_archives.sh11
-rwxr-xr-xtesting/mozharness/test/helper_files/init_hgrepo.sh23
-rw-r--r--testing/mozharness/test/helper_files/locales.json18
-rw-r--r--testing/mozharness/test/helper_files/locales.txt4
-rw-r--r--testing/mozharness/test/helper_files/mozconfig_manifest.json3
-rw-r--r--testing/mozharness/test/hgrc9
-rw-r--r--testing/mozharness/test/pip-freeze.example.txt19
-rw-r--r--testing/mozharness/test/test_base_config.py376
-rw-r--r--testing/mozharness/test/test_base_diskutils.py89
-rw-r--r--testing/mozharness/test/test_base_log.py43
-rw-r--r--testing/mozharness/test/test_base_parallel.py28
-rw-r--r--testing/mozharness/test/test_base_python.py39
-rw-r--r--testing/mozharness/test/test_base_script.py983
-rw-r--r--testing/mozharness/test/test_base_vcs_mercurial.py396
-rw-r--r--testing/mozharness/test/test_l10n_locales.py118
-rw-r--r--testing/mozharness/test/test_mozilla_automation.py45
-rw-r--r--testing/mozharness/test/test_mozilla_building_buildbase.py146
-rw-r--r--testing/mozharness/test/test_mozilla_merkle.py134
-rw-r--r--testing/mozharness/test/test_mozilla_structured.py68
33 files changed, 2560 insertions, 0 deletions
diff --git a/testing/mozharness/test/README b/testing/mozharness/test/README
new file mode 100644
index 0000000000..889c8a83d4
--- /dev/null
+++ b/testing/mozharness/test/README
@@ -0,0 +1,2 @@
+test/ : non-network-dependent unit tests
+test/networked/ : network-dependent unit tests.
diff --git a/testing/mozharness/test/helper_files/.noserc b/testing/mozharness/test/helper_files/.noserc
new file mode 100644
index 0000000000..e6f21cf31d
--- /dev/null
+++ b/testing/mozharness/test/helper_files/.noserc
@@ -0,0 +1,2 @@
+[nosetests]
+with-xunit=1
diff --git a/testing/mozharness/test/helper_files/archives/archive-double-link.tar b/testing/mozharness/test/helper_files/archives/archive-double-link.tar
new file mode 100644
index 0000000000..a1b77d3af8
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/archive-double-link.tar
Binary files differ
diff --git a/testing/mozharness/test/helper_files/archives/archive-escape.tar b/testing/mozharness/test/helper_files/archives/archive-escape.tar
new file mode 100644
index 0000000000..eb6ace807e
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/archive-escape.tar
Binary files differ
diff --git a/testing/mozharness/test/helper_files/archives/archive-link-abs.tar b/testing/mozharness/test/helper_files/archives/archive-link-abs.tar
new file mode 100644
index 0000000000..1bbf9c0f46
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/archive-link-abs.tar
Binary files differ
diff --git a/testing/mozharness/test/helper_files/archives/archive-link.tar b/testing/mozharness/test/helper_files/archives/archive-link.tar
new file mode 100644
index 0000000000..2181c24a93
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/archive-link.tar
Binary files differ
diff --git a/testing/mozharness/test/helper_files/archives/archive-setuid.tar b/testing/mozharness/test/helper_files/archives/archive-setuid.tar
new file mode 100644
index 0000000000..55078dc826
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/archive-setuid.tar
Binary files differ
diff --git a/testing/mozharness/test/helper_files/archives/archive.tar b/testing/mozharness/test/helper_files/archives/archive.tar
new file mode 100644
index 0000000000..1dc094198f
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/archive.tar
Binary files differ
diff --git a/testing/mozharness/test/helper_files/archives/archive.tar.bz2 b/testing/mozharness/test/helper_files/archives/archive.tar.bz2
new file mode 100644
index 0000000000..c393ea4b88
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/archive.tar.bz2
Binary files differ
diff --git a/testing/mozharness/test/helper_files/archives/archive.tar.gz b/testing/mozharness/test/helper_files/archives/archive.tar.gz
new file mode 100644
index 0000000000..0fbfa39b1c
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/archive.tar.gz
Binary files differ
diff --git a/testing/mozharness/test/helper_files/archives/archive.zip b/testing/mozharness/test/helper_files/archives/archive.zip
new file mode 100644
index 0000000000..aa2fb34c16
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/archive.zip
Binary files differ
diff --git a/testing/mozharness/test/helper_files/archives/archive_invalid_filename.zip b/testing/mozharness/test/helper_files/archives/archive_invalid_filename.zip
new file mode 100644
index 0000000000..20bdc5acdf
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/archive_invalid_filename.zip
Binary files differ
diff --git a/testing/mozharness/test/helper_files/archives/reference/bin/script.sh b/testing/mozharness/test/helper_files/archives/reference/bin/script.sh
new file mode 100755
index 0000000000..134f2933c9
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/reference/bin/script.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+echo Hello world!
diff --git a/testing/mozharness/test/helper_files/archives/reference/lorem.txt b/testing/mozharness/test/helper_files/archives/reference/lorem.txt
new file mode 100644
index 0000000000..d2cf010d36
--- /dev/null
+++ b/testing/mozharness/test/helper_files/archives/reference/lorem.txt
@@ -0,0 +1 @@
+Lorem ipsum dolor sit amet.
diff --git a/testing/mozharness/test/helper_files/create_archives.sh b/testing/mozharness/test/helper_files/create_archives.sh
new file mode 100755
index 0000000000..314b55d276
--- /dev/null
+++ b/testing/mozharness/test/helper_files/create_archives.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+# Script to auto-generate the different archive types under the archives directory.
+
+cd archives
+
+rm archive.*
+
+tar cf archive.tar -C reference .
+gzip -fk archive.tar >archive.tar.gz
+bzip2 -fk archive.tar >archive.tar.bz2
+cd reference && zip ../archive.zip -r * && cd ..
diff --git a/testing/mozharness/test/helper_files/init_hgrepo.sh b/testing/mozharness/test/helper_files/init_hgrepo.sh
new file mode 100755
index 0000000000..0f4561695f
--- /dev/null
+++ b/testing/mozharness/test/helper_files/init_hgrepo.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Set up an hg repo for testing
+dest=$1
+if [ -z "$dest" ]; then
+ echo You must specify a destination directory 1>&2
+ exit 1
+fi
+
+rm -rf $dest
+hg init $dest
+cd $dest
+
+echo "Hello world $RANDOM" > hello.txt
+hg add hello.txt
+hg commit -m "Adding hello"
+
+hg branch branch2 > /dev/null
+echo "So long, farewell" >> hello.txt
+hg commit -m "Changing hello on branch"
+
+hg checkout default
+echo "Is this thing on?" >> hello.txt
+hg commit -m "Last change on default"
diff --git a/testing/mozharness/test/helper_files/locales.json b/testing/mozharness/test/helper_files/locales.json
new file mode 100644
index 0000000000..c9056b1d15
--- /dev/null
+++ b/testing/mozharness/test/helper_files/locales.json
@@ -0,0 +1,18 @@
+{
+ "ar": {
+ "revision": "default",
+ "platforms": ["maemo"]
+ },
+ "be": {
+ "revision": "default",
+ "platforms": ["maemo"]
+ },
+ "de": {
+ "revision": "default",
+ "platforms": ["maemo", "maemo-multilocale", "android-multilocale"]
+ },
+ "es-ES": {
+ "revision": "default",
+ "platforms": ["maemo", "maemo-multilocale", "android-multilocale"]
+ }
+}
diff --git a/testing/mozharness/test/helper_files/locales.txt b/testing/mozharness/test/helper_files/locales.txt
new file mode 100644
index 0000000000..0b65ab76df
--- /dev/null
+++ b/testing/mozharness/test/helper_files/locales.txt
@@ -0,0 +1,4 @@
+ar
+be
+de
+es-ES
diff --git a/testing/mozharness/test/helper_files/mozconfig_manifest.json b/testing/mozharness/test/helper_files/mozconfig_manifest.json
new file mode 100644
index 0000000000..6f3fc3eea9
--- /dev/null
+++ b/testing/mozharness/test/helper_files/mozconfig_manifest.json
@@ -0,0 +1,3 @@
+{
+ "gecko_path": "path/to/mozconfig"
+}
diff --git a/testing/mozharness/test/hgrc b/testing/mozharness/test/hgrc
new file mode 100644
index 0000000000..85e670518b
--- /dev/null
+++ b/testing/mozharness/test/hgrc
@@ -0,0 +1,9 @@
+[extensions]
+mq =
+purge =
+rebase =
+share =
+transplant =
+
+[ui]
+username = tester <tester@example.com>
diff --git a/testing/mozharness/test/pip-freeze.example.txt b/testing/mozharness/test/pip-freeze.example.txt
new file mode 100644
index 0000000000..56e06923fc
--- /dev/null
+++ b/testing/mozharness/test/pip-freeze.example.txt
@@ -0,0 +1,19 @@
+MakeItSo==0.2.6
+PyYAML==3.10
+Tempita==0.5.1
+WebOb==1.2b3
+-e hg+http://k0s.org/mozilla/hg/configuration@35416ad140982c11eba0a2d6b96d683f53429e94#egg=configuration-dev
+coverage==3.5.1
+-e hg+http://k0s.org/mozilla/hg/jetperf@4645ae34d2c41a353dcdbd856b486b6d3faabb99#egg=jetperf-dev
+logilab-astng==0.23.1
+logilab-common==0.57.1
+mozdevice==0.2
+-e hg+https://hg.mozilla.org/build/mozharness@df6b7f1e14d8c472125ef7a77b8a3b40c96ae181#egg=mozharness-jetperf
+mozhttpd==0.3
+mozinfo==0.3.3
+nose==1.1.2
+pyflakes==0.5.0
+pylint==0.25.1
+-e hg+https://hg.mozilla.org/build/talos@ee5c0b090d808e81a8fc5ba5f96b012797b3e785#egg=talos-dev
+virtualenv==1.7.1.2
+wsgiref==0.1.2
diff --git a/testing/mozharness/test/test_base_config.py b/testing/mozharness/test/test_base_config.py
new file mode 100644
index 0000000000..cafdbbca73
--- /dev/null
+++ b/testing/mozharness/test/test_base_config.py
@@ -0,0 +1,376 @@
+import os
+import unittest
+from copy import deepcopy
+
+JSON_TYPE = None
+try:
+ import simplejson as json
+
+ assert json
+except ImportError:
+ import json
+
+ JSON_TYPE = "json"
+else:
+ JSON_TYPE = "simplejson"
+
+import mozharness.base.config as config
+
+MH_DIR = os.path.dirname(os.path.dirname(__file__))
+
+
+class TestParseConfigFile(unittest.TestCase):
+ def _get_json_config(
+ self,
+ filename=os.path.join(MH_DIR, "configs", "test", "test.json"),
+ output="dict",
+ ):
+ fh = open(filename)
+ contents = json.load(fh)
+ fh.close()
+ if "output" == "dict":
+ return dict(contents)
+ else:
+ return contents
+
+ def _get_python_config(
+ self, filename=os.path.join(MH_DIR, "configs", "test", "test.py"), output="dict"
+ ):
+ global_dict = {}
+ local_dict = {}
+ # exec(open(filename).read(), global_dict, local_dict)
+ exec(
+ compile(open(filename, "rb").read(), filename, "exec"),
+ global_dict,
+ local_dict,
+ )
+ return local_dict["config"]
+
+ def test_json_config(self):
+ c = config.BaseConfig(initial_config_file="test/test.json")
+ content_dict = self._get_json_config()
+ for key in content_dict.keys():
+ self.assertEqual(content_dict[key], c._config[key])
+
+ def test_python_config(self):
+ c = config.BaseConfig(initial_config_file="test/test.py")
+ config_dict = self._get_python_config()
+ for key in config_dict.keys():
+ self.assertEqual(config_dict[key], c._config[key])
+
+ def test_illegal_config(self):
+ self.assertRaises(
+ IOError,
+ config.parse_config_file,
+ "this_file_does_not_exist.py",
+ search_path="yadda",
+ )
+
+ def test_illegal_suffix(self):
+ self.assertRaises(
+ RuntimeError, config.parse_config_file, "test/test.illegal_suffix"
+ )
+
+ def test_malformed_json(self):
+ if JSON_TYPE == "simplejson":
+ self.assertRaises(
+ json.decoder.JSONDecodeError,
+ config.parse_config_file,
+ "test/test_malformed.json",
+ )
+ else:
+ self.assertRaises(
+ ValueError, config.parse_config_file, "test/test_malformed.json"
+ )
+
+ def test_malformed_python(self):
+ self.assertRaises(
+ SyntaxError, config.parse_config_file, "test/test_malformed.py"
+ )
+
+ def test_multiple_config_files_override_string(self):
+ c = config.BaseConfig(initial_config_file="test/test.py")
+ c.parse_args(["--cfg", "test/test_override.py,test/test_override2.py"])
+ self.assertEqual(c._config["override_string"], "yay")
+
+ def test_multiple_config_files_override_list(self):
+ c = config.BaseConfig(initial_config_file="test/test.py")
+ c.parse_args(["--cfg", "test/test_override.py,test/test_override2.py"])
+ self.assertEqual(c._config["override_list"], ["yay", "worked"])
+
+ def test_multiple_config_files_override_dict(self):
+ c = config.BaseConfig(initial_config_file="test/test.py")
+ c.parse_args(["--cfg", "test/test_override.py,test/test_override2.py"])
+ self.assertEqual(c._config["override_dict"], {"yay": "worked"})
+
+ def test_multiple_config_files_keep_string(self):
+ c = config.BaseConfig(initial_config_file="test/test.py")
+ c.parse_args(["--cfg", "test/test_override.py,test/test_override2.py"])
+ self.assertEqual(c._config["keep_string"], "don't change me")
+
+ def test_optional_config_files_override_value(self):
+ c = config.BaseConfig(initial_config_file="test/test.py")
+ c.parse_args(
+ [
+ "--cfg",
+ "test/test_override.py,test/test_override2.py",
+ "--opt-cfg",
+ "test/test_optional.py",
+ ]
+ )
+ self.assertEqual(c._config["opt_override"], "new stuff")
+
+ def test_optional_config_files_missing_config(self):
+ c = config.BaseConfig(initial_config_file="test/test.py")
+ c.parse_args(
+ [
+ "--cfg",
+ "test/test_override.py,test/test_override2.py",
+ "--opt-cfg",
+ "test/test_optional.py,does_not_exist.py",
+ ]
+ )
+ self.assertEqual(c._config["opt_override"], "new stuff")
+
+ def test_optional_config_files_keep_string(self):
+ c = config.BaseConfig(initial_config_file="test/test.py")
+ c.parse_args(
+ [
+ "--cfg",
+ "test/test_override.py,test/test_override2.py",
+ "--opt-cfg",
+ "test/test_optional.py",
+ ]
+ )
+ self.assertEqual(c._config["keep_string"], "don't change me")
+
+
+class TestReadOnlyDict(unittest.TestCase):
+ control_dict = {
+ "b": "2",
+ "c": {"d": "4"},
+ "h": ["f", "g"],
+ "e": ["f", "g", {"turtles": ["turtle1"]}],
+ "d": {"turtles": ["turtle1"]},
+ }
+
+ def get_unlocked_ROD(self):
+ r = config.ReadOnlyDict(self.control_dict)
+ return r
+
+ def get_locked_ROD(self):
+ r = config.ReadOnlyDict(self.control_dict)
+ r.lock()
+ return r
+
+ def test_create_ROD(self):
+ r = self.get_unlocked_ROD()
+ self.assertEqual(
+ r, self.control_dict, msg="can't transfer dict to ReadOnlyDict"
+ )
+
+ def test_pop_item(self):
+ r = self.get_unlocked_ROD()
+ r.popitem()
+ self.assertEqual(
+ len(r),
+ len(self.control_dict) - 1,
+ msg="can't popitem() ReadOnlyDict when unlocked",
+ )
+
+ def test_pop(self):
+ r = self.get_unlocked_ROD()
+ r.pop("e")
+ self.assertEqual(
+ len(r),
+ len(self.control_dict) - 1,
+ msg="can't pop() ReadOnlyDict when unlocked",
+ )
+
+ def test_set(self):
+ r = self.get_unlocked_ROD()
+ r["e"] = "yarrr"
+ self.assertEqual(
+ r["e"], "yarrr", msg="can't set var in ReadOnlyDict when unlocked"
+ )
+
+ def test_del(self):
+ r = self.get_unlocked_ROD()
+ del r["e"]
+ self.assertEqual(
+ len(r),
+ len(self.control_dict) - 1,
+ msg="can't del in ReadOnlyDict when unlocked",
+ )
+
+ def test_clear(self):
+ r = self.get_unlocked_ROD()
+ r.clear()
+ self.assertEqual(r, {}, msg="can't clear() ReadOnlyDict when unlocked")
+
+ def test_set_default(self):
+ r = self.get_unlocked_ROD()
+ for key in self.control_dict.keys():
+ r.setdefault(key, self.control_dict[key])
+ self.assertEqual(
+ r, self.control_dict, msg="can't setdefault() ReadOnlyDict when unlocked"
+ )
+
+ def test_locked_set(self):
+ r = self.get_locked_ROD()
+ # TODO use |with self.assertRaises(AssertionError):| if/when we're
+ # all on 2.7.
+ try:
+ r["e"] = 2
+ except AssertionError:
+ pass
+ else:
+ self.assertEqual(0, 1, msg="can set r['e'] when locked")
+
+ def test_locked_del(self):
+ r = self.get_locked_ROD()
+ try:
+ del r["e"]
+ except AssertionError:
+ pass
+ else:
+ self.assertEqual(0, 1, "can del r['e'] when locked")
+
+ def test_locked_popitem(self):
+ r = self.get_locked_ROD()
+ self.assertRaises(AssertionError, r.popitem)
+
+ def test_locked_update(self):
+ r = self.get_locked_ROD()
+ self.assertRaises(AssertionError, r.update, {})
+
+ def test_locked_set_default(self):
+ r = self.get_locked_ROD()
+ self.assertRaises(AssertionError, r.setdefault, {})
+
+ def test_locked_pop(self):
+ r = self.get_locked_ROD()
+ self.assertRaises(AssertionError, r.pop)
+
+ def test_locked_clear(self):
+ r = self.get_locked_ROD()
+ self.assertRaises(AssertionError, r.clear)
+
+ def test_locked_second_level_dict_pop(self):
+ r = self.get_locked_ROD()
+ self.assertRaises(AssertionError, r["c"].update, {})
+
+ def test_locked_second_level_list_pop(self):
+ r = self.get_locked_ROD()
+ with self.assertRaises(AttributeError):
+ r["e"].pop()
+
+ def test_locked_third_level_mutate(self):
+ r = self.get_locked_ROD()
+ with self.assertRaises(AttributeError):
+ r["d"]["turtles"].append("turtle2")
+
+ def test_locked_object_in_tuple_mutate(self):
+ r = self.get_locked_ROD()
+ with self.assertRaises(AttributeError):
+ r["e"][2]["turtles"].append("turtle2")
+
+ def test_locked_second_level_dict_pop2(self):
+ r = self.get_locked_ROD()
+ self.assertRaises(AssertionError, r["c"].update, {})
+
+ def test_locked_second_level_list_pop2(self):
+ r = self.get_locked_ROD()
+ with self.assertRaises(AttributeError):
+ r["e"].pop()
+
+ def test_locked_third_level_mutate2(self):
+ r = self.get_locked_ROD()
+ with self.assertRaises(AttributeError):
+ r["d"]["turtles"].append("turtle2")
+
+ def test_locked_object_in_tuple_mutate2(self):
+ r = self.get_locked_ROD()
+ with self.assertRaises(AttributeError):
+ r["e"][2]["turtles"].append("turtle2")
+
+ def test_locked_deepcopy_set(self):
+ r = self.get_locked_ROD()
+ c = deepcopy(r)
+ c["e"] = "hey"
+ self.assertEqual(c["e"], "hey", "can't set var in ROD after deepcopy")
+
+
+class TestActions(unittest.TestCase):
+ all_actions = ["a", "b", "c", "d", "e"]
+ default_actions = ["b", "c", "d"]
+
+ def test_verify_actions(self):
+ c = config.BaseConfig(initial_config_file="test/test.json")
+ try:
+ c.verify_actions(["not_a_real_action"])
+ except SystemExit:
+ pass
+ else:
+ self.assertEqual(0, 1, msg="verify_actions() didn't die on invalid action")
+ c = config.BaseConfig(initial_config_file="test/test.json")
+ returned_actions = c.verify_actions(c.all_actions)
+ self.assertEqual(
+ c.all_actions,
+ returned_actions,
+ msg="returned actions from verify_actions() changed",
+ )
+
+ def test_default_actions(self):
+ c = config.BaseConfig(
+ default_actions=self.default_actions,
+ all_actions=self.all_actions,
+ initial_config_file="test/test.json",
+ )
+ self.assertEqual(
+ self.default_actions, c.get_actions(), msg="default_actions broken"
+ )
+
+ def test_no_action1(self):
+ c = config.BaseConfig(
+ default_actions=self.default_actions,
+ all_actions=self.all_actions,
+ initial_config_file="test/test.json",
+ )
+ c.parse_args(args=["foo", "--no-action", "a"])
+ self.assertEqual(
+ self.default_actions, c.get_actions(), msg="--no-ACTION broken"
+ )
+
+ def test_no_action2(self):
+ c = config.BaseConfig(
+ default_actions=self.default_actions,
+ all_actions=self.all_actions,
+ initial_config_file="test/test.json",
+ )
+ c.parse_args(args=["foo", "--no-c"])
+ self.assertEqual(["b", "d"], c.get_actions(), msg="--no-ACTION broken")
+
+ def test_add_action(self):
+ c = config.BaseConfig(
+ default_actions=self.default_actions,
+ all_actions=self.all_actions,
+ initial_config_file="test/test.json",
+ )
+ c.parse_args(args=["foo", "--add-action", "e"])
+ self.assertEqual(
+ ["b", "c", "d", "e"], c.get_actions(), msg="--add-action ACTION broken"
+ )
+
+ def test_only_action(self):
+ c = config.BaseConfig(
+ default_actions=self.default_actions,
+ all_actions=self.all_actions,
+ initial_config_file="test/test.json",
+ )
+ c.parse_args(args=["foo", "--a", "--e"])
+ self.assertEqual(["a", "e"], c.get_actions(), msg="--ACTION broken")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/testing/mozharness/test/test_base_diskutils.py b/testing/mozharness/test/test_base_diskutils.py
new file mode 100644
index 0000000000..9c5ac59083
--- /dev/null
+++ b/testing/mozharness/test/test_base_diskutils.py
@@ -0,0 +1,89 @@
+import unittest
+from unittest import mock
+
+from mozharness.base.diskutils import DiskInfo, DiskSize, DiskutilsError, convert_to
+
+
+class TestDiskutils(unittest.TestCase):
+ def test_convert_to(self):
+ # 0 is 0 regardless from_unit/to_unit
+ self.assertTrue(convert_to(size=0, from_unit="GB", to_unit="MB") == 0)
+ size = 524288 # 512 * 1024
+ # converting from/to same unit
+ self.assertTrue(convert_to(size=size, from_unit="MB", to_unit="MB") == size)
+
+ self.assertTrue(convert_to(size=size, from_unit="MB", to_unit="GB") == 512)
+
+ self.assertRaises(
+ DiskutilsError,
+ lambda: convert_to(size="a string", from_unit="MB", to_unit="MB"),
+ )
+ self.assertRaises(
+ DiskutilsError, lambda: convert_to(size=0, from_unit="foo", to_unit="MB")
+ )
+ self.assertRaises(
+ DiskutilsError, lambda: convert_to(size=0, from_unit="MB", to_unit="foo")
+ )
+
+
+class TestDiskInfo(unittest.TestCase):
+ def testDiskinfo_to(self):
+ di = DiskInfo()
+ self.assertTrue(di.unit == "bytes")
+ self.assertTrue(di.free == 0)
+ self.assertTrue(di.used == 0)
+ self.assertTrue(di.total == 0)
+ # convert to GB
+ di._to("GB")
+ self.assertTrue(di.unit == "GB")
+ self.assertTrue(di.free == 0)
+ self.assertTrue(di.used == 0)
+ self.assertTrue(di.total == 0)
+
+
+class MockStatvfs(object):
+ def __init__(self):
+ self.f_bsize = 0
+ self.f_frsize = 0
+ self.f_blocks = 0
+ self.f_bfree = 0
+ self.f_bavail = 0
+ self.f_files = 0
+ self.f_ffree = 0
+ self.f_favail = 0
+ self.f_flag = 0
+ self.f_namemax = 0
+
+
+class TestDiskSpace(unittest.TestCase):
+ @mock.patch("mozharness.base.diskutils.os")
+ def testDiskSpacePosix(self, mock_os):
+ ds = MockStatvfs()
+ mock_os.statvfs.return_value = ds
+ di = DiskSize()._posix_size("/")
+ self.assertTrue(di.unit == "bytes")
+ self.assertTrue(di.free == 0)
+ self.assertTrue(di.used == 0)
+ self.assertTrue(di.total == 0)
+
+ @mock.patch("mozharness.base.diskutils.ctypes")
+ def testDiskSpaceWindows(self, mock_ctypes):
+ mock_ctypes.windll.kernel32.GetDiskFreeSpaceExA.return_value = 0
+ mock_ctypes.windll.kernel32.GetDiskFreeSpaceExW.return_value = 0
+ di = DiskSize()._windows_size("/c/")
+ self.assertTrue(di.unit == "bytes")
+ self.assertTrue(di.free == 0)
+ self.assertTrue(di.used == 0)
+ self.assertTrue(di.total == 0)
+
+ @mock.patch("mozharness.base.diskutils.os")
+ @mock.patch("mozharness.base.diskutils.ctypes")
+ def testUnspportedPlafrom(self, mock_ctypes, mock_os):
+ mock_os.statvfs.side_effect = AttributeError("")
+ self.assertRaises(AttributeError, lambda: DiskSize()._posix_size("/"))
+ mock_ctypes.windll.kernel32.GetDiskFreeSpaceExW.side_effect = AttributeError("")
+ mock_ctypes.windll.kernel32.GetDiskFreeSpaceExA.side_effect = AttributeError("")
+ self.assertRaises(AttributeError, lambda: DiskSize()._windows_size("/"))
+ self.assertRaises(
+ DiskutilsError, lambda: DiskSize().get_size(path="/", unit="GB")
+ )
diff --git a/testing/mozharness/test/test_base_log.py b/testing/mozharness/test/test_base_log.py
new file mode 100644
index 0000000000..769cb1e390
--- /dev/null
+++ b/testing/mozharness/test/test_base_log.py
@@ -0,0 +1,43 @@
+import os
+import shutil
+import unittest
+
+import mozharness.base.log as log
+
+tmp_dir = "test_log_dir"
+log_name = "test"
+
+
+def clean_log_dir():
+ if os.path.exists(tmp_dir):
+ shutil.rmtree(tmp_dir)
+
+
+def get_log_file_path(level=None):
+ if level:
+ return os.path.join(tmp_dir, "%s_%s.log" % (log_name, level))
+ return os.path.join(tmp_dir, "%s.log" % log_name)
+
+
+class TestLog(unittest.TestCase):
+ def setUp(self):
+ clean_log_dir()
+
+ def tearDown(self):
+ clean_log_dir()
+
+ def test_log_dir(self):
+ fh = open(tmp_dir, "w")
+ fh.write("foo")
+ fh.close()
+ l = log.SimpleFileLogger(
+ log_dir=tmp_dir, log_name=log_name, log_to_console=False
+ )
+ self.assertTrue(os.path.exists(tmp_dir))
+ l.log_message("blah")
+ self.assertTrue(os.path.exists(get_log_file_path()))
+ del l
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/testing/mozharness/test/test_base_parallel.py b/testing/mozharness/test/test_base_parallel.py
new file mode 100644
index 0000000000..272d84484d
--- /dev/null
+++ b/testing/mozharness/test/test_base_parallel.py
@@ -0,0 +1,28 @@
+import unittest
+
+from mozharness.base.parallel import ChunkingMixin
+
+
+class TestChunkingMixin(unittest.TestCase):
+ def setUp(self):
+ self.c = ChunkingMixin()
+
+ def test_one_chunk(self):
+ self.assertEqual(self.c.query_chunked_list([1, 3, 2], 1, 1), [1, 3, 2])
+
+ def test_sorted(self):
+ self.assertEqual(
+ self.c.query_chunked_list([1, 3, 2], 1, 1, sort=True), [1, 2, 3]
+ )
+
+ def test_first_chunk(self):
+ self.assertEqual(self.c.query_chunked_list([4, 5, 4, 3], 1, 2), [4, 5])
+
+ def test_last_chunk(self):
+ self.assertEqual(self.c.query_chunked_list([1, 4, 5, 7, 5, 6], 3, 3), [5, 6])
+
+ def test_not_evenly_divisble(self):
+ thing = [1, 3, 6, 4, 3, 2, 6]
+ self.assertEqual(self.c.query_chunked_list(thing, 1, 3), [1, 3, 6])
+ self.assertEqual(self.c.query_chunked_list(thing, 2, 3), [4, 3])
+ self.assertEqual(self.c.query_chunked_list(thing, 3, 3), [2, 6])
diff --git a/testing/mozharness/test/test_base_python.py b/testing/mozharness/test/test_base_python.py
new file mode 100644
index 0000000000..c05f704ef6
--- /dev/null
+++ b/testing/mozharness/test/test_base_python.py
@@ -0,0 +1,39 @@
+import os
+import unittest
+
+import mozharness.base.python as python
+
+here = os.path.dirname(os.path.abspath(__file__))
+
+
+class TestVirtualenvMixin(unittest.TestCase):
+ def test_package_versions(self):
+ example = os.path.join(here, "pip-freeze.example.txt")
+ output = open(example).read()
+ mixin = python.VirtualenvMixin()
+ packages = mixin.package_versions(output)
+
+ # from the file
+ expected = {
+ "MakeItSo": "0.2.6",
+ "PyYAML": "3.10",
+ "Tempita": "0.5.1",
+ "WebOb": "1.2b3",
+ "coverage": "3.5.1",
+ "logilab-astng": "0.23.1",
+ "logilab-common": "0.57.1",
+ "mozdevice": "0.2",
+ "mozhttpd": "0.3",
+ "mozinfo": "0.3.3",
+ "nose": "1.1.2",
+ "pyflakes": "0.5.0",
+ "pylint": "0.25.1",
+ "virtualenv": "1.7.1.2",
+ "wsgiref": "0.1.2",
+ }
+
+ self.assertEqual(packages, expected)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/testing/mozharness/test/test_base_script.py b/testing/mozharness/test/test_base_script.py
new file mode 100644
index 0000000000..8acb630053
--- /dev/null
+++ b/testing/mozharness/test/test_base_script.py
@@ -0,0 +1,983 @@
+import gc
+import os
+import re
+import shutil
+import tempfile
+import types
+import unittest
+from unittest import mock
+
+PYWIN32 = False
+if os.name == "nt":
+ try:
+ import win32file
+
+ PYWIN32 = True
+ except ImportError:
+ pass
+
+
+import mozharness.base.errors as errors
+import mozharness.base.log as log
+import mozharness.base.script as script
+from mozharness.base.config import parse_config_file
+from mozharness.base.log import CRITICAL, DEBUG, ERROR, FATAL, IGNORE, INFO, WARNING
+
+here = os.path.dirname(os.path.abspath(__file__))
+
+test_string = """foo
+bar
+baz"""
+
+
+class CleanupObj(script.ScriptMixin, log.LogMixin):
+ def __init__(self):
+ super(CleanupObj, self).__init__()
+ self.log_obj = None
+ self.config = {"log_level": ERROR}
+
+
+def cleanup(files=None):
+ files = files or []
+ files.extend(("test_logs", "test_dir", "tmpfile_stdout", "tmpfile_stderr"))
+ gc.collect()
+ c = CleanupObj()
+ for f in files:
+ c.rmtree(f)
+
+
+def get_debug_script_obj():
+ s = script.BaseScript(
+ config={"log_type": "multi", "log_level": DEBUG},
+ initial_config_file="test/test.json",
+ )
+ return s
+
+
+def _post_fatal(self, **kwargs):
+ fh = open("tmpfile_stdout", "w")
+ print(test_string, file=fh)
+ fh.close()
+
+
+# TestScript {{{1
+class TestScript(unittest.TestCase):
+ def setUp(self):
+ cleanup()
+ self.s = None
+ self.tmpdir = tempfile.mkdtemp(suffix=".mozharness")
+
+ def tearDown(self):
+ # Close the logfile handles, or windows can't remove the logs
+ if hasattr(self, "s") and isinstance(self.s, object):
+ del self.s
+ cleanup([self.tmpdir])
+
+ # test _dump_config_hierarchy() when --dump-config-hierarchy is passed
+ def test_dump_config_hierarchy_valid_files_len(self):
+ try:
+ self.s = script.BaseScript(
+ initial_config_file="test/test.json",
+ option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
+ config={"dump_config_hierarchy": True},
+ )
+ except SystemExit:
+ local_cfg_files = parse_config_file("test_logs/localconfigfiles.json")
+ # first let's see if the correct number of config files were
+ # realized
+ self.assertEqual(
+ len(local_cfg_files),
+ 4,
+ msg="--dump-config-hierarchy dumped wrong number of config files",
+ )
+
+ def test_dump_config_hierarchy_keys_unique_and_valid(self):
+ try:
+ self.s = script.BaseScript(
+ initial_config_file="test/test.json",
+ option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
+ config={"dump_config_hierarchy": True},
+ )
+ except SystemExit:
+ local_cfg_files = parse_config_file("test_logs/localconfigfiles.json")
+ # now let's see if only unique items were added from each config
+ t_override = local_cfg_files.get("test/test_override.py", {})
+ self.assertTrue(
+ t_override.get("keep_string") == "don't change me"
+ and len(t_override.keys()) == 1,
+ msg="--dump-config-hierarchy dumped wrong keys/value for "
+ "`test/test_override.py`. There should only be one "
+ "item and it should be unique to all the other "
+ "items in test_log/localconfigfiles.json.",
+ )
+
+ def test_dump_config_hierarchy_matches_self_config(self):
+ try:
+ ######
+ # we need temp_cfg because self.s will be gcollected (NoneType) by
+ # the time we get to SystemExit exception
+ # temp_cfg will differ from self.s.config because of
+ # 'dump_config_hierarchy'. we have to make a deepcopy because
+ # config is a locked dict
+ temp_s = script.BaseScript(
+ initial_config_file="test/test.json",
+ option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
+ )
+ from copy import deepcopy
+
+ temp_cfg = deepcopy(temp_s.config)
+ temp_cfg.update({"dump_config_hierarchy": True})
+ ######
+ self.s = script.BaseScript(
+ initial_config_file="test/test.json",
+ option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
+ config={"dump_config_hierarchy": True},
+ )
+ except SystemExit:
+ local_cfg_files = parse_config_file("test_logs/localconfigfiles.json")
+ # finally let's just make sure that all the items added up, equals
+ # what we started with: self.config
+ target_cfg = {}
+ for cfg_file in local_cfg_files:
+ target_cfg.update(local_cfg_files[cfg_file])
+ self.assertEqual(
+ target_cfg,
+ temp_cfg,
+ msg="all of the items (combined) in each cfg file dumped via "
+ "--dump-config-hierarchy does not equal self.config ",
+ )
+
+ # test _dump_config() when --dump-config is passed
+ def test_dump_config_equals_self_config(self):
+ try:
+ ######
+ # we need temp_cfg because self.s will be gcollected (NoneType) by
+ # the time we get to SystemExit exception
+ # temp_cfg will differ from self.s.config because of
+ # 'dump_config_hierarchy'. we have to make a deepcopy because
+ # config is a locked dict
+ temp_s = script.BaseScript(
+ initial_config_file="test/test.json",
+ option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
+ )
+ from copy import deepcopy
+
+ temp_cfg = deepcopy(temp_s.config)
+ temp_cfg.update({"dump_config": True})
+ ######
+ self.s = script.BaseScript(
+ initial_config_file="test/test.json",
+ option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
+ config={"dump_config": True},
+ )
+ except SystemExit:
+ target_cfg = parse_config_file("test_logs/localconfig.json")
+ self.assertEqual(
+ target_cfg,
+ temp_cfg,
+ msg="all of the items (combined) in each cfg file dumped via "
+ "--dump-config does not equal self.config ",
+ )
+
+ def test_nonexistent_mkdir_p(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ self.s.mkdir_p("test_dir/foo/bar/baz")
+ self.assertTrue(os.path.isdir("test_dir/foo/bar/baz"), msg="mkdir_p error")
+
+ def test_existing_mkdir_p(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ os.makedirs("test_dir/foo/bar/baz")
+ self.s.mkdir_p("test_dir/foo/bar/baz")
+ self.assertTrue(
+ os.path.isdir("test_dir/foo/bar/baz"), msg="mkdir_p error when dir exists"
+ )
+
+ def test_chdir(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ cwd = os.getcwd()
+ self.s.chdir("test_logs")
+ self.assertEqual(os.path.join(cwd, "test_logs"), os.getcwd(), msg="chdir error")
+ self.s.chdir(cwd)
+
+ def _test_log_helper(self, obj):
+ obj.debug("Testing DEBUG")
+ obj.warning("Testing WARNING")
+ obj.error("Testing ERROR")
+ obj.critical("Testing CRITICAL")
+ try:
+ obj.fatal("Testing FATAL")
+ except SystemExit:
+ pass
+ else:
+ self.assertTrue(False, msg="fatal() didn't SystemExit!")
+
+ def test_log(self):
+ self.s = get_debug_script_obj()
+ self.s.log_obj = None
+ self._test_log_helper(self.s)
+ del self.s
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ self._test_log_helper(self.s)
+
+ def test_run_nonexistent_command(self):
+ self.s = get_debug_script_obj()
+ self.s.run_command(
+ command="this_cmd_should_not_exist --help",
+ env={"GARBLE": "FARG"},
+ error_list=errors.PythonErrorList,
+ )
+ error_logsize = os.path.getsize("test_logs/test_info.log")
+ self.assertTrue(error_logsize > 0, msg="command not found error not hit")
+
+ def test_run_command_in_bad_dir(self):
+ self.s = get_debug_script_obj()
+ self.s.run_command(
+ command="ls",
+ cwd="/this_dir_should_not_exist",
+ error_list=errors.PythonErrorList,
+ )
+ error_logsize = os.path.getsize("test_logs/test_error.log")
+ self.assertTrue(error_logsize > 0, msg="bad dir error not hit")
+
+ def test_get_output_from_command_in_bad_dir(self):
+ self.s = get_debug_script_obj()
+ self.s.get_output_from_command(command="ls", cwd="/this_dir_should_not_exist")
+ error_logsize = os.path.getsize("test_logs/test_error.log")
+ self.assertTrue(error_logsize > 0, msg="bad dir error not hit")
+
+ def test_get_output_from_command_with_missing_file(self):
+ self.s = get_debug_script_obj()
+ self.s.get_output_from_command(command="ls /this_file_should_not_exist")
+ error_logsize = os.path.getsize("test_logs/test_error.log")
+ self.assertTrue(error_logsize > 0, msg="bad file error not hit")
+
+ def test_get_output_from_command_with_missing_file2(self):
+ self.s = get_debug_script_obj()
+ self.s.run_command(
+ command="cat mozharness/base/errors.py",
+ error_list=[
+ {"substr": "error", "level": ERROR},
+ {
+ "regex": re.compile(",$"),
+ "level": IGNORE,
+ },
+ {
+ "substr": "]$",
+ "level": WARNING,
+ },
+ ],
+ )
+ error_logsize = os.path.getsize("test_logs/test_error.log")
+ self.assertTrue(error_logsize > 0, msg="error list not working properly")
+
+ def test_download_unpack(self):
+ # NOTE: The action is called *download*, however, it can work for files in disk
+ self.s = get_debug_script_obj()
+
+ archives_path = os.path.join(here, "helper_files", "archives")
+
+ # Test basic decompression
+ for archive in (
+ "archive.tar",
+ "archive.tar.bz2",
+ "archive.tar.gz",
+ "archive.zip",
+ ):
+ self.s.download_unpack(
+ url=os.path.join(archives_path, archive), extract_to=self.tmpdir
+ )
+ self.assertIn("script.sh", os.listdir(os.path.join(self.tmpdir, "bin")))
+ self.assertIn("lorem.txt", os.listdir(self.tmpdir))
+ shutil.rmtree(self.tmpdir)
+
+ # Test permissions for extracted entries from zip archive
+ self.s.download_unpack(
+ url=os.path.join(archives_path, "archive.zip"),
+ extract_to=self.tmpdir,
+ )
+ file_stats = os.stat(os.path.join(self.tmpdir, "bin", "script.sh"))
+ orig_fstats = os.stat(
+ os.path.join(archives_path, "reference", "bin", "script.sh")
+ )
+ self.assertEqual(file_stats.st_mode, orig_fstats.st_mode)
+ shutil.rmtree(self.tmpdir)
+
+ # Test unzip specific dirs only
+ self.s.download_unpack(
+ url=os.path.join(archives_path, "archive.zip"),
+ extract_to=self.tmpdir,
+ extract_dirs=["bin/*"],
+ )
+ self.assertIn("bin", os.listdir(self.tmpdir))
+ self.assertNotIn("lorem.txt", os.listdir(self.tmpdir))
+ shutil.rmtree(self.tmpdir)
+
+ # Test for invalid filenames (Windows only)
+ if PYWIN32:
+ with self.assertRaises(IOError):
+ self.s.download_unpack(
+ url=os.path.join(archives_path, "archive_invalid_filename.zip"),
+ extract_to=self.tmpdir,
+ )
+
+ for archive in (
+ "archive-setuid.tar",
+ "archive-escape.tar",
+ "archive-link.tar",
+ "archive-link-abs.tar",
+ "archive-double-link.tar",
+ ):
+ with self.assertRaises(Exception):
+ self.s.download_unpack(
+ url=os.path.join(archives_path, archive),
+ extract_to=self.tmpdir,
+ )
+
+ def test_unpack(self):
+ self.s = get_debug_script_obj()
+
+ archives_path = os.path.join(here, "helper_files", "archives")
+
+ # Test basic decompression
+ for archive in (
+ "archive.tar",
+ "archive.tar.bz2",
+ "archive.tar.gz",
+ "archive.zip",
+ ):
+ self.s.unpack(os.path.join(archives_path, archive), self.tmpdir)
+ self.assertIn("script.sh", os.listdir(os.path.join(self.tmpdir, "bin")))
+ self.assertIn("lorem.txt", os.listdir(self.tmpdir))
+ shutil.rmtree(self.tmpdir)
+
+ # Test permissions for extracted entries from zip archive
+ self.s.unpack(os.path.join(archives_path, "archive.zip"), self.tmpdir)
+ file_stats = os.stat(os.path.join(self.tmpdir, "bin", "script.sh"))
+ orig_fstats = os.stat(
+ os.path.join(archives_path, "reference", "bin", "script.sh")
+ )
+ self.assertEqual(file_stats.st_mode, orig_fstats.st_mode)
+ shutil.rmtree(self.tmpdir)
+
+ # Test extract specific dirs only
+ self.s.unpack(
+ os.path.join(archives_path, "archive.zip"),
+ self.tmpdir,
+ extract_dirs=["bin/*"],
+ )
+ self.assertIn("bin", os.listdir(self.tmpdir))
+ self.assertNotIn("lorem.txt", os.listdir(self.tmpdir))
+ shutil.rmtree(self.tmpdir)
+
+ # Test for invalid filenames (Windows only)
+ if PYWIN32:
+ with self.assertRaises(IOError):
+ self.s.unpack(
+ os.path.join(archives_path, "archive_invalid_filename.zip"),
+ self.tmpdir,
+ )
+
+ for archive in (
+ "archive-setuid.tar",
+ "archive-escape.tar",
+ "archive-link.tar",
+ "archive-link-abs.tar",
+ "archive-double-link.tar",
+ ):
+ with self.assertRaises(Exception):
+ self.s.unpack(os.path.join(archives_path, archive), self.tmpdir)
+
+
+# TestHelperFunctions {{{1
+class TestHelperFunctions(unittest.TestCase):
+ temp_file = "test_dir/mozilla"
+
+ def setUp(self):
+ cleanup()
+ self.s = None
+
+ def tearDown(self):
+ # Close the logfile handles, or windows can't remove the logs
+ if hasattr(self, "s") and isinstance(self.s, object):
+ del self.s
+ cleanup()
+
+ def _create_temp_file(self, contents=test_string):
+ os.mkdir("test_dir")
+ fh = open(self.temp_file, "w+")
+ fh.write(contents)
+ fh.close
+
+ def test_mkdir_p(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ self.s.mkdir_p("test_dir")
+ self.assertTrue(os.path.isdir("test_dir"), msg="mkdir_p error")
+
+ def test_get_output_from_command(self):
+ self._create_temp_file()
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ contents = self.s.get_output_from_command(
+ ["bash", "-c", "cat %s" % self.temp_file]
+ )
+ self.assertEqual(
+ test_string,
+ contents,
+ msg="get_output_from_command('cat file') differs from fh.write",
+ )
+
+ def test_run_command(self):
+ self._create_temp_file()
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ temp_file_name = os.path.basename(self.temp_file)
+ self.assertEqual(
+ self.s.run_command("cat %s" % temp_file_name, cwd="test_dir"),
+ 0,
+ msg="run_command('cat file') did not exit 0",
+ )
+
+ def test_move1(self):
+ self._create_temp_file()
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ temp_file2 = "%s2" % self.temp_file
+ self.s.move(self.temp_file, temp_file2)
+ self.assertFalse(
+ os.path.exists(self.temp_file),
+ msg="%s still exists after move()" % self.temp_file,
+ )
+
+ def test_move2(self):
+ self._create_temp_file()
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ temp_file2 = "%s2" % self.temp_file
+ self.s.move(self.temp_file, temp_file2)
+ self.assertTrue(
+ os.path.exists(temp_file2), msg="%s doesn't exist after move()" % temp_file2
+ )
+
+ def test_copyfile(self):
+ self._create_temp_file()
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ temp_file2 = "%s2" % self.temp_file
+ self.s.copyfile(self.temp_file, temp_file2)
+ self.assertEqual(
+ os.path.getsize(self.temp_file),
+ os.path.getsize(temp_file2),
+ msg="%s and %s are different sizes after copyfile()"
+ % (self.temp_file, temp_file2),
+ )
+
+ def test_existing_rmtree(self):
+ self._create_temp_file()
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ self.s.mkdir_p("test_dir/foo/bar/baz")
+ self.s.rmtree("test_dir")
+ self.assertFalse(os.path.exists("test_dir"), msg="rmtree unsuccessful")
+
+ def test_nonexistent_rmtree(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ status = self.s.rmtree("test_dir")
+ self.assertFalse(status, msg="nonexistent rmtree error")
+
+ @unittest.skipUnless(PYWIN32, "PyWin32 specific")
+ def test_long_dir_rmtree(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ # create a very long path that the command-prompt cannot delete
+ # by using unicode format (max path length 32000)
+ path = u"\\\\?\\%s\\test_dir" % os.getcwd()
+ win32file.CreateDirectoryExW(u".", path)
+
+ for x in range(0, 20):
+ print("path=%s" % path)
+ path = path + u"\\%sxxxxxxxxxxxxxxxxxxxx" % x
+ win32file.CreateDirectoryExW(u".", path)
+ self.s.rmtree("test_dir")
+ self.assertFalse(os.path.exists("test_dir"), msg="rmtree unsuccessful")
+
+ @unittest.skipUnless(PYWIN32, "PyWin32 specific")
+ def test_chmod_rmtree(self):
+ self._create_temp_file()
+ win32file.SetFileAttributesW(self.temp_file, win32file.FILE_ATTRIBUTE_READONLY)
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ self.s.rmtree("test_dir")
+ self.assertFalse(os.path.exists("test_dir"), msg="rmtree unsuccessful")
+
+ @unittest.skipIf(os.name == "nt", "Not for Windows")
+ def test_chmod(self):
+ self._create_temp_file()
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ self.s.chmod(self.temp_file, 0o100700)
+ self.assertEqual(os.stat(self.temp_file)[0], 33216, msg="chmod unsuccessful")
+
+ def test_env_normal(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ script_env = self.s.query_env()
+ self.assertEqual(
+ script_env,
+ os.environ,
+ msg="query_env() != env\n%s\n%s" % (script_env, os.environ),
+ )
+
+ def test_env_normal2(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ self.s.query_env()
+ script_env = self.s.query_env()
+ self.assertEqual(
+ script_env,
+ os.environ,
+ msg="Second query_env() != env\n%s\n%s" % (script_env, os.environ),
+ )
+
+ def test_env_partial(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ script_env = self.s.query_env(partial_env={"foo": "bar"})
+ self.assertTrue("foo" in script_env and script_env["foo"] == "bar")
+
+ def test_env_path(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ partial_path = "yaddayadda:%(PATH)s"
+ full_path = partial_path % {"PATH": os.environ["PATH"]}
+ script_env = self.s.query_env(partial_env={"PATH": partial_path})
+ self.assertEqual(script_env["PATH"], full_path)
+
+ def test_query_exe(self):
+ self.s = script.BaseScript(
+ initial_config_file="test/test.json",
+ config={"exes": {"foo": "bar"}},
+ )
+ path = self.s.query_exe("foo")
+ self.assertEqual(path, "bar")
+
+ def test_query_exe_string_replacement(self):
+ self.s = script.BaseScript(
+ initial_config_file="test/test.json",
+ config={
+ "base_work_dir": "foo",
+ "work_dir": "bar",
+ "exes": {"foo": os.path.join("%(abs_work_dir)s", "baz")},
+ },
+ )
+ path = self.s.query_exe("foo")
+ self.assertEqual(path, os.path.join("foo", "bar", "baz"))
+
+ def test_read_from_file(self):
+ self._create_temp_file()
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ contents = self.s.read_from_file(self.temp_file)
+ self.assertEqual(contents, test_string)
+
+ def test_read_from_nonexistent_file(self):
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+ contents = self.s.read_from_file("nonexistent_file!!!")
+ self.assertEqual(contents, None)
+
+
+# TestScriptLogging {{{1
+class TestScriptLogging(unittest.TestCase):
+ # I need a log watcher helper function, here and in test_log.
+ def setUp(self):
+ cleanup()
+ self.s = None
+
+ def tearDown(self):
+ # Close the logfile handles, or windows can't remove the logs
+ if hasattr(self, "s") and isinstance(self.s, object):
+ del self.s
+ cleanup()
+
+ def test_info_logsize(self):
+ self.s = script.BaseScript(
+ config={"log_type": "multi"}, initial_config_file="test/test.json"
+ )
+ info_logsize = os.path.getsize("test_logs/test_info.log")
+ self.assertTrue(info_logsize > 0, msg="initial info logfile missing/size 0")
+
+ def test_add_summary_info(self):
+ self.s = script.BaseScript(
+ config={"log_type": "multi"}, initial_config_file="test/test.json"
+ )
+ info_logsize = os.path.getsize("test_logs/test_info.log")
+ self.s.add_summary("one")
+ info_logsize2 = os.path.getsize("test_logs/test_info.log")
+ self.assertTrue(
+ info_logsize < info_logsize2, msg="add_summary() info not logged"
+ )
+
+ def test_add_summary_warning(self):
+ self.s = script.BaseScript(
+ config={"log_type": "multi"}, initial_config_file="test/test.json"
+ )
+ warning_logsize = os.path.getsize("test_logs/test_warning.log")
+ self.s.add_summary("two", level=WARNING)
+ warning_logsize2 = os.path.getsize("test_logs/test_warning.log")
+ self.assertTrue(
+ warning_logsize < warning_logsize2,
+ msg="add_summary(level=%s) not logged in warning log" % WARNING,
+ )
+
+ def test_summary(self):
+ self.s = script.BaseScript(
+ config={"log_type": "multi"}, initial_config_file="test/test.json"
+ )
+ self.s.add_summary("one")
+ self.s.add_summary("two", level=WARNING)
+ info_logsize = os.path.getsize("test_logs/test_info.log")
+ warning_logsize = os.path.getsize("test_logs/test_warning.log")
+ self.s.summary()
+ info_logsize2 = os.path.getsize("test_logs/test_info.log")
+ warning_logsize2 = os.path.getsize("test_logs/test_warning.log")
+ msg = ""
+ if info_logsize >= info_logsize2:
+ msg += "summary() didn't log to info!\n"
+ if warning_logsize >= warning_logsize2:
+ msg += "summary() didn't log to warning!\n"
+ self.assertEqual(msg, "", msg=msg)
+
+ def _test_log_level(self, log_level, log_level_file_list):
+ self.s = script.BaseScript(
+ config={"log_type": "multi"}, initial_config_file="test/test.json"
+ )
+ if log_level != FATAL:
+ self.s.log("testing", level=log_level)
+ else:
+ self.s._post_fatal = types.MethodType(_post_fatal, self.s)
+ try:
+ self.s.fatal("testing")
+ except SystemExit:
+ contents = None
+ if os.path.exists("tmpfile_stdout"):
+ fh = open("tmpfile_stdout")
+ contents = fh.read()
+ fh.close()
+ self.assertEqual(contents.rstrip(), test_string, "_post_fatal failed!")
+ del self.s
+ msg = ""
+ for level in log_level_file_list:
+ log_path = "test_logs/test_%s.log" % level
+ if not os.path.exists(log_path):
+ msg += "%s doesn't exist!\n" % log_path
+ else:
+ filesize = os.path.getsize(log_path)
+ if not filesize > 0:
+ msg += "%s is size 0!\n" % log_path
+ self.assertEqual(msg, "", msg=msg)
+
+ def test_debug(self):
+ self._test_log_level(DEBUG, [])
+
+ def test_ignore(self):
+ self._test_log_level(IGNORE, [])
+
+ def test_info(self):
+ self._test_log_level(INFO, [INFO])
+
+ def test_warning(self):
+ self._test_log_level(WARNING, [INFO, WARNING])
+
+ def test_error(self):
+ self._test_log_level(ERROR, [INFO, WARNING, ERROR])
+
+ def test_critical(self):
+ self._test_log_level(CRITICAL, [INFO, WARNING, ERROR, CRITICAL])
+
+ def test_fatal(self):
+ self._test_log_level(FATAL, [INFO, WARNING, ERROR, CRITICAL, FATAL])
+
+
+# TestRetry {{{1
+class NewError(Exception):
+ pass
+
+
+class OtherError(Exception):
+ pass
+
+
+class TestRetry(unittest.TestCase):
+ def setUp(self):
+ self.ATTEMPT_N = 1
+ self.s = script.BaseScript(initial_config_file="test/test.json")
+
+ def tearDown(self):
+ # Close the logfile handles, or windows can't remove the logs
+ if hasattr(self, "s") and isinstance(self.s, object):
+ del self.s
+ cleanup()
+
+ def _succeedOnSecondAttempt(self, foo=None, exception=Exception):
+ if self.ATTEMPT_N == 2:
+ self.ATTEMPT_N += 1
+ return
+ self.ATTEMPT_N += 1
+ raise exception("Fail")
+
+ def _raiseCustomException(self):
+ return self._succeedOnSecondAttempt(exception=NewError)
+
+ def _alwaysPass(self):
+ self.ATTEMPT_N += 1
+ return True
+
+ def _mirrorArgs(self, *args, **kwargs):
+ return args, kwargs
+
+ def _alwaysFail(self):
+ raise Exception("Fail")
+
+ def testRetrySucceed(self):
+ # Will raise if anything goes wrong
+ self.s.retry(self._succeedOnSecondAttempt, attempts=2, sleeptime=0)
+
+ def testRetryFailWithoutCatching(self):
+ self.assertRaises(
+ Exception, self.s.retry, self._alwaysFail, sleeptime=0, exceptions=()
+ )
+
+ def testRetryFailEnsureRaisesLastException(self):
+ self.assertRaises(
+ SystemExit, self.s.retry, self._alwaysFail, sleeptime=0, error_level=FATAL
+ )
+
+ def testRetrySelectiveExceptionSucceed(self):
+ self.s.retry(
+ self._raiseCustomException,
+ attempts=2,
+ sleeptime=0,
+ retry_exceptions=(NewError,),
+ )
+
+ def testRetrySelectiveExceptionFail(self):
+ self.assertRaises(
+ NewError,
+ self.s.retry,
+ self._raiseCustomException,
+ attempts=2,
+ sleeptime=0,
+ retry_exceptions=(OtherError,),
+ )
+
+ # TODO: figure out a way to test that the sleep actually happened
+ def testRetryWithSleep(self):
+ self.s.retry(self._succeedOnSecondAttempt, attempts=2, sleeptime=1)
+
+ def testRetryOnlyRunOnce(self):
+ """Tests that retry() doesn't call the action again after success"""
+ self.s.retry(self._alwaysPass, attempts=3, sleeptime=0)
+ # self.ATTEMPT_N gets increased regardless of pass/fail
+ self.assertEqual(2, self.ATTEMPT_N)
+
+ def testRetryReturns(self):
+ ret = self.s.retry(self._alwaysPass, sleeptime=0)
+ self.assertEqual(ret, True)
+
+ def testRetryCleanupIsCalled(self):
+ cleanup = mock.Mock()
+ self.s.retry(self._succeedOnSecondAttempt, cleanup=cleanup, sleeptime=0)
+ self.assertEqual(cleanup.call_count, 1)
+
+ def testRetryArgsPassed(self):
+ args = (1, "two", 3)
+ kwargs = dict(foo="a", bar=7)
+ ret = self.s.retry(
+ self._mirrorArgs, args=args, kwargs=kwargs.copy(), sleeptime=0
+ )
+ print(ret)
+ self.assertEqual(ret[0], args)
+ self.assertEqual(ret[1], kwargs)
+
+
+class BaseScriptWithDecorators(script.BaseScript):
+ def __init__(self, *args, **kwargs):
+ super(BaseScriptWithDecorators, self).__init__(*args, **kwargs)
+
+ self.pre_run_1_args = []
+ self.raise_during_pre_run_1 = False
+ self.pre_action_1_args = []
+ self.raise_during_pre_action_1 = False
+ self.pre_action_2_args = []
+ self.pre_action_3_args = []
+ self.post_action_1_args = []
+ self.raise_during_post_action_1 = False
+ self.post_action_2_args = []
+ self.post_action_3_args = []
+ self.post_run_1_args = []
+ self.raise_during_post_run_1 = False
+ self.post_run_2_args = []
+ self.raise_during_build = False
+
+ @script.PreScriptRun
+ def pre_run_1(self, *args, **kwargs):
+ self.pre_run_1_args.append((args, kwargs))
+
+ if self.raise_during_pre_run_1:
+ raise Exception(self.raise_during_pre_run_1)
+
+ @script.PreScriptAction
+ def pre_action_1(self, *args, **kwargs):
+ self.pre_action_1_args.append((args, kwargs))
+
+ if self.raise_during_pre_action_1:
+ raise Exception(self.raise_during_pre_action_1)
+
+ @script.PreScriptAction
+ def pre_action_2(self, *args, **kwargs):
+ self.pre_action_2_args.append((args, kwargs))
+
+ @script.PreScriptAction("clobber")
+ def pre_action_3(self, *args, **kwargs):
+ self.pre_action_3_args.append((args, kwargs))
+
+ @script.PostScriptAction
+ def post_action_1(self, *args, **kwargs):
+ self.post_action_1_args.append((args, kwargs))
+
+ if self.raise_during_post_action_1:
+ raise Exception(self.raise_during_post_action_1)
+
+ @script.PostScriptAction
+ def post_action_2(self, *args, **kwargs):
+ self.post_action_2_args.append((args, kwargs))
+
+ @script.PostScriptAction("build")
+ def post_action_3(self, *args, **kwargs):
+ self.post_action_3_args.append((args, kwargs))
+
+ @script.PostScriptRun
+ def post_run_1(self, *args, **kwargs):
+ self.post_run_1_args.append((args, kwargs))
+
+ if self.raise_during_post_run_1:
+ raise Exception(self.raise_during_post_run_1)
+
+ @script.PostScriptRun
+ def post_run_2(self, *args, **kwargs):
+ self.post_run_2_args.append((args, kwargs))
+
+ def build(self):
+ if self.raise_during_build:
+ raise Exception(self.raise_during_build)
+
+
+class TestScriptDecorators(unittest.TestCase):
+ def setUp(self):
+ cleanup()
+ self.s = None
+
+ def tearDown(self):
+ if hasattr(self, "s") and isinstance(self.s, object):
+ del self.s
+
+ cleanup()
+
+ def test_decorators_registered(self):
+ self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
+
+ self.assertEqual(len(self.s._listeners["pre_run"]), 1)
+ self.assertEqual(len(self.s._listeners["pre_action"]), 3)
+ self.assertEqual(len(self.s._listeners["post_action"]), 3)
+ self.assertEqual(len(self.s._listeners["post_run"]), 2)
+
+ def test_pre_post_fired(self):
+ self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
+ self.s.run()
+
+ self.assertEqual(len(self.s.pre_run_1_args), 1)
+ self.assertEqual(len(self.s.pre_action_1_args), 2)
+ self.assertEqual(len(self.s.pre_action_2_args), 2)
+ self.assertEqual(len(self.s.pre_action_3_args), 1)
+ self.assertEqual(len(self.s.post_action_1_args), 2)
+ self.assertEqual(len(self.s.post_action_2_args), 2)
+ self.assertEqual(len(self.s.post_action_3_args), 1)
+ self.assertEqual(len(self.s.post_run_1_args), 1)
+
+ self.assertEqual(self.s.pre_run_1_args[0], ((), {}))
+
+ self.assertEqual(self.s.pre_action_1_args[0], (("clobber",), {}))
+ self.assertEqual(self.s.pre_action_1_args[1], (("build",), {}))
+
+ # pre_action_3 should only get called for the action it is registered
+ # with.
+ self.assertEqual(self.s.pre_action_3_args[0], (("clobber",), {}))
+
+ self.assertEqual(self.s.post_action_1_args[0][0], ("clobber",))
+ self.assertEqual(self.s.post_action_1_args[0][1], dict(success=True))
+ self.assertEqual(self.s.post_action_1_args[1][0], ("build",))
+ self.assertEqual(self.s.post_action_1_args[1][1], dict(success=True))
+
+ # post_action_3 should only get called for the action it is registered
+ # with.
+ self.assertEqual(self.s.post_action_3_args[0], (("build",), dict(success=True)))
+
+ self.assertEqual(self.s.post_run_1_args[0], ((), {}))
+
+ def test_post_always_fired(self):
+ self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
+ self.s.raise_during_build = "Testing post always fired."
+
+ with self.assertRaises(SystemExit):
+ self.s.run()
+
+ self.assertEqual(len(self.s.pre_run_1_args), 1)
+ self.assertEqual(len(self.s.pre_action_1_args), 2)
+ self.assertEqual(len(self.s.post_action_1_args), 2)
+ self.assertEqual(len(self.s.post_action_2_args), 2)
+ self.assertEqual(len(self.s.post_run_1_args), 1)
+ self.assertEqual(len(self.s.post_run_2_args), 1)
+
+ self.assertEqual(self.s.post_action_1_args[0][1], dict(success=True))
+ self.assertEqual(self.s.post_action_1_args[1][1], dict(success=False))
+ self.assertEqual(self.s.post_action_2_args[1][1], dict(success=False))
+
+ def test_pre_run_exception(self):
+ self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
+ self.s.raise_during_pre_run_1 = "Error during pre run 1"
+
+ with self.assertRaises(SystemExit):
+ self.s.run()
+
+ self.assertEqual(len(self.s.pre_run_1_args), 1)
+ self.assertEqual(len(self.s.pre_action_1_args), 0)
+ self.assertEqual(len(self.s.post_run_1_args), 1)
+ self.assertEqual(len(self.s.post_run_2_args), 1)
+
+ def test_pre_action_exception(self):
+ self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
+ self.s.raise_during_pre_action_1 = "Error during pre 1"
+
+ with self.assertRaises(SystemExit):
+ self.s.run()
+
+ self.assertEqual(len(self.s.pre_run_1_args), 1)
+ self.assertEqual(len(self.s.pre_action_1_args), 1)
+ self.assertEqual(len(self.s.pre_action_2_args), 0)
+ self.assertEqual(len(self.s.post_action_1_args), 1)
+ self.assertEqual(len(self.s.post_action_2_args), 1)
+ self.assertEqual(len(self.s.post_run_1_args), 1)
+ self.assertEqual(len(self.s.post_run_2_args), 1)
+
+ def test_post_action_exception(self):
+ self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
+ self.s.raise_during_post_action_1 = "Error during post 1"
+
+ with self.assertRaises(SystemExit):
+ self.s.run()
+
+ self.assertEqual(len(self.s.pre_run_1_args), 1)
+ self.assertEqual(len(self.s.post_action_1_args), 1)
+ self.assertEqual(len(self.s.post_action_2_args), 1)
+ self.assertEqual(len(self.s.post_run_1_args), 1)
+ self.assertEqual(len(self.s.post_run_2_args), 1)
+
+ def test_post_run_exception(self):
+ self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
+ self.s.raise_during_post_run_1 = "Error during post run 1"
+
+ with self.assertRaises(SystemExit):
+ self.s.run()
+
+ self.assertEqual(len(self.s.post_run_1_args), 1)
+ self.assertEqual(len(self.s.post_run_2_args), 1)
+
+
+# main {{{1
+if __name__ == "__main__":
+ unittest.main()
diff --git a/testing/mozharness/test/test_base_vcs_mercurial.py b/testing/mozharness/test/test_base_vcs_mercurial.py
new file mode 100644
index 0000000000..f00e0b586c
--- /dev/null
+++ b/testing/mozharness/test/test_base_vcs_mercurial.py
@@ -0,0 +1,396 @@
+import os
+import platform
+import shutil
+import tempfile
+import unittest
+
+import mozharness.base.vcs.mercurial as mercurial
+
+test_string = """foo
+bar
+baz"""
+
+HG = ["hg"] + mercurial.HG_OPTIONS
+
+# Known default .hgrc
+os.environ["HGRCPATH"] = os.path.abspath(
+ os.path.join(os.path.dirname(__file__), "helper_files", ".hgrc")
+)
+
+
+def cleanup():
+ if os.path.exists("test_logs"):
+ shutil.rmtree("test_logs")
+ if os.path.exists("test_dir"):
+ if os.path.isdir("test_dir"):
+ shutil.rmtree("test_dir")
+ else:
+ os.remove("test_dir")
+ for filename in ("localconfig.json", "localconfig.json.bak"):
+ if os.path.exists(filename):
+ os.remove(filename)
+
+
+def get_mercurial_vcs_obj():
+ m = mercurial.MercurialVCS()
+ m.config = {}
+ return m
+
+
+def get_revisions(dest):
+ m = get_mercurial_vcs_obj()
+ retval = []
+ command = HG + ["log", "-R", dest, "--template", "{node}\n"]
+ for rev in m.get_output_from_command(command).split("\n"):
+ rev = rev.strip()
+ if not rev:
+ continue
+ retval.append(rev)
+ return retval
+
+
+class TestMakeAbsolute(unittest.TestCase):
+ # _make_absolute() doesn't play nicely with windows/msys paths.
+ # TODO: fix _make_absolute, write it out of the picture, or determine
+ # that it's not needed on windows.
+ if platform.system() not in ("Windows",):
+
+ def test_absolute_path(self):
+ m = get_mercurial_vcs_obj()
+ self.assertEqual(m._make_absolute("/foo/bar"), "/foo/bar")
+
+ def test_relative_path(self):
+ m = get_mercurial_vcs_obj()
+ self.assertEqual(m._make_absolute("foo/bar"), os.path.abspath("foo/bar"))
+
+ def test_HTTP_paths(self):
+ m = get_mercurial_vcs_obj()
+ self.assertEqual(m._make_absolute("http://foo/bar"), "http://foo/bar")
+
+ def test_absolute_file_path(self):
+ m = get_mercurial_vcs_obj()
+ self.assertEqual(m._make_absolute("file:///foo/bar"), "file:///foo/bar")
+
+ def test_relative_file_path(self):
+ m = get_mercurial_vcs_obj()
+ self.assertEqual(
+ m._make_absolute("file://foo/bar"), "file://%s/foo/bar" % os.getcwd()
+ )
+
+
+class TestHg(unittest.TestCase):
+ def _init_hg_repo(self, hg_obj, repodir):
+ hg_obj.run_command(
+ [
+ "bash",
+ os.path.join(
+ os.path.dirname(__file__), "helper_files", "init_hgrepo.sh"
+ ),
+ repodir,
+ ]
+ )
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+ self.repodir = os.path.join(self.tmpdir, "repo")
+ m = get_mercurial_vcs_obj()
+ self._init_hg_repo(m, self.repodir)
+ self.revisions = get_revisions(self.repodir)
+ self.wc = os.path.join(self.tmpdir, "wc")
+ self.pwd = os.getcwd()
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+ os.chdir(self.pwd)
+
+ def test_get_branch(self):
+ m = get_mercurial_vcs_obj()
+ m.clone(self.repodir, self.wc)
+ b = m.get_branch_from_path(self.wc)
+ self.assertEqual(b, "default")
+
+ def test_get_branches(self):
+ m = get_mercurial_vcs_obj()
+ m.clone(self.repodir, self.wc)
+ branches = m.get_branches_from_path(self.wc)
+ self.assertEqual(sorted(branches), sorted(["branch2", "default"]))
+
+ def test_clone(self):
+ m = get_mercurial_vcs_obj()
+ rev = m.clone(self.repodir, self.wc, update_dest=False)
+ self.assertEqual(rev, None)
+ self.assertEqual(self.revisions, get_revisions(self.wc))
+ self.assertEqual(sorted(os.listdir(self.wc)), [".hg"])
+
+ def test_clone_into_non_empty_dir(self):
+ m = get_mercurial_vcs_obj()
+ m.mkdir_p(self.wc)
+ open(os.path.join(self.wc, "test.txt"), "w").write("hello")
+ m.clone(self.repodir, self.wc, update_dest=False)
+ self.assertTrue(not os.path.exists(os.path.join(self.wc, "test.txt")))
+
+ def test_clone_update(self):
+ m = get_mercurial_vcs_obj()
+ rev = m.clone(self.repodir, self.wc, update_dest=True)
+ self.assertEqual(rev, self.revisions[0])
+
+ def test_clone_branch(self):
+ m = get_mercurial_vcs_obj()
+ m.clone(self.repodir, self.wc, branch="branch2", update_dest=False)
+ # On hg 1.6, we should only have a subset of the revisions
+ if m.hg_ver() >= (1, 6, 0):
+ self.assertEqual(self.revisions[1:], get_revisions(self.wc))
+ else:
+ self.assertEqual(self.revisions, get_revisions(self.wc))
+
+ def test_clone_update_branch(self):
+ m = get_mercurial_vcs_obj()
+ rev = m.clone(
+ self.repodir,
+ os.path.join(self.tmpdir, "wc"),
+ branch="branch2",
+ update_dest=True,
+ )
+ self.assertEqual(rev, self.revisions[1], self.revisions)
+
+ def test_clone_revision(self):
+ m = get_mercurial_vcs_obj()
+ m.clone(self.repodir, self.wc, revision=self.revisions[0], update_dest=False)
+ # We'll only get a subset of the revisions
+ self.assertEqual(
+ self.revisions[:1] + self.revisions[2:], get_revisions(self.wc)
+ )
+
+ def test_update_revision(self):
+ m = get_mercurial_vcs_obj()
+ rev = m.clone(self.repodir, self.wc, update_dest=False)
+ self.assertEqual(rev, None)
+
+ rev = m.update(self.wc, revision=self.revisions[1])
+ self.assertEqual(rev, self.revisions[1])
+
+ def test_pull(self):
+ m = get_mercurial_vcs_obj()
+ # Clone just the first rev
+ m.clone(self.repodir, self.wc, revision=self.revisions[-1], update_dest=False)
+ self.assertEqual(get_revisions(self.wc), self.revisions[-1:])
+
+ # Now pull in new changes
+ rev = m.pull(self.repodir, self.wc, update_dest=False)
+ self.assertEqual(rev, None)
+ self.assertEqual(get_revisions(self.wc), self.revisions)
+
+ def test_pull_revision(self):
+ m = get_mercurial_vcs_obj()
+ # Clone just the first rev
+ m.clone(self.repodir, self.wc, revision=self.revisions[-1], update_dest=False)
+ self.assertEqual(get_revisions(self.wc), self.revisions[-1:])
+
+ # Now pull in just the last revision
+ rev = m.pull(
+ self.repodir, self.wc, revision=self.revisions[0], update_dest=False
+ )
+ self.assertEqual(rev, None)
+
+ # We'll be missing the middle revision (on another branch)
+ self.assertEqual(
+ get_revisions(self.wc), self.revisions[:1] + self.revisions[2:]
+ )
+
+ def test_pull_branch(self):
+ m = get_mercurial_vcs_obj()
+ # Clone just the first rev
+ m.clone(self.repodir, self.wc, revision=self.revisions[-1], update_dest=False)
+ self.assertEqual(get_revisions(self.wc), self.revisions[-1:])
+
+ # Now pull in the other branch
+ rev = m.pull(self.repodir, self.wc, branch="branch2", update_dest=False)
+ self.assertEqual(rev, None)
+
+ # On hg 1.6, we'll be missing the last revision (on another branch)
+ if m.hg_ver() >= (1, 6, 0):
+ self.assertEqual(get_revisions(self.wc), self.revisions[1:])
+ else:
+ self.assertEqual(get_revisions(self.wc), self.revisions)
+
+ def test_pull_unrelated(self):
+ m = get_mercurial_vcs_obj()
+ # Create a new repo
+ repo2 = os.path.join(self.tmpdir, "repo2")
+ self._init_hg_repo(m, repo2)
+
+ self.assertNotEqual(self.revisions, get_revisions(repo2))
+
+ # Clone the original repo
+ m.clone(self.repodir, self.wc, update_dest=False)
+ # Hide the wanted error
+ m.config = {"log_to_console": False}
+ # Try and pull in changes from the new repo
+ self.assertRaises(
+ mercurial.VCSException, m.pull, repo2, self.wc, update_dest=False
+ )
+
+ def test_push(self):
+ m = get_mercurial_vcs_obj()
+ m.clone(self.repodir, self.wc, revision=self.revisions[-2])
+ m.push(src=self.repodir, remote=self.wc)
+ self.assertEqual(get_revisions(self.wc), self.revisions)
+
+ def test_push_with_branch(self):
+ m = get_mercurial_vcs_obj()
+ if m.hg_ver() >= (1, 6, 0):
+ m.clone(self.repodir, self.wc, revision=self.revisions[-1])
+ m.push(src=self.repodir, remote=self.wc, branch="branch2")
+ m.push(src=self.repodir, remote=self.wc, branch="default")
+ self.assertEqual(get_revisions(self.wc), self.revisions)
+
+ def test_push_with_revision(self):
+ m = get_mercurial_vcs_obj()
+ m.clone(self.repodir, self.wc, revision=self.revisions[-2])
+ m.push(src=self.repodir, remote=self.wc, revision=self.revisions[-1])
+ self.assertEqual(get_revisions(self.wc), self.revisions[-2:])
+
+ def test_mercurial(self):
+ m = get_mercurial_vcs_obj()
+ m.vcs_config = {
+ "repo": self.repodir,
+ "dest": self.wc,
+ "vcs_share_base": os.path.join(self.tmpdir, "share"),
+ }
+ m.ensure_repo_and_revision()
+ rev = m.ensure_repo_and_revision()
+ self.assertEqual(rev, self.revisions[0])
+
+ def test_push_new_branches_not_allowed(self):
+ m = get_mercurial_vcs_obj()
+ m.clone(self.repodir, self.wc, revision=self.revisions[0])
+ # Hide the wanted error
+ m.config = {"log_to_console": False}
+ self.assertRaises(
+ Exception, m.push, self.repodir, self.wc, push_new_branches=False
+ )
+
+ def test_mercurial_relative_dir(self):
+ m = get_mercurial_vcs_obj()
+ repo = os.path.basename(self.repodir)
+ wc = os.path.basename(self.wc)
+ m.vcs_config = {
+ "repo": repo,
+ "dest": wc,
+ "revision": self.revisions[-1],
+ "vcs_share_base": os.path.join(self.tmpdir, "share"),
+ }
+ m.chdir(os.path.dirname(self.repodir))
+ try:
+ rev = m.ensure_repo_and_revision()
+ self.assertEqual(rev, self.revisions[-1])
+ m.info("Creating test.txt")
+ open(os.path.join(self.wc, "test.txt"), "w").write("hello!")
+
+ m = get_mercurial_vcs_obj()
+ m.vcs_config = {
+ "repo": repo,
+ "dest": wc,
+ "revision": self.revisions[0],
+ "vcs_share_base": os.path.join(self.tmpdir, "share"),
+ }
+ rev = m.ensure_repo_and_revision()
+ self.assertEqual(rev, self.revisions[0])
+ # Make sure our local file didn't go away
+ self.assertTrue(os.path.exists(os.path.join(self.wc, "test.txt")))
+ finally:
+ m.chdir(self.pwd)
+
+ def test_mercurial_update_tip(self):
+ m = get_mercurial_vcs_obj()
+ m.vcs_config = {
+ "repo": self.repodir,
+ "dest": self.wc,
+ "revision": self.revisions[-1],
+ "vcs_share_base": os.path.join(self.tmpdir, "share"),
+ }
+ rev = m.ensure_repo_and_revision()
+ self.assertEqual(rev, self.revisions[-1])
+ open(os.path.join(self.wc, "test.txt"), "w").write("hello!")
+
+ m = get_mercurial_vcs_obj()
+ m.vcs_config = {
+ "repo": self.repodir,
+ "dest": self.wc,
+ "vcs_share_base": os.path.join(self.tmpdir, "share"),
+ }
+ rev = m.ensure_repo_and_revision()
+ self.assertEqual(rev, self.revisions[0])
+ # Make sure our local file didn't go away
+ self.assertTrue(os.path.exists(os.path.join(self.wc, "test.txt")))
+
+ def test_mercurial_update_rev(self):
+ m = get_mercurial_vcs_obj()
+ m.vcs_config = {
+ "repo": self.repodir,
+ "dest": self.wc,
+ "revision": self.revisions[-1],
+ "vcs_share_base": os.path.join(self.tmpdir, "share"),
+ }
+ rev = m.ensure_repo_and_revision()
+ self.assertEqual(rev, self.revisions[-1])
+ open(os.path.join(self.wc, "test.txt"), "w").write("hello!")
+
+ m = get_mercurial_vcs_obj()
+ m.vcs_config = {
+ "repo": self.repodir,
+ "dest": self.wc,
+ "revision": self.revisions[0],
+ "vcs_share_base": os.path.join(self.tmpdir, "share"),
+ }
+ rev = m.ensure_repo_and_revision()
+ self.assertEqual(rev, self.revisions[0])
+ # Make sure our local file didn't go away
+ self.assertTrue(os.path.exists(os.path.join(self.wc, "test.txt")))
+
+ def test_make_hg_url(self):
+ # construct an hg url specific to revision, branch and filename and try to pull it down
+ file_url = mercurial.make_hg_url(
+ "hg.mozilla.org",
+ "//build/tools/",
+ revision="FIREFOX_3_6_12_RELEASE",
+ filename="/lib/python/util/hg.py",
+ protocol="https",
+ )
+ expected_url = (
+ "https://hg.mozilla.org/build/tools/raw-file/"
+ "FIREFOX_3_6_12_RELEASE/lib/python/util/hg.py"
+ )
+ self.assertEqual(file_url, expected_url)
+
+ def test_make_hg_url_no_filename(self):
+ file_url = mercurial.make_hg_url(
+ "hg.mozilla.org",
+ "/build/tools",
+ revision="default",
+ protocol="https",
+ )
+ expected_url = "https://hg.mozilla.org/build/tools/rev/default"
+ self.assertEqual(file_url, expected_url)
+
+ def test_make_hg_url_no_revision_no_filename(self):
+ repo_url = mercurial.make_hg_url(
+ "hg.mozilla.org",
+ "/build/tools",
+ protocol="https",
+ )
+ expected_url = "https://hg.mozilla.org/build/tools"
+ self.assertEqual(repo_url, expected_url)
+
+ def test_make_hg_url_different_protocol(self):
+ repo_url = mercurial.make_hg_url(
+ "hg.mozilla.org",
+ "/build/tools",
+ protocol="ssh",
+ )
+ expected_url = "ssh://hg.mozilla.org/build/tools"
+ self.assertEqual(repo_url, expected_url)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/testing/mozharness/test/test_l10n_locales.py b/testing/mozharness/test/test_l10n_locales.py
new file mode 100644
index 0000000000..678ab98004
--- /dev/null
+++ b/testing/mozharness/test/test_l10n_locales.py
@@ -0,0 +1,118 @@
+import os
+import shutil
+import unittest
+from unittest import mock
+
+import mozharness.base.script as script
+import mozharness.mozilla.l10n.locales as locales
+
+ALL_LOCALES = ["ar", "be", "de", "es-ES"]
+
+MH_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+
+
+def cleanup():
+ if os.path.exists("test_logs"):
+ shutil.rmtree("test_logs")
+
+
+class LocalesTest(locales.LocalesMixin, script.BaseScript):
+ def __init__(self, **kwargs):
+ if "config" not in kwargs:
+ kwargs["config"] = {"log_type": "simple", "log_level": "error"}
+ if "initial_config_file" not in kwargs:
+ kwargs["initial_config_file"] = "test/test.json"
+ super(LocalesTest, self).__init__(**kwargs)
+ self.config = {}
+ self.log_obj = None
+
+
+@mock.patch.dict("os.environ", GECKO_PATH="gecko_src")
+class TestLocalesMixin(unittest.TestCase):
+ BASE_ABS_DIRS = {
+ "abs_log_dir",
+ "abs_work_dir",
+ "base_work_dir",
+ "abs_src_dir",
+ "abs_locales_src_dir",
+ "abs_l10n_dir",
+ "abs_obj_dir",
+ "abs_locales_dir",
+ }
+
+ def setUp(self):
+ cleanup()
+
+ def tearDown(self):
+ cleanup()
+
+ def test_query_locales_locales(self):
+ l = LocalesTest()
+ l.locales = ["a", "b", "c"]
+ self.assertEqual(l.locales, l.query_locales())
+
+ def test_query_locales_ignore_locales(self):
+ l = LocalesTest()
+ l.config["locales"] = ["a", "b", "c"]
+ l.config["ignore_locales"] = ["a", "c"]
+ self.assertEqual(["b"], l.query_locales())
+
+ def test_query_locales_config(self):
+ l = LocalesTest()
+ l.config["locales"] = ["a", "b", "c"]
+ self.assertEqual(l.config["locales"], l.query_locales())
+
+ def test_query_locales_json(self):
+ l = LocalesTest()
+ l.config["locales_file"] = os.path.join(
+ MH_DIR, "test/helper_files/locales.json"
+ )
+ l.config["base_work_dir"] = "."
+ l.config["work_dir"] = "."
+ l.config["locales_dir"] = "locales_dir"
+ l.config["objdir"] = "objdir"
+ locales = l.query_locales()
+ locales.sort()
+ self.assertEqual(ALL_LOCALES, locales)
+
+ # Commenting out til we can hide the FATAL ?
+ # def test_query_locales_no_file(self):
+ # l = LocalesTest()
+ # l.config['base_work_dir'] = '.'
+ # l.config['work_dir'] = '.'
+ # try:
+ # l.query_locales()
+ # except SystemExit:
+ # pass # Good
+ # else:
+ # self.assertTrue(False, "query_locales with no file doesn't fatal()!")
+
+ def test_parse_locales_file(self):
+ l = LocalesTest()
+ self.assertEqual(
+ ALL_LOCALES,
+ l.parse_locales_file(os.path.join(MH_DIR, "test/helper_files/locales.txt")),
+ )
+
+ def _get_query_abs_dirs_obj(self):
+ l = LocalesTest()
+ l.config["base_work_dir"] = "base_work_dir"
+ l.config["work_dir"] = "work_dir"
+ l.config["locales_dir"] = "locales_dir"
+ l.config["objdir"] = "objdir"
+ return l
+
+ def test_query_abs_dirs_base(self):
+ l = self._get_query_abs_dirs_obj()
+ dirs = set(l.query_abs_dirs().keys())
+ self.assertEqual(dirs, self.BASE_ABS_DIRS)
+
+ def test_query_abs_dirs_base2(self):
+ l = self._get_query_abs_dirs_obj()
+ l.query_abs_dirs().keys()
+ dirs = set(l.query_abs_dirs().keys())
+ self.assertEqual(dirs, self.BASE_ABS_DIRS)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/testing/mozharness/test/test_mozilla_automation.py b/testing/mozharness/test/test_mozilla_automation.py
new file mode 100644
index 0000000000..e76fec0249
--- /dev/null
+++ b/testing/mozharness/test/test_mozilla_automation.py
@@ -0,0 +1,45 @@
+import gc
+import unittest
+
+import mozharness.base.log as log
+import mozharness.base.script as script
+from mozharness.base.log import ERROR
+from mozharness.mozilla.automation import AutomationMixin
+
+
+class CleanupObj(script.ScriptMixin, log.LogMixin):
+ def __init__(self):
+ super(CleanupObj, self).__init__()
+ self.log_obj = None
+ self.config = {"log_level": ERROR}
+
+
+def cleanup():
+ gc.collect()
+ c = CleanupObj()
+ for f in ("test_logs", "test_dir", "tmpfile_stdout", "tmpfile_stderr"):
+ c.rmtree(f)
+
+
+class AutomationScript(AutomationMixin, script.BaseScript):
+ def __init__(self, **kwargs):
+ super(AutomationScript, self).__init__(**kwargs)
+
+
+# TestAutomationStatus {{{1
+class TestAutomationStatus(unittest.TestCase):
+ # I need a log watcher helper function, here and in test_log.
+ def setUp(self):
+ cleanup()
+ self.s = None
+
+ def tearDown(self):
+ # Close the logfile handles, or windows can't remove the logs
+ if hasattr(self, "s") and isinstance(self.s, object):
+ del self.s
+ cleanup()
+
+
+# main {{{1
+if __name__ == "__main__":
+ unittest.main()
diff --git a/testing/mozharness/test/test_mozilla_building_buildbase.py b/testing/mozharness/test/test_mozilla_building_buildbase.py
new file mode 100644
index 0000000000..483196d094
--- /dev/null
+++ b/testing/mozharness/test/test_mozilla_building_buildbase.py
@@ -0,0 +1,146 @@
+import os
+import unittest
+
+from mozharness.base.log import LogMixin
+from mozharness.base.script import ScriptMixin
+from mozharness.mozilla.building.buildbase import MozconfigPathError, get_mozconfig_path
+
+
+class FakeLogger(object):
+ def log_message(self, *args, **kwargs):
+ pass
+
+
+class FakeScriptMixin(LogMixin, ScriptMixin, object):
+ def __init__(self):
+ self.script_obj = self
+ self.log_obj = FakeLogger()
+
+
+class TestMozconfigPath(unittest.TestCase):
+ """
+ Tests for :func:`get_mozconfig_path`.
+ """
+
+ def test_path(self):
+ """
+ Passing just ``src_mozconfig`` gives that file in ``abs_src_dir``.
+ """
+ script = FakeScriptMixin()
+
+ abs_src_path = get_mozconfig_path(
+ script,
+ config={"src_mozconfig": "path/to/mozconfig"},
+ dirs={"abs_src_dir": "/src"},
+ )
+ self.assertEqual(abs_src_path, "/src/path/to/mozconfig")
+
+ def test_composite(self):
+ """
+ Passing ``app_name``, ``mozconfig_platform``, and ``mozconfig_variant``
+ find the file in the ``config/mozconfigs`` subdirectory of that app
+ directory.
+ """
+ script = FakeScriptMixin()
+
+ config = {
+ "app_name": "the-app",
+ "mozconfig_variant": "variant",
+ "mozconfig_platform": "platform9000",
+ }
+ abs_src_path = get_mozconfig_path(
+ script,
+ config=config,
+ dirs={"abs_src_dir": "/src"},
+ )
+ self.assertEqual(
+ abs_src_path,
+ "/src/the-app/config/mozconfigs/platform9000/variant",
+ )
+
+ def test_manifest(self):
+ """
+ Passing just ``src_mozconfig_manifest`` looks in that file in
+ ``abs_work_dir``, and finds the mozconfig file specified there in
+ ``abs_src_dir``.
+ """
+ script = FakeScriptMixin()
+
+ test_dir = os.path.dirname(__file__)
+ config = {"src_mozconfig_manifest": "helper_files/mozconfig_manifest.json"}
+ abs_src_path = get_mozconfig_path(
+ script,
+ config=config,
+ dirs={
+ "abs_src_dir": "/src",
+ "abs_work_dir": test_dir,
+ },
+ )
+ self.assertEqual(abs_src_path, "/src/path/to/mozconfig")
+
+ def test_errors(self):
+ script = FakeScriptMixin()
+
+ configs = [
+ # Not specifying any parts of a mozconfig path
+ {},
+ # Specifying both src_mozconfig and src_mozconfig_manifest
+ {"src_mozconfig": "path", "src_mozconfig_manifest": "path"},
+ # Specifying src_mozconfig with some or all of a composite
+ # mozconfig path
+ {
+ "src_mozconfig": "path",
+ "app_name": "app",
+ "mozconfig_platform": "platform",
+ "mozconfig_variant": "variant",
+ },
+ {
+ "src_mozconfig": "path",
+ "mozconfig_platform": "platform",
+ "mozconfig_variant": "variant",
+ },
+ {
+ "src_mozconfig": "path",
+ "app_name": "app",
+ "mozconfig_variant": "variant",
+ },
+ {
+ "src_mozconfig": "path",
+ "app_name": "app",
+ "mozconfig_platform": "platform",
+ },
+ # Specifying src_mozconfig_manifest with some or all of a composite
+ # mozconfig path
+ {
+ "src_mozconfig_manifest": "path",
+ "app_name": "app",
+ "mozconfig_platform": "platform",
+ "mozconfig_variant": "variant",
+ },
+ {
+ "src_mozconfig_manifest": "path",
+ "mozconfig_platform": "platform",
+ "mozconfig_variant": "variant",
+ },
+ {
+ "src_mozconfig_manifest": "path",
+ "app_name": "app",
+ "mozconfig_variant": "variant",
+ },
+ {
+ "src_mozconfig_manifest": "path",
+ "app_name": "app",
+ "mozconfig_platform": "platform",
+ },
+ # Specifying only some parts of a compsite mozconfig path
+ {"mozconfig_platform": "platform", "mozconfig_variant": "variant"},
+ {"app_name": "app", "mozconfig_variant": "variant"},
+ {"app_name": "app", "mozconfig_platform": "platform"},
+ {"app_name": "app"},
+ {"mozconfig_variant": "variant"},
+ {"mozconfig_platform": "platform"},
+ ]
+
+ for config in configs:
+ with self.assertRaises(MozconfigPathError):
+ get_mozconfig_path(script, config=config, dirs={})
diff --git a/testing/mozharness/test/test_mozilla_merkle.py b/testing/mozharness/test/test_mozilla_merkle.py
new file mode 100644
index 0000000000..226499142f
--- /dev/null
+++ b/testing/mozharness/test/test_mozilla_merkle.py
@@ -0,0 +1,134 @@
+import codecs
+import hashlib
+import random
+import unittest
+
+from mozharness.mozilla.merkle import InclusionProof, MerkleTree
+
+decode_hex = codecs.getdecoder("hex_codec")
+encode_hex = codecs.getencoder("hex_codec")
+
+# Pre-computed tree on 7 inputs
+#
+# ______F_____
+# / \
+# __D__ _E_
+# / \ / \
+# A B C |
+# / \ / \ / \ |
+# 0 1 2 3 4 5 6
+hash_fn = hashlib.sha256
+
+data = [
+ decode_hex("fbc459361fc111024c6d1fd83d23a9ff")[0],
+ decode_hex("ae3a44925afec860451cd8658b3cadde")[0],
+ decode_hex("418903fe6ef29fc8cab93d778a7b018b")[0],
+ decode_hex("3d1c53c00b2e137af8c4c23a06388c6b")[0],
+ decode_hex("e656ebd8e2758bc72599e5896be357be")[0],
+ decode_hex("81aae91cf90be172eedd1c75c349bf9e")[0],
+ decode_hex("00c262edf8b0bc345aca769e8733e25e")[0],
+]
+
+leaves = [
+ decode_hex("5cb551f87797381a24a5359a986e2cef25b1f2113b387197fe48e8babc9ad5c7")[0],
+ decode_hex("9899dc0be00306bda2a8e69cec32525ca6244f132479bcf840d8c1bc8bdfbff2")[0],
+ decode_hex("fdd27d0393e32637b474efb9b3efad29568c3ec9b091fdda40fd57ec9196f06d")[0],
+ decode_hex("c87292a6c8528c2a0679b6c1eefb47e4dbac7840d23645d5b7cb47cf1a8d365f")[0],
+ decode_hex("2ff3bdac9bec3580b82da8a357746f15919414d9cbe517e2dd96910c9814c30c")[0],
+ decode_hex("883e318240eccc0e2effafebdb0fd4fd26d0996da1b01439566cb9babef8725f")[0],
+ decode_hex("bb13dfb7b202a95f241ea1715c8549dc048d9936ec747028002f7c795de72fcf")[0],
+]
+
+nodeA = decode_hex("06447a7baa079cb0b4b6119d0f575bec508915403fdc30923eba982b63759805")[
+ 0
+]
+nodeB = decode_hex("3db98027c655ead4fe897bef3a4b361839a337941a9e624b475580c9d4e882ee")[
+ 0
+]
+nodeC = decode_hex("17524f8b0169b2745c67846925d55449ae80a8022ef8189dcf4cbb0ec7fcc470")[
+ 0
+]
+nodeD = decode_hex("380d0dc6fd7d4f37859a12dbfc7171b3cce29ab0688c6cffd2b15f3e0b21af49")[
+ 0
+]
+nodeE = decode_hex("3a9c2886a5179a6e1948876034f99d52a8f393f47a09887adee6d1b4a5c5fbd6")[
+ 0
+]
+nodeF = decode_hex("d1a0d3947db4ae8305f2ac32985957e02659b2ea3c10da52a48d2526e9af3bbc")[
+ 0
+]
+
+proofs = [
+ [leaves[1], nodeB, nodeE],
+ [leaves[0], nodeB, nodeE],
+ [leaves[3], nodeA, nodeE],
+ [leaves[2], nodeA, nodeE],
+ [leaves[5], leaves[6], nodeD],
+ [leaves[4], leaves[6], nodeD],
+ [nodeC, nodeD],
+]
+
+known_proof5 = decode_hex(
+ "020000"
+ + "0000000000000007"
+ + "0000000000000005"
+ + "0063"
+ + "20"
+ + encode_hex(leaves[4])[0].decode()
+ + "20"
+ + encode_hex(leaves[6])[0].decode()
+ + "20"
+ + encode_hex(nodeD)[0].decode()
+)[0]
+
+
+class TestMerkleTree(unittest.TestCase):
+ def testPreComputed(self):
+ tree = MerkleTree(hash_fn, data)
+ head = tree.head()
+ self.assertEqual(head, nodeF)
+
+ for i in range(len(data)):
+ proof = tree.inclusion_proof(i)
+
+ self.assertTrue(proof.verify(hash_fn, data[i], i, len(data), head))
+ self.assertEqual(proof.leaf_index, i)
+ self.assertEqual(proof.tree_size, tree.n)
+ self.assertEqual(proof.path_elements, proofs[i])
+
+ def testInclusionProofEncodeDecode(self):
+ tree = MerkleTree(hash_fn, data)
+
+ # Inclusion proof encode/decode round trip test
+ proof5 = tree.inclusion_proof(5)
+ serialized5 = proof5.to_rfc6962_bis()
+ deserialized5 = InclusionProof.from_rfc6962_bis(serialized5)
+ reserialized5 = deserialized5.to_rfc6962_bis()
+ self.assertEqual(serialized5, reserialized5)
+
+ # Inclusion proof encode known answer test
+ serialized5 = proof5.to_rfc6962_bis()
+ self.assertEqual(serialized5, known_proof5)
+
+ # Inclusion proof decode known answer test
+ known_deserialized5 = InclusionProof.from_rfc6962_bis(known_proof5)
+ self.assertEqual(proof5.leaf_index, known_deserialized5.leaf_index)
+ self.assertEqual(proof5.tree_size, known_deserialized5.tree_size)
+ self.assertEqual(proof5.path_elements, known_deserialized5.path_elements)
+
+ def testLargeTree(self):
+ TEST_SIZE = 5000
+ ELEM_SIZE_BYTES = 16
+ data = [
+ bytearray(random.getrandbits(8) for _ in range(ELEM_SIZE_BYTES))
+ for _ in range(TEST_SIZE)
+ ]
+ tree = MerkleTree(hash_fn, data)
+ head = tree.head()
+
+ for i in range(len(data)):
+ proof = tree.inclusion_proof(i)
+
+ self.assertTrue(proof.verify(hash_fn, data[i], i, len(data), head))
+ self.assertEqual(proof.leaf_index, i)
+ self.assertEqual(proof.tree_size, tree.n)
diff --git a/testing/mozharness/test/test_mozilla_structured.py b/testing/mozharness/test/test_mozilla_structured.py
new file mode 100644
index 0000000000..8bc30d5f8c
--- /dev/null
+++ b/testing/mozharness/test/test_mozilla_structured.py
@@ -0,0 +1,68 @@
+import unittest
+
+from mozharness.base.log import INFO, WARNING
+from mozharness.mozilla.automation import TBPL_SUCCESS, TBPL_WARNING
+from mozharness.mozilla.mozbase import MozbaseMixin
+from mozharness.mozilla.structuredlog import StructuredOutputParser
+from mozlog.handlers.statushandler import RunSummary
+
+success_summary = RunSummary(
+ unexpected_statuses={},
+ expected_statuses={"PASS": 3, "OK": 1, "FAIL": 1},
+ known_intermittent_statuses={"FAIL": 1},
+ log_level_counts={"info": 5},
+ action_counts={"test_status": 4, "test_end": 1, "suite_end": 1},
+)
+
+failure_summary = RunSummary(
+ unexpected_statuses={"FAIL": 2},
+ expected_statuses={"PASS": 2, "OK": 1},
+ known_intermittent_statuses={},
+ log_level_counts={"warning": 2, "info": 3},
+ action_counts={"test_status": 3, "test_end": 2, "suite_end": 1},
+)
+
+
+class TestParser(MozbaseMixin, StructuredOutputParser):
+ def __init__(self, *args, **kwargs):
+ super(TestParser, self).__init__(*args, **kwargs)
+ self.config = {}
+
+
+class TestStructuredOutputParser(unittest.TestCase):
+ def setUp(self):
+ self.parser = TestParser()
+
+ def test_evaluate_parser_success(self):
+ self.parser.handler.expected_statuses = {"PASS": 3, "OK": 1, "FAIL": 1}
+ self.parser.handler.log_level_counts = {"info": 5}
+ self.parser.handler.action_counts = {
+ "test_status": 4,
+ "test_end": 1,
+ "suite_end": 1,
+ }
+ self.parser.handler.known_intermittent_statuses = {"FAIL": 1}
+ result = self.parser.evaluate_parser(
+ return_code=TBPL_SUCCESS, success_codes=[TBPL_SUCCESS]
+ )
+ tbpl_status, worst_log_level, joined_summary = result
+ self.assertEqual(tbpl_status, TBPL_SUCCESS)
+ self.assertEqual(worst_log_level, INFO)
+ self.assertEqual(joined_summary, success_summary)
+
+ def test_evaluate_parser_failure(self):
+ self.parser.handler.unexpected_statuses = {"FAIL": 2}
+ self.parser.handler.expected_statuses = {"PASS": 2, "OK": 1}
+ self.parser.handler.log_level_counts = {"warning": 2, "info": 3}
+ self.parser.handler.action_counts = {
+ "test_status": 3,
+ "test_end": 2,
+ "suite_end": 1,
+ }
+ result = self.parser.evaluate_parser(
+ return_code=TBPL_SUCCESS, success_codes=[TBPL_SUCCESS]
+ )
+ tbpl_status, worst_log_level, joined_summary = result
+ self.assertEqual(tbpl_status, TBPL_WARNING)
+ self.assertEqual(worst_log_level, WARNING)
+ self.assertEqual(joined_summary, failure_summary)