summaryrefslogtreecommitdiffstats
path: root/testing/mozbase/mozlog/tests
diff options
context:
space:
mode:
Diffstat (limited to 'testing/mozbase/mozlog/tests')
-rw-r--r--testing/mozbase/mozlog/tests/conftest.py24
-rw-r--r--testing/mozbase/mozlog/tests/manifest.ini9
-rw-r--r--testing/mozbase/mozlog/tests/test_capture.py37
-rw-r--r--testing/mozbase/mozlog/tests/test_errorsummary.py125
-rw-r--r--testing/mozbase/mozlog/tests/test_formatters.py767
-rw-r--r--testing/mozbase/mozlog/tests/test_logger.py303
-rw-r--r--testing/mozbase/mozlog/tests/test_logtypes.py106
-rw-r--r--testing/mozbase/mozlog/tests/test_structured.py1098
-rw-r--r--testing/mozbase/mozlog/tests/test_terminal_colors.py62
9 files changed, 2531 insertions, 0 deletions
diff --git a/testing/mozbase/mozlog/tests/conftest.py b/testing/mozbase/mozlog/tests/conftest.py
new file mode 100644
index 0000000000..eaf79897e4
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/conftest.py
@@ -0,0 +1,24 @@
+import pytest
+from mozlog.formatters import ErrorSummaryFormatter, MachFormatter
+from mozlog.handlers import StreamHandler
+from mozlog.structuredlog import StructuredLogger
+from six import StringIO
+
+
+@pytest.fixture
+def get_logger():
+ # Ensure a new state instance is created for each test function.
+ StructuredLogger._logger_states = {}
+ formatters = {
+ "mach": MachFormatter,
+ "errorsummary": ErrorSummaryFormatter,
+ }
+
+ def inner(name, **fmt_args):
+ buf = StringIO()
+ fmt = formatters[name](**fmt_args)
+ logger = StructuredLogger("test_logger")
+ logger.add_handler(StreamHandler(buf, fmt))
+ return logger
+
+ return inner
diff --git a/testing/mozbase/mozlog/tests/manifest.ini b/testing/mozbase/mozlog/tests/manifest.ini
new file mode 100644
index 0000000000..23737ca469
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/manifest.ini
@@ -0,0 +1,9 @@
+[DEFAULT]
+subsuite = mozbase
+[test_errorsummary.py]
+[test_logger.py]
+[test_logtypes.py]
+[test_formatters.py]
+[test_structured.py]
+[test_terminal_colors.py]
+[test_capture.py]
diff --git a/testing/mozbase/mozlog/tests/test_capture.py b/testing/mozbase/mozlog/tests/test_capture.py
new file mode 100644
index 0000000000..4adbda4180
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/test_capture.py
@@ -0,0 +1,37 @@
+import sys
+import unittest
+
+import mozunit
+from mozlog import capture, structuredlog
+from test_structured import TestHandler
+
+
+class TestCaptureIO(unittest.TestCase):
+ """Tests expected logging output of CaptureIO"""
+
+ def setUp(self):
+ self.logger = structuredlog.StructuredLogger("test")
+ self.handler = TestHandler()
+ self.logger.add_handler(self.handler)
+
+ def test_captureio_log(self):
+ """
+ CaptureIO takes in two arguments. The second argument must
+ be truthy in order for the code to run. Hence, the string
+ "capture_stdio" has been used in this test case.
+ """
+ with capture.CaptureIO(self.logger, "capture_stdio"):
+ print("message 1")
+ sys.stdout.write("message 2")
+ sys.stderr.write("message 3")
+ sys.stdout.write("\xff")
+ log = self.handler.items
+ messages = [item["message"] for item in log]
+ self.assertIn("STDOUT: message 1", messages)
+ self.assertIn("STDOUT: message 2", messages)
+ self.assertIn("STDERR: message 3", messages)
+ self.assertIn(u"STDOUT: \xff", messages)
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozlog/tests/test_errorsummary.py b/testing/mozbase/mozlog/tests/test_errorsummary.py
new file mode 100644
index 0000000000..422bd53358
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/test_errorsummary.py
@@ -0,0 +1,125 @@
+# -*- coding: utf-8 -*-
+
+import json
+import time
+
+import mozunit
+import pytest
+
+# flake8: noqa
+
+
+@pytest.mark.parametrize(
+ "logs,expected",
+ (
+ pytest.param(
+ [
+ (
+ "suite_start",
+ {
+ "manifestA": ["test_foo", "test_bar", "test_baz"],
+ "manifestB": ["test_something"],
+ },
+ ),
+ ("test_start", "test_foo"),
+ ("test_end", "test_foo", "SKIP"),
+ ("test_start", "test_bar"),
+ ("test_end", "test_bar", "OK"),
+ ("test_start", "test_something"),
+ ("test_end", "test_something", "OK"),
+ ("test_start", "test_baz"),
+ ("test_end", "test_baz", "PASS", "FAIL"),
+ ("suite_end",),
+ ],
+ """
+ {"groups": ["manifestA", "manifestB"], "action": "test_groups", "line": 0}
+ {"test": "test_baz", "subtest": null, "group": "manifestA", "status": "PASS", "expected": "FAIL", "message": null, "stack": null, "known_intermittent": [], "action": "test_result", "line": 8}
+ {"group": "manifestA", "status": "ERROR", "duration": 70, "action": "group_result", "line": 9}
+ {"group": "manifestB", "status": "OK", "duration": 10, "action": "group_result", "line": 9}
+ """.strip(),
+ id="basic",
+ ),
+ pytest.param(
+ [
+ ("suite_start", {"manifest": ["test_foo"]}),
+ ("test_start", "test_foo"),
+ ("suite_end",),
+ ],
+ """
+ {"groups": ["manifest"], "action": "test_groups", "line": 0}
+ {"group": "manifest", "status": null, "duration": null, "action": "group_result", "line": 2}
+ """.strip(),
+ id="missing_test_end",
+ ),
+ pytest.param(
+ [
+ ("suite_start", {"manifest": ["test_foo"]}),
+ ("test_start", "test_foo"),
+ ("test_status", "test_foo", "subtest", "PASS"),
+ ("suite_end",),
+ ],
+ """
+ {"groups": ["manifest"], "action": "test_groups", "line": 0}
+ {"group": "manifest", "status": "ERROR", "duration": null, "action": "group_result", "line": 3}
+ """.strip(),
+ id="missing_test_end_with_test_status_ok",
+ marks=pytest.mark.xfail, # status is OK but should be ERROR
+ ),
+ pytest.param(
+ [
+ (
+ "suite_start",
+ {
+ "manifestA": ["test_foo", "test_bar", "test_baz"],
+ "manifestB": ["test_something"],
+ },
+ ),
+ ("test_start", "test_foo"),
+ ("test_end", "test_foo", "SKIP"),
+ ("test_start", "test_bar"),
+ ("test_end", "test_bar", "CRASH"),
+ ("test_start", "test_something"),
+ ("test_end", "test_something", "OK"),
+ ("test_start", "test_baz"),
+ ("test_end", "test_baz", "FAIL", "FAIL"),
+ ("suite_end",),
+ ],
+ """
+ {"groups": ["manifestA", "manifestB"], "action": "test_groups", "line": 0}
+ {"test": "test_bar", "subtest": null, "group": "manifestA", "status": "CRASH", "expected": "OK", "message": null, "stack": null, "known_intermittent": [], "action": "test_result", "line": 4}
+ {"group": "manifestA", "status": "ERROR", "duration": 70, "action": "group_result", "line": 9}
+ {"group": "manifestB", "status": "OK", "duration": 10, "action": "group_result", "line": 9}
+ """.strip(),
+ id="crash_and_group_status",
+ ),
+ ),
+)
+def test_errorsummary(monkeypatch, get_logger, logs, expected):
+ ts = {"ts": 0.0} # need to use dict since 'nonlocal' doesn't exist on PY2
+
+ def fake_time():
+ ts["ts"] += 0.01
+ return ts["ts"]
+
+ monkeypatch.setattr(time, "time", fake_time)
+ logger = get_logger("errorsummary")
+
+ for log in logs:
+ getattr(logger, log[0])(*log[1:])
+
+ buf = logger.handlers[0].stream
+ result = buf.getvalue()
+ print("Dumping result for copy/paste:")
+ print(result)
+
+ expected = expected.split("\n")
+ for i, line in enumerate(result.split("\n")):
+ if not line:
+ continue
+
+ data = json.loads(line)
+ assert data == json.loads(expected[i])
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozlog/tests/test_formatters.py b/testing/mozbase/mozlog/tests/test_formatters.py
new file mode 100644
index 0000000000..e0e3a51d97
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/test_formatters.py
@@ -0,0 +1,767 @@
+# encoding: utf-8
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import os
+import signal
+import unittest
+import xml.etree.ElementTree as ET
+from textwrap import dedent
+
+import mozunit
+import pytest
+from mozlog.formatters import (
+ GroupingFormatter,
+ HTMLFormatter,
+ MachFormatter,
+ TbplFormatter,
+ XUnitFormatter,
+)
+from mozlog.handlers import StreamHandler
+from mozlog.structuredlog import StructuredLogger
+from six import StringIO, ensure_text, unichr
+
+FORMATS = {
+ # A list of tuples consisting of (name, options, expected string).
+ "PASS": [
+ (
+ "mach",
+ {},
+ dedent(
+ """
+ 0:00.00 SUITE_START: running 3 tests
+ 0:00.00 TEST_START: test_foo
+ 0:00.00 TEST_END: OK
+ 0:00.00 TEST_START: test_bar
+ 0:00.00 TEST_END: Test OK. Subtests passed 1/1. Unexpected 0
+ 0:00.00 TEST_START: test_baz
+ 0:00.00 TEST_END: FAIL
+ 0:00.00 SUITE_END
+
+ suite 1
+ ~~~~~~~
+ Ran 4 checks (1 subtests, 3 tests)
+ Expected results: 4
+ Unexpected results: 0
+ OK
+ """
+ ).lstrip("\n"),
+ ),
+ (
+ "mach",
+ {"verbose": True},
+ dedent(
+ """
+ 0:00.00 SUITE_START: running 3 tests
+ 0:00.00 TEST_START: test_foo
+ 0:00.00 TEST_END: OK
+ 0:00.00 TEST_START: test_bar
+ 0:00.00 PASS a subtest
+ 0:00.00 TEST_END: Test OK. Subtests passed 1/1. Unexpected 0
+ 0:00.00 TEST_START: test_baz
+ 0:00.00 TEST_END: FAIL
+ 0:00.00 SUITE_END
+
+ suite 1
+ ~~~~~~~
+ Ran 4 checks (1 subtests, 3 tests)
+ Expected results: 4
+ Unexpected results: 0
+ OK
+ """
+ ).lstrip("\n"),
+ ),
+ ],
+ "FAIL": [
+ (
+ "mach",
+ {},
+ dedent(
+ """
+ 0:00.00 SUITE_START: running 3 tests
+ 0:00.00 TEST_START: test_foo
+ 0:00.00 TEST_END: FAIL, expected PASS - expected 0 got 1
+ 0:00.00 TEST_START: test_bar
+ 0:00.00 TEST_END: Test OK. Subtests passed 0/2. Unexpected 2
+ FAIL a subtest - expected 0 got 1
+ SimpleTest.is@SimpleTest/SimpleTest.js:312:5
+ @caps/tests/mochitest/test_bug246699.html:53:1
+ TIMEOUT another subtest
+ 0:00.00 TEST_START: test_baz
+ 0:00.00 TEST_END: PASS, expected FAIL
+ 0:00.00 SUITE_END
+
+ suite 1
+ ~~~~~~~
+ Ran 5 checks (2 subtests, 3 tests)
+ Expected results: 1
+ Unexpected results: 4
+ test: 2 (1 fail, 1 pass)
+ subtest: 2 (1 fail, 1 timeout)
+
+ Unexpected Results
+ ------------------
+ test_foo
+ FAIL test_foo - expected 0 got 1
+ test_bar
+ FAIL a subtest - expected 0 got 1
+ SimpleTest.is@SimpleTest/SimpleTest.js:312:5
+ @caps/tests/mochitest/test_bug246699.html:53:1
+ TIMEOUT another subtest
+ test_baz
+ UNEXPECTED-PASS test_baz
+ """
+ ).lstrip("\n"),
+ ),
+ (
+ "mach",
+ {"verbose": True},
+ dedent(
+ """
+ 0:00.00 SUITE_START: running 3 tests
+ 0:00.00 TEST_START: test_foo
+ 0:00.00 TEST_END: FAIL, expected PASS - expected 0 got 1
+ 0:00.00 TEST_START: test_bar
+ 0:00.00 FAIL a subtest - expected 0 got 1
+ SimpleTest.is@SimpleTest/SimpleTest.js:312:5
+ @caps/tests/mochitest/test_bug246699.html:53:1
+ 0:00.00 TIMEOUT another subtest
+ 0:00.00 TEST_END: Test OK. Subtests passed 0/2. Unexpected 2
+ 0:00.00 TEST_START: test_baz
+ 0:00.00 TEST_END: PASS, expected FAIL
+ 0:00.00 SUITE_END
+
+ suite 1
+ ~~~~~~~
+ Ran 5 checks (2 subtests, 3 tests)
+ Expected results: 1
+ Unexpected results: 4
+ test: 2 (1 fail, 1 pass)
+ subtest: 2 (1 fail, 1 timeout)
+
+ Unexpected Results
+ ------------------
+ test_foo
+ FAIL test_foo - expected 0 got 1
+ test_bar
+ FAIL a subtest - expected 0 got 1
+ SimpleTest.is@SimpleTest/SimpleTest.js:312:5
+ @caps/tests/mochitest/test_bug246699.html:53:1
+ TIMEOUT another subtest
+ test_baz
+ UNEXPECTED-PASS test_baz
+ """
+ ).lstrip("\n"),
+ ),
+ ],
+ "PRECONDITION_FAILED": [
+ (
+ "mach",
+ {},
+ dedent(
+ """
+ 0:00.00 SUITE_START: running 2 tests
+ 0:00.00 TEST_START: test_foo
+ 0:00.00 TEST_END: PRECONDITION_FAILED, expected OK
+ 0:00.00 TEST_START: test_bar
+ 0:00.00 TEST_END: Test OK. Subtests passed 1/2. Unexpected 1
+ PRECONDITION_FAILED another subtest
+ 0:00.00 SUITE_END
+
+ suite 1
+ ~~~~~~~
+ Ran 4 checks (2 subtests, 2 tests)
+ Expected results: 2
+ Unexpected results: 2
+ test: 1 (1 precondition_failed)
+ subtest: 1 (1 precondition_failed)
+
+ Unexpected Results
+ ------------------
+ test_foo
+ PRECONDITION_FAILED test_foo
+ test_bar
+ PRECONDITION_FAILED another subtest
+ """
+ ).lstrip("\n"),
+ ),
+ (
+ "mach",
+ {"verbose": True},
+ dedent(
+ """
+ 0:00.00 SUITE_START: running 2 tests
+ 0:00.00 TEST_START: test_foo
+ 0:00.00 TEST_END: PRECONDITION_FAILED, expected OK
+ 0:00.00 TEST_START: test_bar
+ 0:00.00 PASS a subtest
+ 0:00.00 PRECONDITION_FAILED another subtest
+ 0:00.00 TEST_END: Test OK. Subtests passed 1/2. Unexpected 1
+ 0:00.00 SUITE_END
+
+ suite 1
+ ~~~~~~~
+ Ran 4 checks (2 subtests, 2 tests)
+ Expected results: 2
+ Unexpected results: 2
+ test: 1 (1 precondition_failed)
+ subtest: 1 (1 precondition_failed)
+
+ Unexpected Results
+ ------------------
+ test_foo
+ PRECONDITION_FAILED test_foo
+ test_bar
+ PRECONDITION_FAILED another subtest
+ """
+ ).lstrip("\n"),
+ ),
+ ],
+ "KNOWN-INTERMITTENT": [
+ (
+ "mach",
+ {},
+ dedent(
+ """
+ 0:00.00 SUITE_START: running 3 tests
+ 0:00.00 TEST_START: test_foo
+ 0:00.00 TEST_END: FAIL
+ KNOWN-INTERMITTENT-FAIL test_foo
+ 0:00.00 TEST_START: test_bar
+ 0:00.00 TEST_END: Test OK. Subtests passed 1/1. Unexpected 0
+ KNOWN-INTERMITTENT-PASS a subtest
+ 0:00.00 TEST_START: test_baz
+ 0:00.00 TEST_END: FAIL
+ 0:00.00 SUITE_END
+
+ suite 1
+ ~~~~~~~
+ Ran 4 checks (1 subtests, 3 tests)
+ Expected results: 4 (2 known intermittents)
+ Unexpected results: 0
+
+ Known Intermittent Results
+ --------------------------
+ test_foo
+ KNOWN-INTERMITTENT-FAIL test_foo
+ test_bar
+ KNOWN-INTERMITTENT-PASS a subtest
+ OK
+ """
+ ).lstrip("\n"),
+ ),
+ (
+ "mach",
+ {"verbose": True},
+ dedent(
+ """
+ 0:00.00 SUITE_START: running 3 tests
+ 0:00.00 TEST_START: test_foo
+ 0:00.00 TEST_END: FAIL
+ KNOWN-INTERMITTENT-FAIL test_foo
+ 0:00.00 TEST_START: test_bar
+ 0:00.00 KNOWN-INTERMITTENT-PASS a subtest
+ 0:00.00 TEST_END: Test OK. Subtests passed 1/1. Unexpected 0
+ KNOWN-INTERMITTENT-PASS a subtest
+ 0:00.00 TEST_START: test_baz
+ 0:00.00 TEST_END: FAIL
+ 0:00.00 SUITE_END
+
+ suite 1
+ ~~~~~~~
+ Ran 4 checks (1 subtests, 3 tests)
+ Expected results: 4 (2 known intermittents)
+ Unexpected results: 0
+
+ Known Intermittent Results
+ --------------------------
+ test_foo
+ KNOWN-INTERMITTENT-FAIL test_foo
+ test_bar
+ KNOWN-INTERMITTENT-PASS a subtest
+ OK
+ """
+ ).lstrip("\n"),
+ ),
+ ],
+}
+
+
+def ids(test):
+ ids = []
+ for value in FORMATS[test]:
+ args = ", ".join(["{}={}".format(k, v) for k, v in value[1].items()])
+ if args:
+ args = "-{}".format(args)
+ ids.append("{}{}".format(value[0], args))
+ return ids
+
+
+@pytest.fixture(autouse=True)
+def timestamp(monkeypatch):
+ def fake_time(*args, **kwargs):
+ return 0
+
+ monkeypatch.setattr(MachFormatter, "_time", fake_time)
+
+
+@pytest.mark.parametrize("name,opts,expected", FORMATS["PASS"], ids=ids("PASS"))
+def test_pass(get_logger, name, opts, expected):
+ logger = get_logger(name, **opts)
+
+ logger.suite_start(["test_foo", "test_bar", "test_baz"])
+ logger.test_start("test_foo")
+ logger.test_end("test_foo", "OK")
+ logger.test_start("test_bar")
+ logger.test_status("test_bar", "a subtest", "PASS")
+ logger.test_end("test_bar", "OK")
+ logger.test_start("test_baz")
+ logger.test_end("test_baz", "FAIL", "FAIL", "expected 0 got 1")
+ logger.suite_end()
+
+ buf = logger.handlers[0].stream
+ result = buf.getvalue()
+ print("Dumping result for copy/paste:")
+ print(result)
+ assert result == expected
+
+
+@pytest.mark.parametrize("name,opts,expected", FORMATS["FAIL"], ids=ids("FAIL"))
+def test_fail(get_logger, name, opts, expected):
+ stack = """
+ SimpleTest.is@SimpleTest/SimpleTest.js:312:5
+ @caps/tests/mochitest/test_bug246699.html:53:1
+""".strip(
+ "\n"
+ )
+
+ logger = get_logger(name, **opts)
+
+ logger.suite_start(["test_foo", "test_bar", "test_baz"])
+ logger.test_start("test_foo")
+ logger.test_end("test_foo", "FAIL", "PASS", "expected 0 got 1")
+ logger.test_start("test_bar")
+ logger.test_status(
+ "test_bar", "a subtest", "FAIL", "PASS", "expected 0 got 1", stack
+ )
+ logger.test_status("test_bar", "another subtest", "TIMEOUT")
+ logger.test_end("test_bar", "OK")
+ logger.test_start("test_baz")
+ logger.test_end("test_baz", "PASS", "FAIL")
+ logger.suite_end()
+
+ buf = logger.handlers[0].stream
+ result = buf.getvalue()
+ print("Dumping result for copy/paste:")
+ print(result)
+ assert result == expected
+
+
+@pytest.mark.parametrize(
+ "name,opts,expected", FORMATS["PRECONDITION_FAILED"], ids=ids("PRECONDITION_FAILED")
+)
+def test_precondition_failed(get_logger, name, opts, expected):
+ logger = get_logger(name, **opts)
+
+ logger.suite_start(["test_foo", "test_bar"])
+ logger.test_start("test_foo")
+ logger.test_end("test_foo", "PRECONDITION_FAILED")
+ logger.test_start("test_bar")
+ logger.test_status("test_bar", "a subtest", "PASS")
+ logger.test_status("test_bar", "another subtest", "PRECONDITION_FAILED")
+ logger.test_end("test_bar", "OK")
+ logger.suite_end()
+
+ buf = logger.handlers[0].stream
+ result = buf.getvalue()
+ print("Dumping result for copy/paste:")
+ print(result)
+ assert result == expected
+
+
+@pytest.mark.parametrize(
+ "name,opts,expected", FORMATS["KNOWN-INTERMITTENT"], ids=ids("KNOWN-INTERMITTENT")
+)
+def test_known_intermittent(get_logger, name, opts, expected):
+ logger = get_logger(name, **opts)
+
+ logger.suite_start(["test_foo", "test_bar", "test_baz"])
+ logger.test_start("test_foo")
+ logger.test_end("test_foo", "FAIL", "PASS", known_intermittent=["FAIL"])
+ logger.test_start("test_bar")
+ logger.test_status(
+ "test_bar", "a subtest", "PASS", "FAIL", known_intermittent=["PASS"]
+ )
+ logger.test_end("test_bar", "OK")
+ logger.test_start("test_baz")
+ logger.test_end(
+ "test_baz", "FAIL", "FAIL", "expected 0 got 1", known_intermittent=["PASS"]
+ )
+ logger.suite_end()
+
+ buf = logger.handlers[0].stream
+ result = buf.getvalue()
+ print("Dumping result for copy/paste:")
+ print(result)
+ assert result == expected
+
+
+class FormatterTest(unittest.TestCase):
+ def setUp(self):
+ self.position = 0
+ self.logger = StructuredLogger("test_%s" % type(self).__name__)
+ self.output_file = StringIO()
+ self.handler = StreamHandler(self.output_file, self.get_formatter())
+ self.logger.add_handler(self.handler)
+
+ def set_position(self, pos=None):
+ if pos is None:
+ pos = self.output_file.tell()
+ self.position = pos
+
+ def get_formatter(self):
+ raise NotImplementedError(
+ "FormatterTest subclasses must implement get_formatter"
+ )
+
+ @property
+ def loglines(self):
+ self.output_file.seek(self.position)
+ return [ensure_text(line.rstrip()) for line in self.output_file.readlines()]
+
+
+class TestHTMLFormatter(FormatterTest):
+ def get_formatter(self):
+ return HTMLFormatter()
+
+ def test_base64_string(self):
+ self.logger.suite_start([])
+ self.logger.test_start("string_test")
+ self.logger.test_end("string_test", "FAIL", extra={"data": "foobar"})
+ self.logger.suite_end()
+ self.assertIn("data:text/html;charset=utf-8;base64,Zm9vYmFy", self.loglines[-3])
+
+ def test_base64_unicode(self):
+ self.logger.suite_start([])
+ self.logger.test_start("unicode_test")
+ self.logger.test_end("unicode_test", "FAIL", extra={"data": unichr(0x02A9)})
+ self.logger.suite_end()
+ self.assertIn("data:text/html;charset=utf-8;base64,yqk=", self.loglines[-3])
+
+ def test_base64_other(self):
+ self.logger.suite_start([])
+ self.logger.test_start("int_test")
+ self.logger.test_end("int_test", "FAIL", extra={"data": {"foo": "bar"}})
+ self.logger.suite_end()
+ self.assertIn(
+ "data:text/html;charset=utf-8;base64,eyJmb28iOiAiYmFyIn0=",
+ self.loglines[-3],
+ )
+
+
+class TestTBPLFormatter(FormatterTest):
+ def get_formatter(self):
+ return TbplFormatter()
+
+ def test_unexpected_message(self):
+ self.logger.suite_start([])
+ self.logger.test_start("timeout_test")
+ self.logger.test_end("timeout_test", "TIMEOUT", message="timed out")
+ self.assertIn(
+ "TEST-UNEXPECTED-TIMEOUT | timeout_test | timed out", self.loglines
+ )
+ self.logger.suite_end()
+
+ def test_default_unexpected_end_message(self):
+ self.logger.suite_start([])
+ self.logger.test_start("timeout_test")
+ self.logger.test_end("timeout_test", "TIMEOUT")
+ self.assertIn(
+ "TEST-UNEXPECTED-TIMEOUT | timeout_test | expected OK", self.loglines
+ )
+ self.logger.suite_end()
+
+ def test_default_unexpected_status_message(self):
+ self.logger.suite_start([])
+ self.logger.test_start("timeout_test")
+ self.logger.test_status("timeout_test", "subtest", status="TIMEOUT")
+ self.assertIn(
+ "TEST-UNEXPECTED-TIMEOUT | timeout_test | subtest - expected PASS",
+ self.loglines,
+ )
+ self.logger.test_end("timeout_test", "OK")
+ self.logger.suite_end()
+
+ def test_known_intermittent_end(self):
+ self.logger.suite_start([])
+ self.logger.test_start("intermittent_test")
+ self.logger.test_end(
+ "intermittent_test",
+ status="FAIL",
+ expected="PASS",
+ known_intermittent=["FAIL"],
+ )
+ # test_end log format:
+ # "TEST-KNOWN-INTERMITTENT-<STATUS> | <test> | took <duration>ms"
+ # where duration may be different each time
+ self.assertIn(
+ "TEST-KNOWN-INTERMITTENT-FAIL | intermittent_test | took ", self.loglines[2]
+ )
+ self.assertIn("ms", self.loglines[2])
+ self.logger.suite_end()
+
+ def test_known_intermittent_status(self):
+ self.logger.suite_start([])
+ self.logger.test_start("intermittent_test")
+ self.logger.test_status(
+ "intermittent_test",
+ "subtest",
+ status="FAIL",
+ expected="PASS",
+ known_intermittent=["FAIL"],
+ )
+ self.assertIn(
+ "TEST-KNOWN-INTERMITTENT-FAIL | intermittent_test | subtest", self.loglines
+ )
+ self.logger.test_end("intermittent_test", "OK")
+ self.logger.suite_end()
+
+ def test_single_newline(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.set_position()
+ self.logger.test_status("test1", "subtest", status="PASS", expected="FAIL")
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ # This sequence should not produce blanklines
+ for line in self.loglines:
+ self.assertNotEqual("", line)
+
+ def test_process_exit(self):
+ self.logger.process_exit(1234, 0)
+ self.assertIn("TEST-INFO | 1234: exit 0", self.loglines)
+
+ @unittest.skipUnless(os.name == "posix", "posix only")
+ def test_process_exit_with_sig(self):
+ # subprocess return code is negative when process
+ # has been killed by signal on posix.
+ self.logger.process_exit(1234, -signal.SIGTERM)
+ self.assertIn("TEST-INFO | 1234: killed by SIGTERM", self.loglines)
+
+
+class TestTBPLFormatterWithShutdown(FormatterTest):
+ def get_formatter(self):
+ return TbplFormatter(summary_on_shutdown=True)
+
+ def test_suite_summary_on_shutdown(self):
+ self.logger.suite_start([])
+ self.logger.test_start("summary_test")
+ self.logger.test_status(
+ "summary_test", "subtest", "FAIL", "PASS", known_intermittent=["FAIL"]
+ )
+ self.logger.test_end("summary_test", "FAIL", "OK", known_intermittent=["FAIL"])
+ self.logger.suite_end()
+ self.logger.shutdown()
+
+ self.assertIn("suite 1: 2/2 (2 known intermittent tests)", self.loglines)
+ self.assertIn("Known Intermittent tests:", self.loglines)
+ self.assertIn(
+ "TEST-KNOWN-INTERMITTENT-FAIL | summary_test | subtest", self.loglines
+ )
+
+
+class TestMachFormatter(FormatterTest):
+ def get_formatter(self):
+ return MachFormatter(disable_colors=True)
+
+ def test_summary(self):
+ self.logger.suite_start([])
+
+ # Some tests that pass
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", status="PASS", expected="PASS")
+
+ self.logger.test_start("test2")
+ self.logger.test_end("test2", status="PASS", expected="TIMEOUT")
+
+ self.logger.test_start("test3")
+ self.logger.test_end("test3", status="FAIL", expected="PASS")
+
+ self.set_position()
+ self.logger.suite_end()
+
+ self.assertIn("Ran 3 checks (3 tests)", self.loglines)
+ self.assertIn("Expected results: 1", self.loglines)
+ self.assertIn(
+ """
+Unexpected results: 2
+ test: 2 (1 fail, 1 pass)
+""".strip(),
+ "\n".join(self.loglines),
+ )
+ self.assertNotIn("test1", self.loglines)
+ self.assertIn("UNEXPECTED-PASS test2", self.loglines)
+ self.assertIn("FAIL test3", self.loglines)
+
+ def test_summary_subtests(self):
+ self.logger.suite_start([])
+
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "subtest1", status="PASS")
+ self.logger.test_status("test1", "subtest2", status="FAIL")
+ self.logger.test_end("test1", status="OK", expected="OK")
+
+ self.logger.test_start("test2")
+ self.logger.test_status("test2", "subtest1", status="TIMEOUT", expected="PASS")
+ self.logger.test_end("test2", status="TIMEOUT", expected="OK")
+
+ self.set_position()
+ self.logger.suite_end()
+
+ self.assertIn("Ran 5 checks (3 subtests, 2 tests)", self.loglines)
+ self.assertIn("Expected results: 2", self.loglines)
+ self.assertIn(
+ """
+Unexpected results: 3
+ test: 1 (1 timeout)
+ subtest: 2 (1 fail, 1 timeout)
+""".strip(),
+ "\n".join(self.loglines),
+ )
+
+ def test_summary_ok(self):
+ self.logger.suite_start([])
+
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "subtest1", status="PASS")
+ self.logger.test_status("test1", "subtest2", status="PASS")
+ self.logger.test_end("test1", status="OK", expected="OK")
+
+ self.logger.test_start("test2")
+ self.logger.test_status("test2", "subtest1", status="PASS", expected="PASS")
+ self.logger.test_end("test2", status="OK", expected="OK")
+
+ self.set_position()
+ self.logger.suite_end()
+
+ self.assertIn("OK", self.loglines)
+ self.assertIn("Expected results: 5", self.loglines)
+ self.assertIn("Unexpected results: 0", self.loglines)
+
+ def test_process_start(self):
+ self.logger.process_start(1234)
+ self.assertIn("Started process `1234`", self.loglines[0])
+
+ def test_process_start_with_command(self):
+ self.logger.process_start(1234, command="test cmd")
+ self.assertIn("Started process `1234` (test cmd)", self.loglines[0])
+
+ def test_process_exit(self):
+ self.logger.process_exit(1234, 0)
+ self.assertIn("1234: exit 0", self.loglines[0])
+
+ @unittest.skipUnless(os.name == "posix", "posix only")
+ def test_process_exit_with_sig(self):
+ # subprocess return code is negative when process
+ # has been killed by signal on posix.
+ self.logger.process_exit(1234, -signal.SIGTERM)
+ self.assertIn("1234: killed by SIGTERM", self.loglines[0])
+
+
+class TestGroupingFormatter(FormatterTest):
+ def get_formatter(self):
+ return GroupingFormatter()
+
+ def test_results_total(self):
+ self.logger.suite_start([])
+
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "subtest1", status="PASS")
+ self.logger.test_status("test1", "subtest1", status="PASS")
+ self.logger.test_end("test1", status="OK")
+
+ self.logger.test_start("test2")
+ self.logger.test_status(
+ "test2",
+ "subtest2",
+ status="FAIL",
+ expected="PASS",
+ known_intermittent=["FAIL"],
+ )
+ self.logger.test_end("test2", status="FAIL", expected="OK")
+
+ self.set_position()
+ self.logger.suite_end()
+
+ self.assertIn("Ran 2 tests finished in 0.0 seconds.", self.loglines)
+ self.assertIn(" \u2022 1 ran as expected. 0 tests skipped.", self.loglines)
+ self.assertIn(" \u2022 1 known intermittent results.", self.loglines)
+ self.assertIn(" \u2022 1 tests failed unexpectedly", self.loglines)
+ self.assertIn(" \u25B6 FAIL [expected OK] test2", self.loglines)
+ self.assertIn(
+ " \u25B6 FAIL [expected PASS, known intermittent [FAIL] test2, subtest2",
+ self.loglines,
+ )
+
+
+class TestXUnitFormatter(FormatterTest):
+ def get_formatter(self):
+ return XUnitFormatter()
+
+ def log_as_xml(self):
+ return ET.fromstring("\n".join(self.loglines))
+
+ def test_stacktrace_is_present(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end(
+ "test1", "fail", message="Test message", stack="this\nis\na\nstack"
+ )
+ self.logger.suite_end()
+
+ root = self.log_as_xml()
+ self.assertIn("this\nis\na\nstack", root.find("testcase/failure").text)
+
+ def test_failure_message(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", "fail", message="Test message")
+ self.logger.suite_end()
+
+ root = self.log_as_xml()
+ self.assertEqual(
+ "Expected OK, got FAIL", root.find("testcase/failure").get("message")
+ )
+
+ def test_suite_attrs(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", "ok", message="Test message")
+ self.logger.suite_end()
+
+ root = self.log_as_xml()
+ self.assertEqual(root.get("skips"), "0")
+ self.assertEqual(root.get("failures"), "0")
+ self.assertEqual(root.get("errors"), "0")
+ self.assertEqual(root.get("tests"), "1")
+
+ def test_time_is_not_rounded(self):
+ # call formatter directly, it is easier here
+ formatter = self.get_formatter()
+ formatter.suite_start(dict(time=55000))
+ formatter.test_start(dict(time=55100))
+ formatter.test_end(
+ dict(time=55558, test="id", message="message", status="PASS")
+ )
+ xml_string = formatter.suite_end(dict(time=55559))
+
+ root = ET.fromstring(xml_string)
+ self.assertEqual(root.get("time"), "0.56")
+ self.assertEqual(root.find("testcase").get("time"), "0.46")
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozlog/tests/test_logger.py b/testing/mozbase/mozlog/tests/test_logger.py
new file mode 100644
index 0000000000..0776d87000
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/test_logger.py
@@ -0,0 +1,303 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import datetime
+import json
+import socket
+import threading
+import time
+import unittest
+
+import mozfile
+import mozlog.unstructured as mozlog
+import mozunit
+import six
+
+
+class ListHandler(mozlog.Handler):
+ """Mock handler appends messages to a list for later inspection."""
+
+ def __init__(self):
+ mozlog.Handler.__init__(self)
+ self.messages = []
+
+ def emit(self, record):
+ self.messages.append(self.format(record))
+
+
+class TestLogging(unittest.TestCase):
+ """Tests behavior of basic mozlog api."""
+
+ def test_logger_defaults(self):
+ """Tests the default logging format and behavior."""
+
+ default_logger = mozlog.getLogger("default.logger")
+ self.assertEqual(default_logger.name, "default.logger")
+ self.assertEqual(len(default_logger.handlers), 1)
+ self.assertTrue(isinstance(default_logger.handlers[0], mozlog.StreamHandler))
+
+ f = mozfile.NamedTemporaryFile()
+ list_logger = mozlog.getLogger(
+ "file.logger", handler=mozlog.FileHandler(f.name)
+ )
+ self.assertEqual(len(list_logger.handlers), 1)
+ self.assertTrue(isinstance(list_logger.handlers[0], mozlog.FileHandler))
+ f.close()
+
+ self.assertRaises(
+ ValueError, mozlog.getLogger, "file.logger", handler=ListHandler()
+ )
+
+ def test_timestamps(self):
+ """Verifies that timestamps are included when asked for."""
+ log_name = "test"
+ handler = ListHandler()
+ handler.setFormatter(mozlog.MozFormatter())
+ log = mozlog.getLogger(log_name, handler=handler)
+ log.info("no timestamp")
+ self.assertTrue(handler.messages[-1].startswith("%s " % log_name))
+ handler.setFormatter(mozlog.MozFormatter(include_timestamp=True))
+ log.info("timestamp")
+ # Just verify that this raises no exceptions.
+ datetime.datetime.strptime(handler.messages[-1][:23], "%Y-%m-%d %H:%M:%S,%f")
+
+
+class TestStructuredLogging(unittest.TestCase):
+ """Tests structured output in mozlog."""
+
+ def setUp(self):
+ self.handler = ListHandler()
+ self.handler.setFormatter(mozlog.JSONFormatter())
+ self.logger = mozlog.MozLogger("test.Logger")
+ self.logger.addHandler(self.handler)
+ self.logger.setLevel(mozlog.DEBUG)
+
+ def check_messages(self, expected, actual):
+ """Checks actual for equality with corresponding fields in actual.
+ The actual message should contain all fields in expected, and
+ should be identical, with the exception of the timestamp field.
+ The actual message should contain no fields other than the timestamp
+ field and those present in expected."""
+
+ self.assertTrue(isinstance(actual["_time"], six.integer_types))
+
+ for k, v in expected.items():
+ self.assertEqual(v, actual[k])
+
+ for k in actual.keys():
+ if k != "_time":
+ self.assertTrue(expected.get(k) is not None)
+
+ def test_structured_output(self):
+ self.logger.log_structured(
+ "test_message", {"_level": mozlog.INFO, "_message": "message one"}
+ )
+ self.logger.log_structured(
+ "test_message", {"_level": mozlog.INFO, "_message": "message two"}
+ )
+ self.logger.log_structured(
+ "error_message", {"_level": mozlog.ERROR, "diagnostic": "unexpected error"}
+ )
+
+ message_one_expected = {
+ "_namespace": "test.Logger",
+ "_level": "INFO",
+ "_message": "message one",
+ "action": "test_message",
+ }
+ message_two_expected = {
+ "_namespace": "test.Logger",
+ "_level": "INFO",
+ "_message": "message two",
+ "action": "test_message",
+ }
+ message_three_expected = {
+ "_namespace": "test.Logger",
+ "_level": "ERROR",
+ "diagnostic": "unexpected error",
+ "action": "error_message",
+ }
+
+ message_one_actual = json.loads(self.handler.messages[0])
+ message_two_actual = json.loads(self.handler.messages[1])
+ message_three_actual = json.loads(self.handler.messages[2])
+
+ self.check_messages(message_one_expected, message_one_actual)
+ self.check_messages(message_two_expected, message_two_actual)
+ self.check_messages(message_three_expected, message_three_actual)
+
+ def test_unstructured_conversion(self):
+ """Tests that logging to a logger with a structured formatter
+ via the traditional logging interface works as expected."""
+ self.logger.info("%s %s %d", "Message", "number", 1)
+ self.logger.error("Message number 2")
+ self.logger.debug(
+ "Message with %s",
+ "some extras",
+ extra={"params": {"action": "mozlog_test_output", "is_failure": False}},
+ )
+ message_one_expected = {
+ "_namespace": "test.Logger",
+ "_level": "INFO",
+ "_message": "Message number 1",
+ }
+ message_two_expected = {
+ "_namespace": "test.Logger",
+ "_level": "ERROR",
+ "_message": "Message number 2",
+ }
+ message_three_expected = {
+ "_namespace": "test.Logger",
+ "_level": "DEBUG",
+ "_message": "Message with some extras",
+ "action": "mozlog_test_output",
+ "is_failure": False,
+ }
+
+ message_one_actual = json.loads(self.handler.messages[0])
+ message_two_actual = json.loads(self.handler.messages[1])
+ message_three_actual = json.loads(self.handler.messages[2])
+
+ self.check_messages(message_one_expected, message_one_actual)
+ self.check_messages(message_two_expected, message_two_actual)
+ self.check_messages(message_three_expected, message_three_actual)
+
+ def message_callback(self):
+ if len(self.handler.messages) == 3:
+ message_one_expected = {
+ "_namespace": "test.Logger",
+ "_level": "DEBUG",
+ "_message": "socket message one",
+ "action": "test_message",
+ }
+ message_two_expected = {
+ "_namespace": "test.Logger",
+ "_level": "DEBUG",
+ "_message": "socket message two",
+ "action": "test_message",
+ }
+ message_three_expected = {
+ "_namespace": "test.Logger",
+ "_level": "DEBUG",
+ "_message": "socket message three",
+ "action": "test_message",
+ }
+
+ message_one_actual = json.loads(self.handler.messages[0])
+
+ message_two_actual = json.loads(self.handler.messages[1])
+
+ message_three_actual = json.loads(self.handler.messages[2])
+
+ self.check_messages(message_one_expected, message_one_actual)
+ self.check_messages(message_two_expected, message_two_actual)
+ self.check_messages(message_three_expected, message_three_actual)
+
+ def test_log_listener(self):
+ connection = "127.0.0.1", 0
+ self.log_server = mozlog.LogMessageServer(
+ connection, self.logger, message_callback=self.message_callback, timeout=0.5
+ )
+
+ message_string_one = json.dumps(
+ {
+ "_message": "socket message one",
+ "action": "test_message",
+ "_level": "DEBUG",
+ }
+ )
+ message_string_two = json.dumps(
+ {
+ "_message": "socket message two",
+ "action": "test_message",
+ "_level": "DEBUG",
+ }
+ )
+
+ message_string_three = json.dumps(
+ {
+ "_message": "socket message three",
+ "action": "test_message",
+ "_level": "DEBUG",
+ }
+ )
+
+ message_string = (
+ message_string_one
+ + "\n"
+ + message_string_two
+ + "\n"
+ + message_string_three
+ + "\n"
+ )
+
+ server_thread = threading.Thread(target=self.log_server.handle_request)
+ server_thread.start()
+
+ host, port = self.log_server.server_address
+
+ sock = socket.socket()
+ sock.connect((host, port))
+
+ # Sleeps prevent listener from receiving entire message in a single call
+ # to recv in order to test reconstruction of partial messages.
+ sock.sendall(message_string[:8].encode())
+ time.sleep(0.01)
+ sock.sendall(message_string[8:32].encode())
+ time.sleep(0.01)
+ sock.sendall(message_string[32:64].encode())
+ time.sleep(0.01)
+ sock.sendall(message_string[64:128].encode())
+ time.sleep(0.01)
+ sock.sendall(message_string[128:].encode())
+
+ server_thread.join()
+
+
+class Loggable(mozlog.LoggingMixin):
+ """Trivial class inheriting from LoggingMixin"""
+
+ pass
+
+
+class TestLoggingMixin(unittest.TestCase):
+ """Tests basic use of LoggingMixin"""
+
+ def test_mixin(self):
+ loggable = Loggable()
+ self.assertTrue(not hasattr(loggable, "_logger"))
+ loggable.log(mozlog.INFO, "This will instantiate the logger")
+ self.assertTrue(hasattr(loggable, "_logger"))
+ self.assertEqual(loggable._logger.name, "test_logger.Loggable")
+
+ self.assertRaises(ValueError, loggable.set_logger, "not a logger")
+
+ logger = mozlog.MozLogger("test.mixin")
+ handler = ListHandler()
+ logger.addHandler(handler)
+ loggable.set_logger(logger)
+ self.assertTrue(isinstance(loggable._logger.handlers[0], ListHandler))
+ self.assertEqual(loggable._logger.name, "test.mixin")
+
+ loggable.log(mozlog.WARN, 'message for "log" method')
+ loggable.info('message for "info" method')
+ loggable.error('message for "error" method')
+ loggable.log_structured(
+ "test_message",
+ params={"_message": "message for " + '"log_structured" method'},
+ )
+
+ expected_messages = [
+ 'message for "log" method',
+ 'message for "info" method',
+ 'message for "error" method',
+ 'message for "log_structured" method',
+ ]
+
+ actual_messages = loggable._logger.handlers[0].messages
+ self.assertEqual(expected_messages, actual_messages)
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozlog/tests/test_logtypes.py b/testing/mozbase/mozlog/tests/test_logtypes.py
new file mode 100644
index 0000000000..177d25cdb0
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/test_logtypes.py
@@ -0,0 +1,106 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import unittest
+
+import mozunit
+from mozlog.logtypes import Any, Dict, Int, List, TestList, Tuple, Unicode
+
+
+class TestContainerTypes(unittest.TestCase):
+ def test_dict_type_basic(self):
+ d = Dict("name")
+ with self.assertRaises(ValueError):
+ d({"foo": "bar"})
+
+ d = Dict(Any, "name")
+ d({"foo": "bar"}) # doesn't raise
+
+ def test_dict_type_with_dictionary_item_type(self):
+ d = Dict({Int: Int}, "name")
+ with self.assertRaises(ValueError):
+ d({"foo": 1})
+
+ with self.assertRaises(ValueError):
+ d({1: "foo"})
+
+ d({1: 2}) # doesn't raise
+
+ def test_dict_type_with_recursive_item_types(self):
+ d = Dict(Dict({Unicode: List(Int)}), "name")
+ with self.assertRaises(ValueError):
+ d({"foo": "bar"})
+
+ with self.assertRaises(ValueError):
+ d({"foo": {"bar": "baz"}})
+
+ with self.assertRaises(ValueError):
+ d({"foo": {"bar": ["baz"]}})
+
+ d({"foo": {"bar": [1]}}) # doesn't raise
+
+ def test_list_type_basic(self):
+ l = List("name")
+ with self.assertRaises(ValueError):
+ l(["foo"])
+
+ l = List(Any, "name")
+ l(["foo", 1]) # doesn't raise
+
+ def test_list_type_with_recursive_item_types(self):
+ l = List(Dict(List(Tuple((Unicode, Int)))), "name")
+ with self.assertRaises(ValueError):
+ l(["foo"])
+
+ with self.assertRaises(ValueError):
+ l([{"foo": "bar"}])
+
+ with self.assertRaises(ValueError):
+ l([{"foo": ["bar"]}])
+
+ l([{"foo": [("bar", 1)]}]) # doesn't raise
+
+ def test_tuple_type_basic(self):
+ t = Tuple("name")
+ with self.assertRaises(ValueError):
+ t((1,))
+
+ t = Tuple(Any, "name")
+ t((1,)) # doesn't raise
+
+ def test_tuple_type_with_tuple_item_type(self):
+ t = Tuple((Unicode, Int))
+ with self.assertRaises(ValueError):
+ t(("foo", "bar"))
+
+ t(("foo", 1)) # doesn't raise
+
+ def test_tuple_type_with_recursive_item_types(self):
+ t = Tuple((Dict(List(Any)), List(Dict(Any)), Unicode), "name")
+ with self.assertRaises(ValueError):
+ t(({"foo": "bar"}, [{"foo": "bar"}], "foo"))
+
+ with self.assertRaises(ValueError):
+ t(({"foo": ["bar"]}, ["foo"], "foo"))
+
+ t(({"foo": ["bar"]}, [{"foo": "bar"}], "foo")) # doesn't raise
+
+
+class TestDataTypes(unittest.TestCase):
+ def test_test_list(self):
+ t = TestList("name")
+ with self.assertRaises(ValueError):
+ t("foo")
+
+ with self.assertRaises(ValueError):
+ t({"foo": 1})
+
+ d1 = t({"default": ["bar"]}) # doesn't raise
+ d2 = t(["bar"]) # doesn't raise
+
+ self.assertDictContainsSubset(d1, d2)
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozlog/tests/test_structured.py b/testing/mozbase/mozlog/tests/test_structured.py
new file mode 100644
index 0000000000..8c4d2c2197
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/test_structured.py
@@ -0,0 +1,1098 @@
+# -*- coding: utf-8 -*-
+
+import argparse
+import json
+import optparse
+import os
+import sys
+import unittest
+
+import mozfile
+import mozunit
+import six
+from mozlog import commandline, formatters, handlers, reader, stdadapter, structuredlog
+from six import StringIO
+
+
+class TestHandler(object):
+ def __init__(self):
+ self.items = []
+
+ def __call__(self, data):
+ self.items.append(data)
+
+ @property
+ def last_item(self):
+ return self.items[-1]
+
+ @property
+ def empty(self):
+ return not self.items
+
+
+class BaseStructuredTest(unittest.TestCase):
+ def setUp(self):
+ self.logger = structuredlog.StructuredLogger("test")
+ self.handler = TestHandler()
+ self.logger.add_handler(self.handler)
+
+ def pop_last_item(self):
+ return self.handler.items.pop()
+
+ def assert_log_equals(self, expected, actual=None):
+ if actual is None:
+ actual = self.pop_last_item()
+
+ all_expected = {"pid": os.getpid(), "thread": "MainThread", "source": "test"}
+ specials = set(["time"])
+
+ all_expected.update(expected)
+ for key, value in six.iteritems(all_expected):
+ self.assertEqual(actual[key], value)
+
+ self.assertEqual(set(all_expected.keys()) | specials, set(actual.keys()))
+
+
+class TestStatusHandler(BaseStructuredTest):
+ def setUp(self):
+ super(TestStatusHandler, self).setUp()
+ self.handler = handlers.StatusHandler()
+ self.logger.add_handler(self.handler)
+
+ def test_failure_run(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "sub1", status="PASS")
+ self.logger.test_status("test1", "sub2", status="TIMEOUT")
+ self.logger.test_status(
+ "test1", "sub3", status="FAIL", expected="PASS", known_intermittent=["FAIL"]
+ )
+ self.logger.test_end("test1", status="OK")
+ self.logger.suite_end()
+ summary = self.handler.summarize()
+ self.assertIn("TIMEOUT", summary.unexpected_statuses)
+ self.assertEqual(1, summary.unexpected_statuses["TIMEOUT"])
+ self.assertIn("PASS", summary.expected_statuses)
+ self.assertEqual(1, summary.expected_statuses["PASS"])
+ self.assertIn("OK", summary.expected_statuses)
+ self.assertEqual(1, summary.expected_statuses["OK"])
+ self.assertIn("FAIL", summary.expected_statuses)
+ self.assertEqual(1, summary.expected_statuses["FAIL"])
+ self.assertIn("FAIL", summary.known_intermittent_statuses)
+ self.assertEqual(1, summary.known_intermittent_statuses["FAIL"])
+ self.assertEqual(3, summary.action_counts["test_status"])
+ self.assertEqual(1, summary.action_counts["test_end"])
+
+ def test_precondition_failed_run(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", status="PRECONDITION_FAILED")
+ self.logger.test_start("test2")
+ self.logger.test_status("test2", "sub1", status="PRECONDITION_FAILED")
+ self.logger.test_end("test2", status="OK")
+ self.logger.suite_end()
+ summary = self.handler.summarize()
+ self.assertEqual(1, summary.expected_statuses["OK"])
+ self.assertEqual(2, summary.unexpected_statuses["PRECONDITION_FAILED"])
+
+ def test_error_run(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.error("ERRR!")
+ self.logger.test_end("test1", status="OK")
+ self.logger.test_start("test2")
+ self.logger.test_end("test2", status="OK")
+ self.logger.suite_end()
+ summary = self.handler.summarize()
+ self.assertIn("ERROR", summary.log_level_counts)
+ self.assertEqual(1, summary.log_level_counts["ERROR"])
+ self.assertIn("OK", summary.expected_statuses)
+ self.assertEqual(2, summary.expected_statuses["OK"])
+
+
+class TestSummaryHandler(BaseStructuredTest):
+ def setUp(self):
+ super(TestSummaryHandler, self).setUp()
+ self.handler = handlers.SummaryHandler()
+ self.logger.add_handler(self.handler)
+
+ def test_failure_run(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "sub1", status="PASS")
+ self.logger.test_status("test1", "sub2", status="TIMEOUT")
+ self.logger.assertion_count("test1", 5, 1, 10)
+ self.logger.assertion_count("test1", 5, 10, 15)
+ self.logger.test_end("test1", status="OK")
+ self.logger.suite_end()
+
+ counts = self.handler.current["counts"]
+ self.assertIn("timeout", counts["subtest"]["unexpected"])
+ self.assertEqual(1, counts["subtest"]["unexpected"]["timeout"])
+ self.assertIn("pass", counts["subtest"]["expected"])
+ self.assertEqual(1, counts["subtest"]["expected"]["pass"])
+ self.assertIn("ok", counts["test"]["expected"])
+ self.assertEqual(1, counts["test"]["expected"]["ok"])
+ self.assertIn("pass", counts["assert"]["unexpected"])
+ self.assertEqual(1, counts["assert"]["unexpected"]["pass"])
+ self.assertIn("fail", counts["assert"]["expected"])
+ self.assertEqual(1, counts["assert"]["expected"]["fail"])
+
+ logs = self.handler.current["unexpected_logs"]
+ self.assertEqual(1, len(logs))
+ self.assertIn("test1", logs)
+ self.assertEqual(1, len(logs["test1"]))
+ self.assertEqual("sub2", logs["test1"][0]["subtest"])
+
+ def test_precondition_failed_run(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "sub1", status="PASS")
+ self.logger.test_end("test1", status="PRECONDITION_FAILED")
+ self.logger.test_start("test2")
+ self.logger.test_status("test2", "sub1", status="PRECONDITION_FAILED")
+ self.logger.test_status("test2", "sub2", status="PRECONDITION_FAILED")
+ self.logger.test_end("test2", status="OK")
+ self.logger.suite_end()
+
+ counts = self.handler.current["counts"]
+ self.assertIn("precondition_failed", counts["test"]["unexpected"])
+ self.assertEqual(1, counts["test"]["unexpected"]["precondition_failed"])
+ self.assertIn("pass", counts["subtest"]["expected"])
+ self.assertEqual(1, counts["subtest"]["expected"]["pass"])
+ self.assertIn("ok", counts["test"]["expected"])
+ self.assertEqual(1, counts["test"]["expected"]["ok"])
+ self.assertIn("precondition_failed", counts["subtest"]["unexpected"])
+ self.assertEqual(2, counts["subtest"]["unexpected"]["precondition_failed"])
+
+
+class TestStructuredLog(BaseStructuredTest):
+ def test_suite_start(self):
+ self.logger.suite_start(["test"], "logtest")
+ self.assert_log_equals(
+ {"action": "suite_start", "name": "logtest", "tests": {"default": ["test"]}}
+ )
+ self.logger.suite_end()
+
+ def test_suite_end(self):
+ self.logger.suite_start([])
+ self.logger.suite_end()
+ self.assert_log_equals({"action": "suite_end"})
+
+ def test_start(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.assert_log_equals({"action": "test_start", "test": "test1"})
+
+ self.logger.test_start(("test1", "==", "test1-ref"), path="path/to/test")
+ self.assert_log_equals(
+ {
+ "action": "test_start",
+ "test": ("test1", "==", "test1-ref"),
+ "path": "path/to/test",
+ }
+ )
+ self.logger.suite_end()
+
+ def test_start_inprogress(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_start("test1")
+ self.assert_log_equals(
+ {
+ "action": "log",
+ "message": "test_start for test1 logged while in progress.",
+ "level": "ERROR",
+ }
+ )
+ self.logger.suite_end()
+
+ def test_status(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status(
+ "test1", "subtest name", "fail", expected="FAIL", message="Test message"
+ )
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "subtest": "subtest name",
+ "status": "FAIL",
+ "message": "Test message",
+ "test": "test1",
+ }
+ )
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ def test_status_1(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "subtest name", "fail")
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "subtest": "subtest name",
+ "status": "FAIL",
+ "expected": "PASS",
+ "test": "test1",
+ }
+ )
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ def test_status_2(self):
+ self.assertRaises(
+ ValueError,
+ self.logger.test_status,
+ "test1",
+ "subtest name",
+ "XXXUNKNOWNXXX",
+ )
+
+ def test_status_extra(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status(
+ "test1", "subtest name", "FAIL", expected="PASS", extra={"data": 42}
+ )
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "subtest": "subtest name",
+ "status": "FAIL",
+ "expected": "PASS",
+ "test": "test1",
+ "extra": {"data": 42},
+ }
+ )
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ def test_status_stack(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status(
+ "test1",
+ "subtest name",
+ "FAIL",
+ expected="PASS",
+ stack="many\nlines\nof\nstack",
+ )
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "subtest": "subtest name",
+ "status": "FAIL",
+ "expected": "PASS",
+ "test": "test1",
+ "stack": "many\nlines\nof\nstack",
+ }
+ )
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ def test_status_known_intermittent(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status(
+ "test1", "subtest name", "fail", known_intermittent=["FAIL"]
+ )
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "subtest": "subtest name",
+ "status": "FAIL",
+ "expected": "PASS",
+ "known_intermittent": ["FAIL"],
+ "test": "test1",
+ }
+ )
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ def test_status_not_started(self):
+ self.logger.test_status("test_UNKNOWN", "subtest", "PASS")
+ self.assertTrue(
+ self.pop_last_item()["message"].startswith(
+ "test_status for test_UNKNOWN logged while not in progress. Logged with data: {"
+ )
+ )
+
+ def test_remove_optional_defaults(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status(
+ "test1", "subtest name", "fail", message=None, stack=None
+ )
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "subtest": "subtest name",
+ "status": "FAIL",
+ "expected": "PASS",
+ "test": "test1",
+ }
+ )
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ def test_remove_optional_defaults_raw_log(self):
+ self.logger.log_raw({"action": "suite_start", "tests": [1], "name": None})
+ self.assert_log_equals({"action": "suite_start", "tests": {"default": ["1"]}})
+ self.logger.suite_end()
+
+ def test_end(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", "fail", message="Test message")
+ self.assert_log_equals(
+ {
+ "action": "test_end",
+ "status": "FAIL",
+ "expected": "OK",
+ "message": "Test message",
+ "test": "test1",
+ }
+ )
+ self.logger.suite_end()
+
+ def test_end_1(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", "PASS", expected="PASS", extra={"data": 123})
+ self.assert_log_equals(
+ {
+ "action": "test_end",
+ "status": "PASS",
+ "extra": {"data": 123},
+ "test": "test1",
+ }
+ )
+ self.logger.suite_end()
+
+ def test_end_2(self):
+ self.assertRaises(ValueError, self.logger.test_end, "test1", "XXXUNKNOWNXXX")
+
+ def test_end_stack(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end(
+ "test1", "PASS", expected="PASS", stack="many\nlines\nof\nstack"
+ )
+ self.assert_log_equals(
+ {
+ "action": "test_end",
+ "status": "PASS",
+ "test": "test1",
+ "stack": "many\nlines\nof\nstack",
+ }
+ )
+ self.logger.suite_end()
+
+ def test_end_no_start(self):
+ self.logger.test_end("test1", "PASS", expected="PASS")
+ self.assertTrue(
+ self.pop_last_item()["message"].startswith(
+ "test_end for test1 logged while not in progress. Logged with data: {"
+ )
+ )
+ self.logger.suite_end()
+
+ def test_end_twice(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test2")
+ self.logger.test_end("test2", "PASS", expected="PASS")
+ self.assert_log_equals(
+ {"action": "test_end", "status": "PASS", "test": "test2"}
+ )
+ self.logger.test_end("test2", "PASS", expected="PASS")
+ last_item = self.pop_last_item()
+ self.assertEqual(last_item["action"], "log")
+ self.assertEqual(last_item["level"], "ERROR")
+ self.assertTrue(
+ last_item["message"].startswith(
+ "test_end for test2 logged while not in progress. Logged with data: {"
+ )
+ )
+ self.logger.suite_end()
+
+ def test_suite_start_twice(self):
+ self.logger.suite_start([])
+ self.assert_log_equals({"action": "suite_start", "tests": {"default": []}})
+ self.logger.suite_start([])
+ last_item = self.pop_last_item()
+ self.assertEqual(last_item["action"], "log")
+ self.assertEqual(last_item["level"], "ERROR")
+ self.logger.suite_end()
+
+ def test_suite_end_no_start(self):
+ self.logger.suite_start([])
+ self.assert_log_equals({"action": "suite_start", "tests": {"default": []}})
+ self.logger.suite_end()
+ self.assert_log_equals({"action": "suite_end"})
+ self.logger.suite_end()
+ last_item = self.pop_last_item()
+ self.assertEqual(last_item["action"], "log")
+ self.assertEqual(last_item["level"], "ERROR")
+
+ def test_multiple_loggers_suite_start(self):
+ logger1 = structuredlog.StructuredLogger("test")
+ self.logger.suite_start([])
+ logger1.suite_start([])
+ last_item = self.pop_last_item()
+ self.assertEqual(last_item["action"], "log")
+ self.assertEqual(last_item["level"], "ERROR")
+
+ def test_multiple_loggers_test_start(self):
+ logger1 = structuredlog.StructuredLogger("test")
+ self.logger.suite_start([])
+ self.logger.test_start("test")
+ logger1.test_start("test")
+ last_item = self.pop_last_item()
+ self.assertEqual(last_item["action"], "log")
+ self.assertEqual(last_item["level"], "ERROR")
+
+ def test_process(self):
+ self.logger.process_output(1234, "test output")
+ self.assert_log_equals(
+ {"action": "process_output", "process": "1234", "data": "test output"}
+ )
+
+ def test_process_start(self):
+ self.logger.process_start(1234)
+ self.assert_log_equals({"action": "process_start", "process": "1234"})
+
+ def test_process_exit(self):
+ self.logger.process_exit(1234, 0)
+ self.assert_log_equals(
+ {"action": "process_exit", "process": "1234", "exitcode": 0}
+ )
+
+ def test_log(self):
+ for level in ["critical", "error", "warning", "info", "debug"]:
+ getattr(self.logger, level)("message")
+ self.assert_log_equals(
+ {"action": "log", "level": level.upper(), "message": "message"}
+ )
+
+ def test_logging_adapter(self):
+ import logging
+
+ logging.basicConfig(level="DEBUG")
+ old_level = logging.root.getEffectiveLevel()
+ logging.root.setLevel("DEBUG")
+
+ std_logger = logging.getLogger("test")
+ std_logger.setLevel("DEBUG")
+
+ logger = stdadapter.std_logging_adapter(std_logger)
+
+ try:
+ for level in ["critical", "error", "warning", "info", "debug"]:
+ getattr(logger, level)("message")
+ self.assert_log_equals(
+ {"action": "log", "level": level.upper(), "message": "message"}
+ )
+ finally:
+ logging.root.setLevel(old_level)
+
+ def test_add_remove_handlers(self):
+ handler = TestHandler()
+ self.logger.add_handler(handler)
+ self.logger.info("test1")
+
+ self.assert_log_equals({"action": "log", "level": "INFO", "message": "test1"})
+
+ self.assert_log_equals(
+ {"action": "log", "level": "INFO", "message": "test1"},
+ actual=handler.last_item,
+ )
+
+ self.logger.remove_handler(handler)
+ self.logger.info("test2")
+
+ self.assert_log_equals({"action": "log", "level": "INFO", "message": "test2"})
+
+ self.assert_log_equals(
+ {"action": "log", "level": "INFO", "message": "test1"},
+ actual=handler.last_item,
+ )
+
+ def test_wrapper(self):
+ file_like = structuredlog.StructuredLogFileLike(self.logger)
+
+ file_like.write("line 1")
+
+ self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 1"})
+
+ file_like.write("line 2\n")
+
+ self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 2"})
+
+ file_like.write("line 3\r")
+
+ self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 3"})
+
+ file_like.write("line 4\r\n")
+
+ self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 4"})
+
+ def test_shutdown(self):
+ # explicit shutdown
+ log = structuredlog.StructuredLogger("test 1")
+ log.add_handler(self.handler)
+ log.info("line 1")
+ self.assert_log_equals(
+ {"action": "log", "level": "INFO", "message": "line 1", "source": "test 1"}
+ )
+ log.shutdown()
+ self.assert_log_equals({"action": "shutdown", "source": "test 1"})
+ with self.assertRaises(structuredlog.LoggerShutdownError):
+ log.info("bad log")
+ with self.assertRaises(structuredlog.LoggerShutdownError):
+ log.log_raw({"action": "log", "level": "info", "message": "bad log"})
+
+ # shutdown still applies to new instances
+ del log
+ log = structuredlog.StructuredLogger("test 1")
+ with self.assertRaises(structuredlog.LoggerShutdownError):
+ log.info("bad log")
+
+ # context manager shutdown
+ with structuredlog.StructuredLogger("test 2") as log:
+ log.add_handler(self.handler)
+ log.info("line 2")
+ self.assert_log_equals(
+ {
+ "action": "log",
+ "level": "INFO",
+ "message": "line 2",
+ "source": "test 2",
+ }
+ )
+ self.assert_log_equals({"action": "shutdown", "source": "test 2"})
+
+ # shutdown prevents logging across instances
+ log1 = structuredlog.StructuredLogger("test 3")
+ log2 = structuredlog.StructuredLogger("test 3", component="bar")
+ log1.shutdown()
+ with self.assertRaises(structuredlog.LoggerShutdownError):
+ log2.info("line 3")
+
+
+class TestTypeConversions(BaseStructuredTest):
+ def test_raw(self):
+ self.logger.log_raw({"action": "suite_start", "tests": [1], "time": "1234"})
+ self.assert_log_equals(
+ {"action": "suite_start", "tests": {"default": ["1"]}, "time": 1234}
+ )
+ self.logger.suite_end()
+
+ def test_tuple(self):
+ self.logger.suite_start([])
+ if six.PY3:
+ self.logger.test_start(
+ (
+ b"\xf0\x90\x8d\x84\xf0\x90\x8c\xb4\xf0\x90"
+ b"\x8d\x83\xf0\x90\x8d\x84".decode(),
+ 42,
+ u"\u16a4",
+ )
+ )
+ else:
+ self.logger.test_start(
+ (
+ "\xf0\x90\x8d\x84\xf0\x90\x8c\xb4\xf0\x90"
+ "\x8d\x83\xf0\x90\x8d\x84",
+ 42,
+ u"\u16a4",
+ )
+ )
+ self.assert_log_equals(
+ {
+ "action": "test_start",
+ "test": (u"\U00010344\U00010334\U00010343\U00010344", u"42", u"\u16a4"),
+ }
+ )
+ self.logger.suite_end()
+
+ def test_non_string_messages(self):
+ self.logger.suite_start([])
+ self.logger.info(1)
+ self.assert_log_equals({"action": "log", "message": "1", "level": "INFO"})
+ self.logger.info([1, (2, "3"), "s", "s" + chr(255)])
+ if six.PY3:
+ self.assert_log_equals(
+ {
+ "action": "log",
+ "message": "[1, (2, '3'), 's', 's\xff']",
+ "level": "INFO",
+ }
+ )
+ else:
+ self.assert_log_equals(
+ {
+ "action": "log",
+ "message": "[1, (2, '3'), 's', 's\\xff']",
+ "level": "INFO",
+ }
+ )
+
+ self.logger.suite_end()
+
+ def test_utf8str_write(self):
+ with mozfile.NamedTemporaryFile() as logfile:
+ _fmt = formatters.TbplFormatter()
+ _handler = handlers.StreamHandler(logfile, _fmt)
+ self.logger.add_handler(_handler)
+ self.logger.suite_start([])
+ self.logger.info("☺")
+ logfile.seek(0)
+ data = logfile.readlines()[-1].strip()
+ if six.PY3:
+ self.assertEqual(data.decode(), "☺")
+ else:
+ self.assertEqual(data, "☺")
+ self.logger.suite_end()
+ self.logger.remove_handler(_handler)
+
+ def test_arguments(self):
+ self.logger.info(message="test")
+ self.assert_log_equals({"action": "log", "message": "test", "level": "INFO"})
+
+ self.logger.suite_start([], run_info={})
+ self.assert_log_equals(
+ {"action": "suite_start", "tests": {"default": []}, "run_info": {}}
+ )
+ self.logger.test_start(test="test1")
+ self.logger.test_status("subtest1", "FAIL", test="test1", status="PASS")
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "test": "test1",
+ "subtest": "subtest1",
+ "status": "PASS",
+ "expected": "FAIL",
+ }
+ )
+ self.logger.process_output(123, "data", "test")
+ self.assert_log_equals(
+ {
+ "action": "process_output",
+ "process": "123",
+ "command": "test",
+ "data": "data",
+ }
+ )
+ self.assertRaises(
+ TypeError,
+ self.logger.test_status,
+ subtest="subtest2",
+ status="FAIL",
+ expected="PASS",
+ )
+ self.assertRaises(
+ TypeError,
+ self.logger.test_status,
+ "test1",
+ "subtest1",
+ "PASS",
+ "FAIL",
+ "message",
+ "stack",
+ {},
+ [],
+ None,
+ "unexpected",
+ )
+ self.assertRaises(TypeError, self.logger.test_status, "test1", test="test2")
+ self.logger.suite_end()
+
+
+class TestComponentFilter(BaseStructuredTest):
+ def test_filter_component(self):
+ component_logger = structuredlog.StructuredLogger(
+ self.logger.name, "test_component"
+ )
+ component_logger.component_filter = handlers.LogLevelFilter(lambda x: x, "info")
+
+ self.logger.debug("Test")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test"})
+ self.assertTrue(self.handler.empty)
+
+ component_logger.info("Test 1")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals(
+ {
+ "action": "log",
+ "level": "INFO",
+ "message": "Test 1",
+ "component": "test_component",
+ }
+ )
+
+ component_logger.debug("Test 2")
+ self.assertTrue(self.handler.empty)
+
+ component_logger.component_filter = None
+
+ component_logger.debug("Test 3")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals(
+ {
+ "action": "log",
+ "level": "DEBUG",
+ "message": "Test 3",
+ "component": "test_component",
+ }
+ )
+
+ def test_filter_default_component(self):
+ component_logger = structuredlog.StructuredLogger(
+ self.logger.name, "test_component"
+ )
+
+ self.logger.debug("Test")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test"})
+
+ self.logger.component_filter = handlers.LogLevelFilter(lambda x: x, "info")
+
+ self.logger.debug("Test 1")
+ self.assertTrue(self.handler.empty)
+
+ component_logger.debug("Test 2")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals(
+ {
+ "action": "log",
+ "level": "DEBUG",
+ "message": "Test 2",
+ "component": "test_component",
+ }
+ )
+
+ self.logger.component_filter = None
+
+ self.logger.debug("Test 3")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test 3"})
+
+ def test_filter_message_mutuate(self):
+ def filter_mutate(msg):
+ if msg["action"] == "log":
+ msg["message"] = "FILTERED! %s" % msg["message"]
+ return msg
+
+ self.logger.component_filter = filter_mutate
+ self.logger.debug("Test")
+ self.assert_log_equals(
+ {"action": "log", "level": "DEBUG", "message": "FILTERED! Test"}
+ )
+ self.logger.component_filter = None
+
+
+class TestCommandline(unittest.TestCase):
+ def setUp(self):
+ self.logfile = mozfile.NamedTemporaryFile()
+
+ @property
+ def loglines(self):
+ self.logfile.seek(0)
+ return [line.rstrip() for line in self.logfile.readlines()]
+
+ def test_setup_logging(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser)
+ args = parser.parse_args(["--log-raw=-"])
+ logger = commandline.setup_logging("test_setup_logging", args, {})
+ self.assertEqual(len(logger.handlers), 1)
+
+ def test_setup_logging_optparse(self):
+ parser = optparse.OptionParser()
+ commandline.add_logging_group(parser)
+ args, _ = parser.parse_args(["--log-raw=-"])
+ logger = commandline.setup_logging("test_optparse", args, {})
+ self.assertEqual(len(logger.handlers), 1)
+ self.assertIsInstance(logger.handlers[0], handlers.StreamHandler)
+
+ def test_limit_formatters(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser, include_formatters=["raw"])
+ other_formatters = [fmt for fmt in commandline.log_formatters if fmt != "raw"]
+ # check that every formatter except raw is not present
+ for fmt in other_formatters:
+ with self.assertRaises(SystemExit):
+ parser.parse_args(["--log-%s=-" % fmt])
+ with self.assertRaises(SystemExit):
+ parser.parse_args(["--log-%s-level=error" % fmt])
+ # raw is still ok
+ args = parser.parse_args(["--log-raw=-"])
+ logger = commandline.setup_logging("test_setup_logging2", args, {})
+ self.assertEqual(len(logger.handlers), 1)
+
+ def test_setup_logging_optparse_unicode(self):
+ parser = optparse.OptionParser()
+ commandline.add_logging_group(parser)
+ args, _ = parser.parse_args([u"--log-raw=-"])
+ logger = commandline.setup_logging("test_optparse_unicode", args, {})
+ self.assertEqual(len(logger.handlers), 1)
+ self.assertEqual(logger.handlers[0].stream, sys.stdout)
+ self.assertIsInstance(logger.handlers[0], handlers.StreamHandler)
+
+ def test_logging_defaultlevel(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser)
+
+ args = parser.parse_args(["--log-tbpl=%s" % self.logfile.name])
+ logger = commandline.setup_logging("test_fmtopts", args, {})
+ logger.info("INFO message")
+ logger.debug("DEBUG message")
+ logger.error("ERROR message")
+ # The debug level is not logged by default.
+ self.assertEqual([b"INFO message", b"ERROR message"], self.loglines)
+
+ def test_logging_errorlevel(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser)
+ args = parser.parse_args(
+ ["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=error"]
+ )
+ logger = commandline.setup_logging("test_fmtopts", args, {})
+ logger.info("INFO message")
+ logger.debug("DEBUG message")
+ logger.error("ERROR message")
+
+ # Only the error level and above were requested.
+ self.assertEqual([b"ERROR message"], self.loglines)
+
+ def test_logging_debuglevel(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser)
+ args = parser.parse_args(
+ ["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=debug"]
+ )
+ logger = commandline.setup_logging("test_fmtopts", args, {})
+ logger.info("INFO message")
+ logger.debug("DEBUG message")
+ logger.error("ERROR message")
+ # Requesting a lower log level than default works as expected.
+ self.assertEqual(
+ [b"INFO message", b"DEBUG message", b"ERROR message"], self.loglines
+ )
+
+ def test_unused_options(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser)
+ args = parser.parse_args(["--log-tbpl-level=error"])
+ self.assertRaises(
+ ValueError, commandline.setup_logging, "test_fmtopts", args, {}
+ )
+
+
+class TestBuffer(BaseStructuredTest):
+ def assert_log_equals(self, expected, actual=None):
+ if actual is None:
+ actual = self.pop_last_item()
+
+ all_expected = {
+ "pid": os.getpid(),
+ "thread": "MainThread",
+ "source": "testBuffer",
+ }
+ specials = set(["time"])
+
+ all_expected.update(expected)
+ for key, value in six.iteritems(all_expected):
+ self.assertEqual(actual[key], value)
+
+ self.assertEqual(set(all_expected.keys()) | specials, set(actual.keys()))
+
+ def setUp(self):
+ self.logger = structuredlog.StructuredLogger("testBuffer")
+ self.handler = handlers.BufferHandler(TestHandler(), message_limit=4)
+ self.logger.add_handler(self.handler)
+
+ def tearDown(self):
+ self.logger.remove_handler(self.handler)
+
+ def pop_last_item(self):
+ return self.handler.inner.items.pop()
+
+ def test_buffer_messages(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.send_message("buffer", "off")
+ self.logger.test_status("test1", "sub1", status="PASS")
+ # Even for buffered actions, the buffer does not interfere if
+ # buffering is turned off.
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "test": "test1",
+ "status": "PASS",
+ "subtest": "sub1",
+ }
+ )
+ self.logger.send_message("buffer", "on")
+ self.logger.test_status("test1", "sub2", status="PASS")
+ self.logger.test_status("test1", "sub3", status="PASS")
+ self.logger.test_status("test1", "sub4", status="PASS")
+ self.logger.test_status("test1", "sub5", status="PASS")
+ self.logger.test_status("test1", "sub6", status="PASS")
+ self.logger.test_status("test1", "sub7", status="PASS")
+ self.logger.test_end("test1", status="OK")
+ self.logger.send_message("buffer", "clear")
+ self.assert_log_equals({"action": "test_end", "test": "test1", "status": "OK"})
+ self.logger.suite_end()
+
+ def test_buffer_size(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "sub1", status="PASS")
+ self.logger.test_status("test1", "sub2", status="PASS")
+ self.logger.test_status("test1", "sub3", status="PASS")
+ self.logger.test_status("test1", "sub4", status="PASS")
+ self.logger.test_status("test1", "sub5", status="PASS")
+ self.logger.test_status("test1", "sub6", status="PASS")
+ self.logger.test_status("test1", "sub7", status="PASS")
+
+ # No test status messages made it to the underlying handler.
+ self.assert_log_equals({"action": "test_start", "test": "test1"})
+
+ # The buffer's actual size never grows beyond the specified limit.
+ self.assertEqual(len(self.handler._buffer), 4)
+
+ self.logger.test_status("test1", "sub8", status="FAIL")
+ # The number of messages deleted comes back in a list.
+ self.assertEqual([4], self.logger.send_message("buffer", "flush"))
+
+ # When the buffer is dumped, the failure is the last thing logged
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "test": "test1",
+ "subtest": "sub8",
+ "status": "FAIL",
+ "expected": "PASS",
+ }
+ )
+ # Three additional messages should have been retained for context
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "test": "test1",
+ "status": "PASS",
+ "subtest": "sub7",
+ }
+ )
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "test": "test1",
+ "status": "PASS",
+ "subtest": "sub6",
+ }
+ )
+ self.assert_log_equals(
+ {
+ "action": "test_status",
+ "test": "test1",
+ "status": "PASS",
+ "subtest": "sub5",
+ }
+ )
+ self.assert_log_equals({"action": "suite_start", "tests": {"default": []}})
+
+
+class TestReader(unittest.TestCase):
+ def to_file_like(self, obj):
+ data_str = "\n".join(json.dumps(item) for item in obj)
+ return StringIO(data_str)
+
+ def test_read(self):
+ data = [
+ {"action": "action_0", "data": "data_0"},
+ {"action": "action_1", "data": "data_1"},
+ ]
+
+ f = self.to_file_like(data)
+ self.assertEqual(data, list(reader.read(f)))
+
+ def test_imap_log(self):
+ data = [
+ {"action": "action_0", "data": "data_0"},
+ {"action": "action_1", "data": "data_1"},
+ ]
+
+ f = self.to_file_like(data)
+
+ def f_action_0(item):
+ return ("action_0", item["data"])
+
+ def f_action_1(item):
+ return ("action_1", item["data"])
+
+ res_iter = reader.imap_log(
+ reader.read(f), {"action_0": f_action_0, "action_1": f_action_1}
+ )
+ self.assertEqual(
+ [("action_0", "data_0"), ("action_1", "data_1")], list(res_iter)
+ )
+
+ def test_each_log(self):
+ data = [
+ {"action": "action_0", "data": "data_0"},
+ {"action": "action_1", "data": "data_1"},
+ ]
+
+ f = self.to_file_like(data)
+
+ count = {"action_0": 0, "action_1": 0}
+
+ def f_action_0(item):
+ count[item["action"]] += 1
+
+ def f_action_1(item):
+ count[item["action"]] += 2
+
+ reader.each_log(
+ reader.read(f), {"action_0": f_action_0, "action_1": f_action_1}
+ )
+
+ self.assertEqual({"action_0": 1, "action_1": 2}, count)
+
+ def test_handler(self):
+ data = [
+ {"action": "action_0", "data": "data_0"},
+ {"action": "action_1", "data": "data_1"},
+ ]
+
+ f = self.to_file_like(data)
+
+ test = self
+
+ class ReaderTestHandler(reader.LogHandler):
+ def __init__(self):
+ self.action_0_count = 0
+ self.action_1_count = 0
+
+ def action_0(self, item):
+ test.assertEqual(item["action"], "action_0")
+ self.action_0_count += 1
+
+ def action_1(self, item):
+ test.assertEqual(item["action"], "action_1")
+ self.action_1_count += 1
+
+ handler = ReaderTestHandler()
+ reader.handle_log(reader.read(f), handler)
+
+ self.assertEqual(handler.action_0_count, 1)
+ self.assertEqual(handler.action_1_count, 1)
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozlog/tests/test_terminal_colors.py b/testing/mozbase/mozlog/tests/test_terminal_colors.py
new file mode 100644
index 0000000000..2dd72b7d53
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/test_terminal_colors.py
@@ -0,0 +1,62 @@
+# encoding: utf-8
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import sys
+from io import StringIO
+
+import mozunit
+import pytest
+from mozterm import Terminal
+
+
+@pytest.fixture
+def terminal():
+ blessed = pytest.importorskip("blessed")
+
+ kind = "xterm-256color"
+ try:
+ term = Terminal(stream=StringIO(), force_styling=True, kind=kind)
+ except blessed.curses.error:
+ pytest.skip("terminal '{}' not found".format(kind))
+
+ return term
+
+
+EXPECTED_DICT = {
+ "log_test_status_fail": "\x1b[31mlog_test_status_fail\x1b(B\x1b[m",
+ "log_process_output": "\x1b[34mlog_process_output\x1b(B\x1b[m",
+ "log_test_status_pass": "\x1b[32mlog_test_status_pass\x1b(B\x1b[m",
+ "log_test_status_unexpected_fail": "\x1b[31mlog_test_status_unexpected_fail\x1b(B\x1b[m",
+ "log_test_status_known_intermittent": "\x1b[33mlog_test_status_known_intermittent\x1b(B\x1b[m",
+ "time": "\x1b[36mtime\x1b(B\x1b[m",
+ "action": "\x1b[33maction\x1b(B\x1b[m",
+ "pid": "\x1b[36mpid\x1b(B\x1b[m",
+ "heading": "\x1b[1m\x1b[33mheading\x1b(B\x1b[m",
+ "sub_heading": "\x1b[33msub_heading\x1b(B\x1b[m",
+ "error": "\x1b[31merror\x1b(B\x1b[m",
+ "warning": "\x1b[33mwarning\x1b(B\x1b[m",
+ "bold": "\x1b[1mbold\x1b(B\x1b[m",
+ "grey": "\x1b[38;2;190;190;190mgrey\x1b(B\x1b[m",
+ "normal": "\x1b[90mnormal\x1b(B\x1b[m",
+ "bright_black": "\x1b[90mbright_black\x1b(B\x1b[m",
+}
+
+
+@pytest.mark.skipif(
+ not sys.platform.startswith("win"),
+ reason="Only do ANSI Escape Sequence comparisons on Windows.",
+)
+def test_terminal_colors(terminal):
+ from mozlog.formatters.machformatter import TerminalColors, color_dict
+
+ actual_dict = TerminalColors(terminal, color_dict)
+
+ for key in color_dict:
+ assert getattr(actual_dict, key)(key) == EXPECTED_DICT[key]
+
+
+if __name__ == "__main__":
+ mozunit.main()