diff options
Diffstat (limited to 'testing/mozbase/mozlog/tests')
-rw-r--r-- | testing/mozbase/mozlog/tests/conftest.py | 24 | ||||
-rw-r--r-- | testing/mozbase/mozlog/tests/manifest.ini | 9 | ||||
-rw-r--r-- | testing/mozbase/mozlog/tests/test_capture.py | 37 | ||||
-rw-r--r-- | testing/mozbase/mozlog/tests/test_errorsummary.py | 125 | ||||
-rw-r--r-- | testing/mozbase/mozlog/tests/test_formatters.py | 767 | ||||
-rw-r--r-- | testing/mozbase/mozlog/tests/test_logger.py | 303 | ||||
-rw-r--r-- | testing/mozbase/mozlog/tests/test_logtypes.py | 106 | ||||
-rw-r--r-- | testing/mozbase/mozlog/tests/test_structured.py | 1097 | ||||
-rw-r--r-- | testing/mozbase/mozlog/tests/test_terminal_colors.py | 62 |
9 files changed, 2530 insertions, 0 deletions
diff --git a/testing/mozbase/mozlog/tests/conftest.py b/testing/mozbase/mozlog/tests/conftest.py new file mode 100644 index 0000000000..eaf79897e4 --- /dev/null +++ b/testing/mozbase/mozlog/tests/conftest.py @@ -0,0 +1,24 @@ +import pytest +from mozlog.formatters import ErrorSummaryFormatter, MachFormatter +from mozlog.handlers import StreamHandler +from mozlog.structuredlog import StructuredLogger +from six import StringIO + + +@pytest.fixture +def get_logger(): + # Ensure a new state instance is created for each test function. + StructuredLogger._logger_states = {} + formatters = { + "mach": MachFormatter, + "errorsummary": ErrorSummaryFormatter, + } + + def inner(name, **fmt_args): + buf = StringIO() + fmt = formatters[name](**fmt_args) + logger = StructuredLogger("test_logger") + logger.add_handler(StreamHandler(buf, fmt)) + return logger + + return inner diff --git a/testing/mozbase/mozlog/tests/manifest.ini b/testing/mozbase/mozlog/tests/manifest.ini new file mode 100644 index 0000000000..23737ca469 --- /dev/null +++ b/testing/mozbase/mozlog/tests/manifest.ini @@ -0,0 +1,9 @@ +[DEFAULT] +subsuite = mozbase +[test_errorsummary.py] +[test_logger.py] +[test_logtypes.py] +[test_formatters.py] +[test_structured.py] +[test_terminal_colors.py] +[test_capture.py] diff --git a/testing/mozbase/mozlog/tests/test_capture.py b/testing/mozbase/mozlog/tests/test_capture.py new file mode 100644 index 0000000000..4adbda4180 --- /dev/null +++ b/testing/mozbase/mozlog/tests/test_capture.py @@ -0,0 +1,37 @@ +import sys +import unittest + +import mozunit +from mozlog import capture, structuredlog +from test_structured import TestHandler + + +class TestCaptureIO(unittest.TestCase): + """Tests expected logging output of CaptureIO""" + + def setUp(self): + self.logger = structuredlog.StructuredLogger("test") + self.handler = TestHandler() + self.logger.add_handler(self.handler) + + def test_captureio_log(self): + """ + CaptureIO takes in two arguments. The second argument must + be truthy in order for the code to run. Hence, the string + "capture_stdio" has been used in this test case. + """ + with capture.CaptureIO(self.logger, "capture_stdio"): + print("message 1") + sys.stdout.write("message 2") + sys.stderr.write("message 3") + sys.stdout.write("\xff") + log = self.handler.items + messages = [item["message"] for item in log] + self.assertIn("STDOUT: message 1", messages) + self.assertIn("STDOUT: message 2", messages) + self.assertIn("STDERR: message 3", messages) + self.assertIn(u"STDOUT: \xff", messages) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozlog/tests/test_errorsummary.py b/testing/mozbase/mozlog/tests/test_errorsummary.py new file mode 100644 index 0000000000..422bd53358 --- /dev/null +++ b/testing/mozbase/mozlog/tests/test_errorsummary.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- + +import json +import time + +import mozunit +import pytest + +# flake8: noqa + + +@pytest.mark.parametrize( + "logs,expected", + ( + pytest.param( + [ + ( + "suite_start", + { + "manifestA": ["test_foo", "test_bar", "test_baz"], + "manifestB": ["test_something"], + }, + ), + ("test_start", "test_foo"), + ("test_end", "test_foo", "SKIP"), + ("test_start", "test_bar"), + ("test_end", "test_bar", "OK"), + ("test_start", "test_something"), + ("test_end", "test_something", "OK"), + ("test_start", "test_baz"), + ("test_end", "test_baz", "PASS", "FAIL"), + ("suite_end",), + ], + """ + {"groups": ["manifestA", "manifestB"], "action": "test_groups", "line": 0} + {"test": "test_baz", "subtest": null, "group": "manifestA", "status": "PASS", "expected": "FAIL", "message": null, "stack": null, "known_intermittent": [], "action": "test_result", "line": 8} + {"group": "manifestA", "status": "ERROR", "duration": 70, "action": "group_result", "line": 9} + {"group": "manifestB", "status": "OK", "duration": 10, "action": "group_result", "line": 9} + """.strip(), + id="basic", + ), + pytest.param( + [ + ("suite_start", {"manifest": ["test_foo"]}), + ("test_start", "test_foo"), + ("suite_end",), + ], + """ + {"groups": ["manifest"], "action": "test_groups", "line": 0} + {"group": "manifest", "status": null, "duration": null, "action": "group_result", "line": 2} + """.strip(), + id="missing_test_end", + ), + pytest.param( + [ + ("suite_start", {"manifest": ["test_foo"]}), + ("test_start", "test_foo"), + ("test_status", "test_foo", "subtest", "PASS"), + ("suite_end",), + ], + """ + {"groups": ["manifest"], "action": "test_groups", "line": 0} + {"group": "manifest", "status": "ERROR", "duration": null, "action": "group_result", "line": 3} + """.strip(), + id="missing_test_end_with_test_status_ok", + marks=pytest.mark.xfail, # status is OK but should be ERROR + ), + pytest.param( + [ + ( + "suite_start", + { + "manifestA": ["test_foo", "test_bar", "test_baz"], + "manifestB": ["test_something"], + }, + ), + ("test_start", "test_foo"), + ("test_end", "test_foo", "SKIP"), + ("test_start", "test_bar"), + ("test_end", "test_bar", "CRASH"), + ("test_start", "test_something"), + ("test_end", "test_something", "OK"), + ("test_start", "test_baz"), + ("test_end", "test_baz", "FAIL", "FAIL"), + ("suite_end",), + ], + """ + {"groups": ["manifestA", "manifestB"], "action": "test_groups", "line": 0} + {"test": "test_bar", "subtest": null, "group": "manifestA", "status": "CRASH", "expected": "OK", "message": null, "stack": null, "known_intermittent": [], "action": "test_result", "line": 4} + {"group": "manifestA", "status": "ERROR", "duration": 70, "action": "group_result", "line": 9} + {"group": "manifestB", "status": "OK", "duration": 10, "action": "group_result", "line": 9} + """.strip(), + id="crash_and_group_status", + ), + ), +) +def test_errorsummary(monkeypatch, get_logger, logs, expected): + ts = {"ts": 0.0} # need to use dict since 'nonlocal' doesn't exist on PY2 + + def fake_time(): + ts["ts"] += 0.01 + return ts["ts"] + + monkeypatch.setattr(time, "time", fake_time) + logger = get_logger("errorsummary") + + for log in logs: + getattr(logger, log[0])(*log[1:]) + + buf = logger.handlers[0].stream + result = buf.getvalue() + print("Dumping result for copy/paste:") + print(result) + + expected = expected.split("\n") + for i, line in enumerate(result.split("\n")): + if not line: + continue + + data = json.loads(line) + assert data == json.loads(expected[i]) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozlog/tests/test_formatters.py b/testing/mozbase/mozlog/tests/test_formatters.py new file mode 100644 index 0000000000..e0e3a51d97 --- /dev/null +++ b/testing/mozbase/mozlog/tests/test_formatters.py @@ -0,0 +1,767 @@ +# encoding: utf-8 + +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import os +import signal +import unittest +import xml.etree.ElementTree as ET +from textwrap import dedent + +import mozunit +import pytest +from mozlog.formatters import ( + GroupingFormatter, + HTMLFormatter, + MachFormatter, + TbplFormatter, + XUnitFormatter, +) +from mozlog.handlers import StreamHandler +from mozlog.structuredlog import StructuredLogger +from six import StringIO, ensure_text, unichr + +FORMATS = { + # A list of tuples consisting of (name, options, expected string). + "PASS": [ + ( + "mach", + {}, + dedent( + """ + 0:00.00 SUITE_START: running 3 tests + 0:00.00 TEST_START: test_foo + 0:00.00 TEST_END: OK + 0:00.00 TEST_START: test_bar + 0:00.00 TEST_END: Test OK. Subtests passed 1/1. Unexpected 0 + 0:00.00 TEST_START: test_baz + 0:00.00 TEST_END: FAIL + 0:00.00 SUITE_END + + suite 1 + ~~~~~~~ + Ran 4 checks (1 subtests, 3 tests) + Expected results: 4 + Unexpected results: 0 + OK + """ + ).lstrip("\n"), + ), + ( + "mach", + {"verbose": True}, + dedent( + """ + 0:00.00 SUITE_START: running 3 tests + 0:00.00 TEST_START: test_foo + 0:00.00 TEST_END: OK + 0:00.00 TEST_START: test_bar + 0:00.00 PASS a subtest + 0:00.00 TEST_END: Test OK. Subtests passed 1/1. Unexpected 0 + 0:00.00 TEST_START: test_baz + 0:00.00 TEST_END: FAIL + 0:00.00 SUITE_END + + suite 1 + ~~~~~~~ + Ran 4 checks (1 subtests, 3 tests) + Expected results: 4 + Unexpected results: 0 + OK + """ + ).lstrip("\n"), + ), + ], + "FAIL": [ + ( + "mach", + {}, + dedent( + """ + 0:00.00 SUITE_START: running 3 tests + 0:00.00 TEST_START: test_foo + 0:00.00 TEST_END: FAIL, expected PASS - expected 0 got 1 + 0:00.00 TEST_START: test_bar + 0:00.00 TEST_END: Test OK. Subtests passed 0/2. Unexpected 2 + FAIL a subtest - expected 0 got 1 + SimpleTest.is@SimpleTest/SimpleTest.js:312:5 + @caps/tests/mochitest/test_bug246699.html:53:1 + TIMEOUT another subtest + 0:00.00 TEST_START: test_baz + 0:00.00 TEST_END: PASS, expected FAIL + 0:00.00 SUITE_END + + suite 1 + ~~~~~~~ + Ran 5 checks (2 subtests, 3 tests) + Expected results: 1 + Unexpected results: 4 + test: 2 (1 fail, 1 pass) + subtest: 2 (1 fail, 1 timeout) + + Unexpected Results + ------------------ + test_foo + FAIL test_foo - expected 0 got 1 + test_bar + FAIL a subtest - expected 0 got 1 + SimpleTest.is@SimpleTest/SimpleTest.js:312:5 + @caps/tests/mochitest/test_bug246699.html:53:1 + TIMEOUT another subtest + test_baz + UNEXPECTED-PASS test_baz + """ + ).lstrip("\n"), + ), + ( + "mach", + {"verbose": True}, + dedent( + """ + 0:00.00 SUITE_START: running 3 tests + 0:00.00 TEST_START: test_foo + 0:00.00 TEST_END: FAIL, expected PASS - expected 0 got 1 + 0:00.00 TEST_START: test_bar + 0:00.00 FAIL a subtest - expected 0 got 1 + SimpleTest.is@SimpleTest/SimpleTest.js:312:5 + @caps/tests/mochitest/test_bug246699.html:53:1 + 0:00.00 TIMEOUT another subtest + 0:00.00 TEST_END: Test OK. Subtests passed 0/2. Unexpected 2 + 0:00.00 TEST_START: test_baz + 0:00.00 TEST_END: PASS, expected FAIL + 0:00.00 SUITE_END + + suite 1 + ~~~~~~~ + Ran 5 checks (2 subtests, 3 tests) + Expected results: 1 + Unexpected results: 4 + test: 2 (1 fail, 1 pass) + subtest: 2 (1 fail, 1 timeout) + + Unexpected Results + ------------------ + test_foo + FAIL test_foo - expected 0 got 1 + test_bar + FAIL a subtest - expected 0 got 1 + SimpleTest.is@SimpleTest/SimpleTest.js:312:5 + @caps/tests/mochitest/test_bug246699.html:53:1 + TIMEOUT another subtest + test_baz + UNEXPECTED-PASS test_baz + """ + ).lstrip("\n"), + ), + ], + "PRECONDITION_FAILED": [ + ( + "mach", + {}, + dedent( + """ + 0:00.00 SUITE_START: running 2 tests + 0:00.00 TEST_START: test_foo + 0:00.00 TEST_END: PRECONDITION_FAILED, expected OK + 0:00.00 TEST_START: test_bar + 0:00.00 TEST_END: Test OK. Subtests passed 1/2. Unexpected 1 + PRECONDITION_FAILED another subtest + 0:00.00 SUITE_END + + suite 1 + ~~~~~~~ + Ran 4 checks (2 subtests, 2 tests) + Expected results: 2 + Unexpected results: 2 + test: 1 (1 precondition_failed) + subtest: 1 (1 precondition_failed) + + Unexpected Results + ------------------ + test_foo + PRECONDITION_FAILED test_foo + test_bar + PRECONDITION_FAILED another subtest + """ + ).lstrip("\n"), + ), + ( + "mach", + {"verbose": True}, + dedent( + """ + 0:00.00 SUITE_START: running 2 tests + 0:00.00 TEST_START: test_foo + 0:00.00 TEST_END: PRECONDITION_FAILED, expected OK + 0:00.00 TEST_START: test_bar + 0:00.00 PASS a subtest + 0:00.00 PRECONDITION_FAILED another subtest + 0:00.00 TEST_END: Test OK. Subtests passed 1/2. Unexpected 1 + 0:00.00 SUITE_END + + suite 1 + ~~~~~~~ + Ran 4 checks (2 subtests, 2 tests) + Expected results: 2 + Unexpected results: 2 + test: 1 (1 precondition_failed) + subtest: 1 (1 precondition_failed) + + Unexpected Results + ------------------ + test_foo + PRECONDITION_FAILED test_foo + test_bar + PRECONDITION_FAILED another subtest + """ + ).lstrip("\n"), + ), + ], + "KNOWN-INTERMITTENT": [ + ( + "mach", + {}, + dedent( + """ + 0:00.00 SUITE_START: running 3 tests + 0:00.00 TEST_START: test_foo + 0:00.00 TEST_END: FAIL + KNOWN-INTERMITTENT-FAIL test_foo + 0:00.00 TEST_START: test_bar + 0:00.00 TEST_END: Test OK. Subtests passed 1/1. Unexpected 0 + KNOWN-INTERMITTENT-PASS a subtest + 0:00.00 TEST_START: test_baz + 0:00.00 TEST_END: FAIL + 0:00.00 SUITE_END + + suite 1 + ~~~~~~~ + Ran 4 checks (1 subtests, 3 tests) + Expected results: 4 (2 known intermittents) + Unexpected results: 0 + + Known Intermittent Results + -------------------------- + test_foo + KNOWN-INTERMITTENT-FAIL test_foo + test_bar + KNOWN-INTERMITTENT-PASS a subtest + OK + """ + ).lstrip("\n"), + ), + ( + "mach", + {"verbose": True}, + dedent( + """ + 0:00.00 SUITE_START: running 3 tests + 0:00.00 TEST_START: test_foo + 0:00.00 TEST_END: FAIL + KNOWN-INTERMITTENT-FAIL test_foo + 0:00.00 TEST_START: test_bar + 0:00.00 KNOWN-INTERMITTENT-PASS a subtest + 0:00.00 TEST_END: Test OK. Subtests passed 1/1. Unexpected 0 + KNOWN-INTERMITTENT-PASS a subtest + 0:00.00 TEST_START: test_baz + 0:00.00 TEST_END: FAIL + 0:00.00 SUITE_END + + suite 1 + ~~~~~~~ + Ran 4 checks (1 subtests, 3 tests) + Expected results: 4 (2 known intermittents) + Unexpected results: 0 + + Known Intermittent Results + -------------------------- + test_foo + KNOWN-INTERMITTENT-FAIL test_foo + test_bar + KNOWN-INTERMITTENT-PASS a subtest + OK + """ + ).lstrip("\n"), + ), + ], +} + + +def ids(test): + ids = [] + for value in FORMATS[test]: + args = ", ".join(["{}={}".format(k, v) for k, v in value[1].items()]) + if args: + args = "-{}".format(args) + ids.append("{}{}".format(value[0], args)) + return ids + + +@pytest.fixture(autouse=True) +def timestamp(monkeypatch): + def fake_time(*args, **kwargs): + return 0 + + monkeypatch.setattr(MachFormatter, "_time", fake_time) + + +@pytest.mark.parametrize("name,opts,expected", FORMATS["PASS"], ids=ids("PASS")) +def test_pass(get_logger, name, opts, expected): + logger = get_logger(name, **opts) + + logger.suite_start(["test_foo", "test_bar", "test_baz"]) + logger.test_start("test_foo") + logger.test_end("test_foo", "OK") + logger.test_start("test_bar") + logger.test_status("test_bar", "a subtest", "PASS") + logger.test_end("test_bar", "OK") + logger.test_start("test_baz") + logger.test_end("test_baz", "FAIL", "FAIL", "expected 0 got 1") + logger.suite_end() + + buf = logger.handlers[0].stream + result = buf.getvalue() + print("Dumping result for copy/paste:") + print(result) + assert result == expected + + +@pytest.mark.parametrize("name,opts,expected", FORMATS["FAIL"], ids=ids("FAIL")) +def test_fail(get_logger, name, opts, expected): + stack = """ + SimpleTest.is@SimpleTest/SimpleTest.js:312:5 + @caps/tests/mochitest/test_bug246699.html:53:1 +""".strip( + "\n" + ) + + logger = get_logger(name, **opts) + + logger.suite_start(["test_foo", "test_bar", "test_baz"]) + logger.test_start("test_foo") + logger.test_end("test_foo", "FAIL", "PASS", "expected 0 got 1") + logger.test_start("test_bar") + logger.test_status( + "test_bar", "a subtest", "FAIL", "PASS", "expected 0 got 1", stack + ) + logger.test_status("test_bar", "another subtest", "TIMEOUT") + logger.test_end("test_bar", "OK") + logger.test_start("test_baz") + logger.test_end("test_baz", "PASS", "FAIL") + logger.suite_end() + + buf = logger.handlers[0].stream + result = buf.getvalue() + print("Dumping result for copy/paste:") + print(result) + assert result == expected + + +@pytest.mark.parametrize( + "name,opts,expected", FORMATS["PRECONDITION_FAILED"], ids=ids("PRECONDITION_FAILED") +) +def test_precondition_failed(get_logger, name, opts, expected): + logger = get_logger(name, **opts) + + logger.suite_start(["test_foo", "test_bar"]) + logger.test_start("test_foo") + logger.test_end("test_foo", "PRECONDITION_FAILED") + logger.test_start("test_bar") + logger.test_status("test_bar", "a subtest", "PASS") + logger.test_status("test_bar", "another subtest", "PRECONDITION_FAILED") + logger.test_end("test_bar", "OK") + logger.suite_end() + + buf = logger.handlers[0].stream + result = buf.getvalue() + print("Dumping result for copy/paste:") + print(result) + assert result == expected + + +@pytest.mark.parametrize( + "name,opts,expected", FORMATS["KNOWN-INTERMITTENT"], ids=ids("KNOWN-INTERMITTENT") +) +def test_known_intermittent(get_logger, name, opts, expected): + logger = get_logger(name, **opts) + + logger.suite_start(["test_foo", "test_bar", "test_baz"]) + logger.test_start("test_foo") + logger.test_end("test_foo", "FAIL", "PASS", known_intermittent=["FAIL"]) + logger.test_start("test_bar") + logger.test_status( + "test_bar", "a subtest", "PASS", "FAIL", known_intermittent=["PASS"] + ) + logger.test_end("test_bar", "OK") + logger.test_start("test_baz") + logger.test_end( + "test_baz", "FAIL", "FAIL", "expected 0 got 1", known_intermittent=["PASS"] + ) + logger.suite_end() + + buf = logger.handlers[0].stream + result = buf.getvalue() + print("Dumping result for copy/paste:") + print(result) + assert result == expected + + +class FormatterTest(unittest.TestCase): + def setUp(self): + self.position = 0 + self.logger = StructuredLogger("test_%s" % type(self).__name__) + self.output_file = StringIO() + self.handler = StreamHandler(self.output_file, self.get_formatter()) + self.logger.add_handler(self.handler) + + def set_position(self, pos=None): + if pos is None: + pos = self.output_file.tell() + self.position = pos + + def get_formatter(self): + raise NotImplementedError( + "FormatterTest subclasses must implement get_formatter" + ) + + @property + def loglines(self): + self.output_file.seek(self.position) + return [ensure_text(line.rstrip()) for line in self.output_file.readlines()] + + +class TestHTMLFormatter(FormatterTest): + def get_formatter(self): + return HTMLFormatter() + + def test_base64_string(self): + self.logger.suite_start([]) + self.logger.test_start("string_test") + self.logger.test_end("string_test", "FAIL", extra={"data": "foobar"}) + self.logger.suite_end() + self.assertIn("data:text/html;charset=utf-8;base64,Zm9vYmFy", self.loglines[-3]) + + def test_base64_unicode(self): + self.logger.suite_start([]) + self.logger.test_start("unicode_test") + self.logger.test_end("unicode_test", "FAIL", extra={"data": unichr(0x02A9)}) + self.logger.suite_end() + self.assertIn("data:text/html;charset=utf-8;base64,yqk=", self.loglines[-3]) + + def test_base64_other(self): + self.logger.suite_start([]) + self.logger.test_start("int_test") + self.logger.test_end("int_test", "FAIL", extra={"data": {"foo": "bar"}}) + self.logger.suite_end() + self.assertIn( + "data:text/html;charset=utf-8;base64,eyJmb28iOiAiYmFyIn0=", + self.loglines[-3], + ) + + +class TestTBPLFormatter(FormatterTest): + def get_formatter(self): + return TbplFormatter() + + def test_unexpected_message(self): + self.logger.suite_start([]) + self.logger.test_start("timeout_test") + self.logger.test_end("timeout_test", "TIMEOUT", message="timed out") + self.assertIn( + "TEST-UNEXPECTED-TIMEOUT | timeout_test | timed out", self.loglines + ) + self.logger.suite_end() + + def test_default_unexpected_end_message(self): + self.logger.suite_start([]) + self.logger.test_start("timeout_test") + self.logger.test_end("timeout_test", "TIMEOUT") + self.assertIn( + "TEST-UNEXPECTED-TIMEOUT | timeout_test | expected OK", self.loglines + ) + self.logger.suite_end() + + def test_default_unexpected_status_message(self): + self.logger.suite_start([]) + self.logger.test_start("timeout_test") + self.logger.test_status("timeout_test", "subtest", status="TIMEOUT") + self.assertIn( + "TEST-UNEXPECTED-TIMEOUT | timeout_test | subtest - expected PASS", + self.loglines, + ) + self.logger.test_end("timeout_test", "OK") + self.logger.suite_end() + + def test_known_intermittent_end(self): + self.logger.suite_start([]) + self.logger.test_start("intermittent_test") + self.logger.test_end( + "intermittent_test", + status="FAIL", + expected="PASS", + known_intermittent=["FAIL"], + ) + # test_end log format: + # "TEST-KNOWN-INTERMITTENT-<STATUS> | <test> | took <duration>ms" + # where duration may be different each time + self.assertIn( + "TEST-KNOWN-INTERMITTENT-FAIL | intermittent_test | took ", self.loglines[2] + ) + self.assertIn("ms", self.loglines[2]) + self.logger.suite_end() + + def test_known_intermittent_status(self): + self.logger.suite_start([]) + self.logger.test_start("intermittent_test") + self.logger.test_status( + "intermittent_test", + "subtest", + status="FAIL", + expected="PASS", + known_intermittent=["FAIL"], + ) + self.assertIn( + "TEST-KNOWN-INTERMITTENT-FAIL | intermittent_test | subtest", self.loglines + ) + self.logger.test_end("intermittent_test", "OK") + self.logger.suite_end() + + def test_single_newline(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.set_position() + self.logger.test_status("test1", "subtest", status="PASS", expected="FAIL") + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + # This sequence should not produce blanklines + for line in self.loglines: + self.assertNotEqual("", line) + + def test_process_exit(self): + self.logger.process_exit(1234, 0) + self.assertIn("TEST-INFO | 1234: exit 0", self.loglines) + + @unittest.skipUnless(os.name == "posix", "posix only") + def test_process_exit_with_sig(self): + # subprocess return code is negative when process + # has been killed by signal on posix. + self.logger.process_exit(1234, -signal.SIGTERM) + self.assertIn("TEST-INFO | 1234: killed by SIGTERM", self.loglines) + + +class TestTBPLFormatterWithShutdown(FormatterTest): + def get_formatter(self): + return TbplFormatter(summary_on_shutdown=True) + + def test_suite_summary_on_shutdown(self): + self.logger.suite_start([]) + self.logger.test_start("summary_test") + self.logger.test_status( + "summary_test", "subtest", "FAIL", "PASS", known_intermittent=["FAIL"] + ) + self.logger.test_end("summary_test", "FAIL", "OK", known_intermittent=["FAIL"]) + self.logger.suite_end() + self.logger.shutdown() + + self.assertIn("suite 1: 2/2 (2 known intermittent tests)", self.loglines) + self.assertIn("Known Intermittent tests:", self.loglines) + self.assertIn( + "TEST-KNOWN-INTERMITTENT-FAIL | summary_test | subtest", self.loglines + ) + + +class TestMachFormatter(FormatterTest): + def get_formatter(self): + return MachFormatter(disable_colors=True) + + def test_summary(self): + self.logger.suite_start([]) + + # Some tests that pass + self.logger.test_start("test1") + self.logger.test_end("test1", status="PASS", expected="PASS") + + self.logger.test_start("test2") + self.logger.test_end("test2", status="PASS", expected="TIMEOUT") + + self.logger.test_start("test3") + self.logger.test_end("test3", status="FAIL", expected="PASS") + + self.set_position() + self.logger.suite_end() + + self.assertIn("Ran 3 checks (3 tests)", self.loglines) + self.assertIn("Expected results: 1", self.loglines) + self.assertIn( + """ +Unexpected results: 2 + test: 2 (1 fail, 1 pass) +""".strip(), + "\n".join(self.loglines), + ) + self.assertNotIn("test1", self.loglines) + self.assertIn("UNEXPECTED-PASS test2", self.loglines) + self.assertIn("FAIL test3", self.loglines) + + def test_summary_subtests(self): + self.logger.suite_start([]) + + self.logger.test_start("test1") + self.logger.test_status("test1", "subtest1", status="PASS") + self.logger.test_status("test1", "subtest2", status="FAIL") + self.logger.test_end("test1", status="OK", expected="OK") + + self.logger.test_start("test2") + self.logger.test_status("test2", "subtest1", status="TIMEOUT", expected="PASS") + self.logger.test_end("test2", status="TIMEOUT", expected="OK") + + self.set_position() + self.logger.suite_end() + + self.assertIn("Ran 5 checks (3 subtests, 2 tests)", self.loglines) + self.assertIn("Expected results: 2", self.loglines) + self.assertIn( + """ +Unexpected results: 3 + test: 1 (1 timeout) + subtest: 2 (1 fail, 1 timeout) +""".strip(), + "\n".join(self.loglines), + ) + + def test_summary_ok(self): + self.logger.suite_start([]) + + self.logger.test_start("test1") + self.logger.test_status("test1", "subtest1", status="PASS") + self.logger.test_status("test1", "subtest2", status="PASS") + self.logger.test_end("test1", status="OK", expected="OK") + + self.logger.test_start("test2") + self.logger.test_status("test2", "subtest1", status="PASS", expected="PASS") + self.logger.test_end("test2", status="OK", expected="OK") + + self.set_position() + self.logger.suite_end() + + self.assertIn("OK", self.loglines) + self.assertIn("Expected results: 5", self.loglines) + self.assertIn("Unexpected results: 0", self.loglines) + + def test_process_start(self): + self.logger.process_start(1234) + self.assertIn("Started process `1234`", self.loglines[0]) + + def test_process_start_with_command(self): + self.logger.process_start(1234, command="test cmd") + self.assertIn("Started process `1234` (test cmd)", self.loglines[0]) + + def test_process_exit(self): + self.logger.process_exit(1234, 0) + self.assertIn("1234: exit 0", self.loglines[0]) + + @unittest.skipUnless(os.name == "posix", "posix only") + def test_process_exit_with_sig(self): + # subprocess return code is negative when process + # has been killed by signal on posix. + self.logger.process_exit(1234, -signal.SIGTERM) + self.assertIn("1234: killed by SIGTERM", self.loglines[0]) + + +class TestGroupingFormatter(FormatterTest): + def get_formatter(self): + return GroupingFormatter() + + def test_results_total(self): + self.logger.suite_start([]) + + self.logger.test_start("test1") + self.logger.test_status("test1", "subtest1", status="PASS") + self.logger.test_status("test1", "subtest1", status="PASS") + self.logger.test_end("test1", status="OK") + + self.logger.test_start("test2") + self.logger.test_status( + "test2", + "subtest2", + status="FAIL", + expected="PASS", + known_intermittent=["FAIL"], + ) + self.logger.test_end("test2", status="FAIL", expected="OK") + + self.set_position() + self.logger.suite_end() + + self.assertIn("Ran 2 tests finished in 0.0 seconds.", self.loglines) + self.assertIn(" \u2022 1 ran as expected. 0 tests skipped.", self.loglines) + self.assertIn(" \u2022 1 known intermittent results.", self.loglines) + self.assertIn(" \u2022 1 tests failed unexpectedly", self.loglines) + self.assertIn(" \u25B6 FAIL [expected OK] test2", self.loglines) + self.assertIn( + " \u25B6 FAIL [expected PASS, known intermittent [FAIL] test2, subtest2", + self.loglines, + ) + + +class TestXUnitFormatter(FormatterTest): + def get_formatter(self): + return XUnitFormatter() + + def log_as_xml(self): + return ET.fromstring("\n".join(self.loglines)) + + def test_stacktrace_is_present(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end( + "test1", "fail", message="Test message", stack="this\nis\na\nstack" + ) + self.logger.suite_end() + + root = self.log_as_xml() + self.assertIn("this\nis\na\nstack", root.find("testcase/failure").text) + + def test_failure_message(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end("test1", "fail", message="Test message") + self.logger.suite_end() + + root = self.log_as_xml() + self.assertEqual( + "Expected OK, got FAIL", root.find("testcase/failure").get("message") + ) + + def test_suite_attrs(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end("test1", "ok", message="Test message") + self.logger.suite_end() + + root = self.log_as_xml() + self.assertEqual(root.get("skips"), "0") + self.assertEqual(root.get("failures"), "0") + self.assertEqual(root.get("errors"), "0") + self.assertEqual(root.get("tests"), "1") + + def test_time_is_not_rounded(self): + # call formatter directly, it is easier here + formatter = self.get_formatter() + formatter.suite_start(dict(time=55000)) + formatter.test_start(dict(time=55100)) + formatter.test_end( + dict(time=55558, test="id", message="message", status="PASS") + ) + xml_string = formatter.suite_end(dict(time=55559)) + + root = ET.fromstring(xml_string) + self.assertEqual(root.get("time"), "0.56") + self.assertEqual(root.find("testcase").get("time"), "0.46") + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozlog/tests/test_logger.py b/testing/mozbase/mozlog/tests/test_logger.py new file mode 100644 index 0000000000..0776d87000 --- /dev/null +++ b/testing/mozbase/mozlog/tests/test_logger.py @@ -0,0 +1,303 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +import datetime +import json +import socket +import threading +import time +import unittest + +import mozfile +import mozlog.unstructured as mozlog +import mozunit +import six + + +class ListHandler(mozlog.Handler): + """Mock handler appends messages to a list for later inspection.""" + + def __init__(self): + mozlog.Handler.__init__(self) + self.messages = [] + + def emit(self, record): + self.messages.append(self.format(record)) + + +class TestLogging(unittest.TestCase): + """Tests behavior of basic mozlog api.""" + + def test_logger_defaults(self): + """Tests the default logging format and behavior.""" + + default_logger = mozlog.getLogger("default.logger") + self.assertEqual(default_logger.name, "default.logger") + self.assertEqual(len(default_logger.handlers), 1) + self.assertTrue(isinstance(default_logger.handlers[0], mozlog.StreamHandler)) + + f = mozfile.NamedTemporaryFile() + list_logger = mozlog.getLogger( + "file.logger", handler=mozlog.FileHandler(f.name) + ) + self.assertEqual(len(list_logger.handlers), 1) + self.assertTrue(isinstance(list_logger.handlers[0], mozlog.FileHandler)) + f.close() + + self.assertRaises( + ValueError, mozlog.getLogger, "file.logger", handler=ListHandler() + ) + + def test_timestamps(self): + """Verifies that timestamps are included when asked for.""" + log_name = "test" + handler = ListHandler() + handler.setFormatter(mozlog.MozFormatter()) + log = mozlog.getLogger(log_name, handler=handler) + log.info("no timestamp") + self.assertTrue(handler.messages[-1].startswith("%s " % log_name)) + handler.setFormatter(mozlog.MozFormatter(include_timestamp=True)) + log.info("timestamp") + # Just verify that this raises no exceptions. + datetime.datetime.strptime(handler.messages[-1][:23], "%Y-%m-%d %H:%M:%S,%f") + + +class TestStructuredLogging(unittest.TestCase): + """Tests structured output in mozlog.""" + + def setUp(self): + self.handler = ListHandler() + self.handler.setFormatter(mozlog.JSONFormatter()) + self.logger = mozlog.MozLogger("test.Logger") + self.logger.addHandler(self.handler) + self.logger.setLevel(mozlog.DEBUG) + + def check_messages(self, expected, actual): + """Checks actual for equality with corresponding fields in actual. + The actual message should contain all fields in expected, and + should be identical, with the exception of the timestamp field. + The actual message should contain no fields other than the timestamp + field and those present in expected.""" + + self.assertTrue(isinstance(actual["_time"], six.integer_types)) + + for k, v in expected.items(): + self.assertEqual(v, actual[k]) + + for k in actual.keys(): + if k != "_time": + self.assertTrue(expected.get(k) is not None) + + def test_structured_output(self): + self.logger.log_structured( + "test_message", {"_level": mozlog.INFO, "_message": "message one"} + ) + self.logger.log_structured( + "test_message", {"_level": mozlog.INFO, "_message": "message two"} + ) + self.logger.log_structured( + "error_message", {"_level": mozlog.ERROR, "diagnostic": "unexpected error"} + ) + + message_one_expected = { + "_namespace": "test.Logger", + "_level": "INFO", + "_message": "message one", + "action": "test_message", + } + message_two_expected = { + "_namespace": "test.Logger", + "_level": "INFO", + "_message": "message two", + "action": "test_message", + } + message_three_expected = { + "_namespace": "test.Logger", + "_level": "ERROR", + "diagnostic": "unexpected error", + "action": "error_message", + } + + message_one_actual = json.loads(self.handler.messages[0]) + message_two_actual = json.loads(self.handler.messages[1]) + message_three_actual = json.loads(self.handler.messages[2]) + + self.check_messages(message_one_expected, message_one_actual) + self.check_messages(message_two_expected, message_two_actual) + self.check_messages(message_three_expected, message_three_actual) + + def test_unstructured_conversion(self): + """Tests that logging to a logger with a structured formatter + via the traditional logging interface works as expected.""" + self.logger.info("%s %s %d", "Message", "number", 1) + self.logger.error("Message number 2") + self.logger.debug( + "Message with %s", + "some extras", + extra={"params": {"action": "mozlog_test_output", "is_failure": False}}, + ) + message_one_expected = { + "_namespace": "test.Logger", + "_level": "INFO", + "_message": "Message number 1", + } + message_two_expected = { + "_namespace": "test.Logger", + "_level": "ERROR", + "_message": "Message number 2", + } + message_three_expected = { + "_namespace": "test.Logger", + "_level": "DEBUG", + "_message": "Message with some extras", + "action": "mozlog_test_output", + "is_failure": False, + } + + message_one_actual = json.loads(self.handler.messages[0]) + message_two_actual = json.loads(self.handler.messages[1]) + message_three_actual = json.loads(self.handler.messages[2]) + + self.check_messages(message_one_expected, message_one_actual) + self.check_messages(message_two_expected, message_two_actual) + self.check_messages(message_three_expected, message_three_actual) + + def message_callback(self): + if len(self.handler.messages) == 3: + message_one_expected = { + "_namespace": "test.Logger", + "_level": "DEBUG", + "_message": "socket message one", + "action": "test_message", + } + message_two_expected = { + "_namespace": "test.Logger", + "_level": "DEBUG", + "_message": "socket message two", + "action": "test_message", + } + message_three_expected = { + "_namespace": "test.Logger", + "_level": "DEBUG", + "_message": "socket message three", + "action": "test_message", + } + + message_one_actual = json.loads(self.handler.messages[0]) + + message_two_actual = json.loads(self.handler.messages[1]) + + message_three_actual = json.loads(self.handler.messages[2]) + + self.check_messages(message_one_expected, message_one_actual) + self.check_messages(message_two_expected, message_two_actual) + self.check_messages(message_three_expected, message_three_actual) + + def test_log_listener(self): + connection = "127.0.0.1", 0 + self.log_server = mozlog.LogMessageServer( + connection, self.logger, message_callback=self.message_callback, timeout=0.5 + ) + + message_string_one = json.dumps( + { + "_message": "socket message one", + "action": "test_message", + "_level": "DEBUG", + } + ) + message_string_two = json.dumps( + { + "_message": "socket message two", + "action": "test_message", + "_level": "DEBUG", + } + ) + + message_string_three = json.dumps( + { + "_message": "socket message three", + "action": "test_message", + "_level": "DEBUG", + } + ) + + message_string = ( + message_string_one + + "\n" + + message_string_two + + "\n" + + message_string_three + + "\n" + ) + + server_thread = threading.Thread(target=self.log_server.handle_request) + server_thread.start() + + host, port = self.log_server.server_address + + sock = socket.socket() + sock.connect((host, port)) + + # Sleeps prevent listener from receiving entire message in a single call + # to recv in order to test reconstruction of partial messages. + sock.sendall(message_string[:8].encode()) + time.sleep(0.01) + sock.sendall(message_string[8:32].encode()) + time.sleep(0.01) + sock.sendall(message_string[32:64].encode()) + time.sleep(0.01) + sock.sendall(message_string[64:128].encode()) + time.sleep(0.01) + sock.sendall(message_string[128:].encode()) + + server_thread.join() + + +class Loggable(mozlog.LoggingMixin): + """Trivial class inheriting from LoggingMixin""" + + pass + + +class TestLoggingMixin(unittest.TestCase): + """Tests basic use of LoggingMixin""" + + def test_mixin(self): + loggable = Loggable() + self.assertTrue(not hasattr(loggable, "_logger")) + loggable.log(mozlog.INFO, "This will instantiate the logger") + self.assertTrue(hasattr(loggable, "_logger")) + self.assertEqual(loggable._logger.name, "test_logger.Loggable") + + self.assertRaises(ValueError, loggable.set_logger, "not a logger") + + logger = mozlog.MozLogger("test.mixin") + handler = ListHandler() + logger.addHandler(handler) + loggable.set_logger(logger) + self.assertTrue(isinstance(loggable._logger.handlers[0], ListHandler)) + self.assertEqual(loggable._logger.name, "test.mixin") + + loggable.log(mozlog.WARN, 'message for "log" method') + loggable.info('message for "info" method') + loggable.error('message for "error" method') + loggable.log_structured( + "test_message", + params={"_message": "message for " + '"log_structured" method'}, + ) + + expected_messages = [ + 'message for "log" method', + 'message for "info" method', + 'message for "error" method', + 'message for "log_structured" method', + ] + + actual_messages = loggable._logger.handlers[0].messages + self.assertEqual(expected_messages, actual_messages) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozlog/tests/test_logtypes.py b/testing/mozbase/mozlog/tests/test_logtypes.py new file mode 100644 index 0000000000..177d25cdb0 --- /dev/null +++ b/testing/mozbase/mozlog/tests/test_logtypes.py @@ -0,0 +1,106 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +import unittest + +import mozunit +from mozlog.logtypes import Any, Dict, Int, List, TestList, Tuple, Unicode + + +class TestContainerTypes(unittest.TestCase): + def test_dict_type_basic(self): + d = Dict("name") + with self.assertRaises(ValueError): + d({"foo": "bar"}) + + d = Dict(Any, "name") + d({"foo": "bar"}) # doesn't raise + + def test_dict_type_with_dictionary_item_type(self): + d = Dict({Int: Int}, "name") + with self.assertRaises(ValueError): + d({"foo": 1}) + + with self.assertRaises(ValueError): + d({1: "foo"}) + + d({1: 2}) # doesn't raise + + def test_dict_type_with_recursive_item_types(self): + d = Dict(Dict({Unicode: List(Int)}), "name") + with self.assertRaises(ValueError): + d({"foo": "bar"}) + + with self.assertRaises(ValueError): + d({"foo": {"bar": "baz"}}) + + with self.assertRaises(ValueError): + d({"foo": {"bar": ["baz"]}}) + + d({"foo": {"bar": [1]}}) # doesn't raise + + def test_list_type_basic(self): + l = List("name") + with self.assertRaises(ValueError): + l(["foo"]) + + l = List(Any, "name") + l(["foo", 1]) # doesn't raise + + def test_list_type_with_recursive_item_types(self): + l = List(Dict(List(Tuple((Unicode, Int)))), "name") + with self.assertRaises(ValueError): + l(["foo"]) + + with self.assertRaises(ValueError): + l([{"foo": "bar"}]) + + with self.assertRaises(ValueError): + l([{"foo": ["bar"]}]) + + l([{"foo": [("bar", 1)]}]) # doesn't raise + + def test_tuple_type_basic(self): + t = Tuple("name") + with self.assertRaises(ValueError): + t((1,)) + + t = Tuple(Any, "name") + t((1,)) # doesn't raise + + def test_tuple_type_with_tuple_item_type(self): + t = Tuple((Unicode, Int)) + with self.assertRaises(ValueError): + t(("foo", "bar")) + + t(("foo", 1)) # doesn't raise + + def test_tuple_type_with_recursive_item_types(self): + t = Tuple((Dict(List(Any)), List(Dict(Any)), Unicode), "name") + with self.assertRaises(ValueError): + t(({"foo": "bar"}, [{"foo": "bar"}], "foo")) + + with self.assertRaises(ValueError): + t(({"foo": ["bar"]}, ["foo"], "foo")) + + t(({"foo": ["bar"]}, [{"foo": "bar"}], "foo")) # doesn't raise + + +class TestDataTypes(unittest.TestCase): + def test_test_list(self): + t = TestList("name") + with self.assertRaises(ValueError): + t("foo") + + with self.assertRaises(ValueError): + t({"foo": 1}) + + d1 = t({"default": ["bar"]}) # doesn't raise + d2 = t(["bar"]) # doesn't raise + + self.assertDictContainsSubset(d1, d2) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozlog/tests/test_structured.py b/testing/mozbase/mozlog/tests/test_structured.py new file mode 100644 index 0000000000..f97f3baf42 --- /dev/null +++ b/testing/mozbase/mozlog/tests/test_structured.py @@ -0,0 +1,1097 @@ +# -*- coding: utf-8 -*- + +import argparse +import json +import optparse +import os +import sys +import unittest + +import mozfile +import mozunit +import six +from mozlog import commandline, formatters, handlers, reader, stdadapter, structuredlog +from six import StringIO + + +class TestHandler(object): + def __init__(self): + self.items = [] + + def __call__(self, data): + self.items.append(data) + + @property + def last_item(self): + return self.items[-1] + + @property + def empty(self): + return not self.items + + +class BaseStructuredTest(unittest.TestCase): + def setUp(self): + self.logger = structuredlog.StructuredLogger("test") + self.handler = TestHandler() + self.logger.add_handler(self.handler) + + def pop_last_item(self): + return self.handler.items.pop() + + def assert_log_equals(self, expected, actual=None): + if actual is None: + actual = self.pop_last_item() + + all_expected = {"pid": os.getpid(), "thread": "MainThread", "source": "test"} + specials = set(["time"]) + + all_expected.update(expected) + for key, value in six.iteritems(all_expected): + self.assertEqual(actual[key], value) + + self.assertEqual(set(all_expected.keys()) | specials, set(actual.keys())) + + +class TestStatusHandler(BaseStructuredTest): + def setUp(self): + super(TestStatusHandler, self).setUp() + self.handler = handlers.StatusHandler() + self.logger.add_handler(self.handler) + + def test_failure_run(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "sub1", status="PASS") + self.logger.test_status("test1", "sub2", status="TIMEOUT") + self.logger.test_status( + "test1", "sub3", status="FAIL", expected="PASS", known_intermittent=["FAIL"] + ) + self.logger.test_end("test1", status="OK") + self.logger.suite_end() + summary = self.handler.summarize() + self.assertIn("TIMEOUT", summary.unexpected_statuses) + self.assertEqual(1, summary.unexpected_statuses["TIMEOUT"]) + self.assertIn("PASS", summary.expected_statuses) + self.assertEqual(1, summary.expected_statuses["PASS"]) + self.assertIn("OK", summary.expected_statuses) + self.assertEqual(1, summary.expected_statuses["OK"]) + self.assertIn("FAIL", summary.expected_statuses) + self.assertEqual(1, summary.expected_statuses["FAIL"]) + self.assertIn("FAIL", summary.known_intermittent_statuses) + self.assertEqual(1, summary.known_intermittent_statuses["FAIL"]) + self.assertEqual(3, summary.action_counts["test_status"]) + self.assertEqual(1, summary.action_counts["test_end"]) + + def test_precondition_failed_run(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end("test1", status="PRECONDITION_FAILED") + self.logger.test_start("test2") + self.logger.test_status("test2", "sub1", status="PRECONDITION_FAILED") + self.logger.test_end("test2", status="OK") + self.logger.suite_end() + summary = self.handler.summarize() + self.assertEqual(1, summary.expected_statuses["OK"]) + self.assertEqual(2, summary.unexpected_statuses["PRECONDITION_FAILED"]) + + def test_error_run(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.error("ERRR!") + self.logger.test_end("test1", status="OK") + self.logger.test_start("test2") + self.logger.test_end("test2", status="OK") + self.logger.suite_end() + summary = self.handler.summarize() + self.assertIn("ERROR", summary.log_level_counts) + self.assertEqual(1, summary.log_level_counts["ERROR"]) + self.assertIn("OK", summary.expected_statuses) + self.assertEqual(2, summary.expected_statuses["OK"]) + + +class TestSummaryHandler(BaseStructuredTest): + def setUp(self): + super(TestSummaryHandler, self).setUp() + self.handler = handlers.SummaryHandler() + self.logger.add_handler(self.handler) + + def test_failure_run(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "sub1", status="PASS") + self.logger.test_status("test1", "sub2", status="TIMEOUT") + self.logger.assertion_count("test1", 5, 1, 10) + self.logger.assertion_count("test1", 5, 10, 15) + self.logger.test_end("test1", status="OK") + self.logger.suite_end() + + counts = self.handler.current["counts"] + self.assertIn("timeout", counts["subtest"]["unexpected"]) + self.assertEqual(1, counts["subtest"]["unexpected"]["timeout"]) + self.assertIn("pass", counts["subtest"]["expected"]) + self.assertEqual(1, counts["subtest"]["expected"]["pass"]) + self.assertIn("ok", counts["test"]["expected"]) + self.assertEqual(1, counts["test"]["expected"]["ok"]) + self.assertIn("pass", counts["assert"]["unexpected"]) + self.assertEqual(1, counts["assert"]["unexpected"]["pass"]) + self.assertIn("fail", counts["assert"]["expected"]) + self.assertEqual(1, counts["assert"]["expected"]["fail"]) + + logs = self.handler.current["unexpected_logs"] + self.assertEqual(1, len(logs)) + self.assertIn("test1", logs) + self.assertEqual(1, len(logs["test1"])) + self.assertEqual("sub2", logs["test1"][0]["subtest"]) + + def test_precondition_failed_run(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "sub1", status="PASS") + self.logger.test_end("test1", status="PRECONDITION_FAILED") + self.logger.test_start("test2") + self.logger.test_status("test2", "sub1", status="PRECONDITION_FAILED") + self.logger.test_status("test2", "sub2", status="PRECONDITION_FAILED") + self.logger.test_end("test2", status="OK") + self.logger.suite_end() + + counts = self.handler.current["counts"] + self.assertIn("precondition_failed", counts["test"]["unexpected"]) + self.assertEqual(1, counts["test"]["unexpected"]["precondition_failed"]) + self.assertIn("pass", counts["subtest"]["expected"]) + self.assertEqual(1, counts["subtest"]["expected"]["pass"]) + self.assertIn("ok", counts["test"]["expected"]) + self.assertEqual(1, counts["test"]["expected"]["ok"]) + self.assertIn("precondition_failed", counts["subtest"]["unexpected"]) + self.assertEqual(2, counts["subtest"]["unexpected"]["precondition_failed"]) + + +class TestStructuredLog(BaseStructuredTest): + def test_suite_start(self): + self.logger.suite_start(["test"], "logtest") + self.assert_log_equals( + {"action": "suite_start", "name": "logtest", "tests": {"default": ["test"]}} + ) + self.logger.suite_end() + + def test_suite_end(self): + self.logger.suite_start([]) + self.logger.suite_end() + self.assert_log_equals({"action": "suite_end"}) + + def test_start(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.assert_log_equals({"action": "test_start", "test": "test1"}) + + self.logger.test_start(("test1", "==", "test1-ref"), path="path/to/test") + self.assert_log_equals( + { + "action": "test_start", + "test": ("test1", "==", "test1-ref"), + "path": "path/to/test", + } + ) + self.logger.suite_end() + + def test_start_inprogress(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_start("test1") + self.assert_log_equals( + { + "action": "log", + "message": "test_start for test1 logged while in progress.", + "level": "ERROR", + } + ) + self.logger.suite_end() + + def test_status(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status( + "test1", "subtest name", "fail", expected="FAIL", message="Test message" + ) + self.assert_log_equals( + { + "action": "test_status", + "subtest": "subtest name", + "status": "FAIL", + "message": "Test message", + "test": "test1", + } + ) + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + def test_status_1(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "subtest name", "fail") + self.assert_log_equals( + { + "action": "test_status", + "subtest": "subtest name", + "status": "FAIL", + "expected": "PASS", + "test": "test1", + } + ) + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + def test_status_2(self): + self.assertRaises( + ValueError, + self.logger.test_status, + "test1", + "subtest name", + "XXXUNKNOWNXXX", + ) + + def test_status_extra(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status( + "test1", "subtest name", "FAIL", expected="PASS", extra={"data": 42} + ) + self.assert_log_equals( + { + "action": "test_status", + "subtest": "subtest name", + "status": "FAIL", + "expected": "PASS", + "test": "test1", + "extra": {"data": 42}, + } + ) + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + def test_status_stack(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status( + "test1", + "subtest name", + "FAIL", + expected="PASS", + stack="many\nlines\nof\nstack", + ) + self.assert_log_equals( + { + "action": "test_status", + "subtest": "subtest name", + "status": "FAIL", + "expected": "PASS", + "test": "test1", + "stack": "many\nlines\nof\nstack", + } + ) + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + def test_status_known_intermittent(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status( + "test1", "subtest name", "fail", known_intermittent=["FAIL"] + ) + self.assert_log_equals( + { + "action": "test_status", + "subtest": "subtest name", + "status": "FAIL", + "expected": "PASS", + "known_intermittent": ["FAIL"], + "test": "test1", + } + ) + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + def test_status_not_started(self): + self.logger.test_status("test_UNKNOWN", "subtest", "PASS") + self.assertTrue( + self.pop_last_item()["message"].startswith( + "test_status for test_UNKNOWN logged while not in progress. Logged with data: {" + ) + ) + + def test_remove_optional_defaults(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status( + "test1", "subtest name", "fail", message=None, stack=None + ) + self.assert_log_equals( + { + "action": "test_status", + "subtest": "subtest name", + "status": "FAIL", + "expected": "PASS", + "test": "test1", + } + ) + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + def test_remove_optional_defaults_raw_log(self): + self.logger.log_raw({"action": "suite_start", "tests": [1], "name": None}) + self.assert_log_equals({"action": "suite_start", "tests": {"default": ["1"]}}) + self.logger.suite_end() + + def test_end(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end("test1", "fail", message="Test message") + self.assert_log_equals( + { + "action": "test_end", + "status": "FAIL", + "expected": "OK", + "message": "Test message", + "test": "test1", + } + ) + self.logger.suite_end() + + def test_end_1(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end("test1", "PASS", expected="PASS", extra={"data": 123}) + self.assert_log_equals( + { + "action": "test_end", + "status": "PASS", + "extra": {"data": 123}, + "test": "test1", + } + ) + self.logger.suite_end() + + def test_end_2(self): + self.assertRaises(ValueError, self.logger.test_end, "test1", "XXXUNKNOWNXXX") + + def test_end_stack(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end( + "test1", "PASS", expected="PASS", stack="many\nlines\nof\nstack" + ) + self.assert_log_equals( + { + "action": "test_end", + "status": "PASS", + "test": "test1", + "stack": "many\nlines\nof\nstack", + } + ) + self.logger.suite_end() + + def test_end_no_start(self): + self.logger.test_end("test1", "PASS", expected="PASS") + self.assertTrue( + self.pop_last_item()["message"].startswith( + "test_end for test1 logged while not in progress. Logged with data: {" + ) + ) + self.logger.suite_end() + + def test_end_twice(self): + self.logger.suite_start([]) + self.logger.test_start("test2") + self.logger.test_end("test2", "PASS", expected="PASS") + self.assert_log_equals( + {"action": "test_end", "status": "PASS", "test": "test2"} + ) + self.logger.test_end("test2", "PASS", expected="PASS") + last_item = self.pop_last_item() + self.assertEqual(last_item["action"], "log") + self.assertEqual(last_item["level"], "ERROR") + self.assertTrue( + last_item["message"].startswith( + "test_end for test2 logged while not in progress. Logged with data: {" + ) + ) + self.logger.suite_end() + + def test_suite_start_twice(self): + self.logger.suite_start([]) + self.assert_log_equals({"action": "suite_start", "tests": {"default": []}}) + self.logger.suite_start([]) + last_item = self.pop_last_item() + self.assertEqual(last_item["action"], "log") + self.assertEqual(last_item["level"], "ERROR") + self.logger.suite_end() + + def test_suite_end_no_start(self): + self.logger.suite_start([]) + self.assert_log_equals({"action": "suite_start", "tests": {"default": []}}) + self.logger.suite_end() + self.assert_log_equals({"action": "suite_end"}) + self.logger.suite_end() + last_item = self.pop_last_item() + self.assertEqual(last_item["action"], "log") + self.assertEqual(last_item["level"], "ERROR") + + def test_multiple_loggers_suite_start(self): + logger1 = structuredlog.StructuredLogger("test") + self.logger.suite_start([]) + logger1.suite_start([]) + last_item = self.pop_last_item() + self.assertEqual(last_item["action"], "log") + self.assertEqual(last_item["level"], "ERROR") + + def test_multiple_loggers_test_start(self): + logger1 = structuredlog.StructuredLogger("test") + self.logger.suite_start([]) + self.logger.test_start("test") + logger1.test_start("test") + last_item = self.pop_last_item() + self.assertEqual(last_item["action"], "log") + self.assertEqual(last_item["level"], "ERROR") + + def test_process(self): + self.logger.process_output(1234, "test output") + self.assert_log_equals( + {"action": "process_output", "process": "1234", "data": "test output"} + ) + + def test_process_start(self): + self.logger.process_start(1234) + self.assert_log_equals({"action": "process_start", "process": "1234"}) + + def test_process_exit(self): + self.logger.process_exit(1234, 0) + self.assert_log_equals( + {"action": "process_exit", "process": "1234", "exitcode": 0} + ) + + def test_log(self): + for level in ["critical", "error", "warning", "info", "debug"]: + getattr(self.logger, level)("message") + self.assert_log_equals( + {"action": "log", "level": level.upper(), "message": "message"} + ) + + def test_logging_adapter(self): + import logging + + logging.basicConfig(level="DEBUG") + old_level = logging.root.getEffectiveLevel() + logging.root.setLevel("DEBUG") + + std_logger = logging.getLogger("test") + std_logger.setLevel("DEBUG") + + logger = stdadapter.std_logging_adapter(std_logger) + + try: + for level in ["critical", "error", "warning", "info", "debug"]: + getattr(logger, level)("message") + self.assert_log_equals( + {"action": "log", "level": level.upper(), "message": "message"} + ) + finally: + logging.root.setLevel(old_level) + + def test_add_remove_handlers(self): + handler = TestHandler() + self.logger.add_handler(handler) + self.logger.info("test1") + + self.assert_log_equals({"action": "log", "level": "INFO", "message": "test1"}) + + self.assert_log_equals( + {"action": "log", "level": "INFO", "message": "test1"}, + actual=handler.last_item, + ) + + self.logger.remove_handler(handler) + self.logger.info("test2") + + self.assert_log_equals({"action": "log", "level": "INFO", "message": "test2"}) + + self.assert_log_equals( + {"action": "log", "level": "INFO", "message": "test1"}, + actual=handler.last_item, + ) + + def test_wrapper(self): + file_like = structuredlog.StructuredLogFileLike(self.logger) + + file_like.write("line 1") + + self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 1"}) + + file_like.write("line 2\n") + + self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 2"}) + + file_like.write("line 3\r") + + self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 3"}) + + file_like.write("line 4\r\n") + + self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 4"}) + + def test_shutdown(self): + # explicit shutdown + log = structuredlog.StructuredLogger("test 1") + log.add_handler(self.handler) + log.info("line 1") + self.assert_log_equals( + {"action": "log", "level": "INFO", "message": "line 1", "source": "test 1"} + ) + log.shutdown() + self.assert_log_equals({"action": "shutdown", "source": "test 1"}) + with self.assertRaises(structuredlog.LoggerShutdownError): + log.info("bad log") + with self.assertRaises(structuredlog.LoggerShutdownError): + log.log_raw({"action": "log", "level": "info", "message": "bad log"}) + + # shutdown still applies to new instances + del log + log = structuredlog.StructuredLogger("test 1") + with self.assertRaises(structuredlog.LoggerShutdownError): + log.info("bad log") + + # context manager shutdown + with structuredlog.StructuredLogger("test 2") as log: + log.add_handler(self.handler) + log.info("line 2") + self.assert_log_equals( + { + "action": "log", + "level": "INFO", + "message": "line 2", + "source": "test 2", + } + ) + self.assert_log_equals({"action": "shutdown", "source": "test 2"}) + + # shutdown prevents logging across instances + log1 = structuredlog.StructuredLogger("test 3") + log2 = structuredlog.StructuredLogger("test 3", component="bar") + log1.shutdown() + with self.assertRaises(structuredlog.LoggerShutdownError): + log2.info("line 3") + + +class TestTypeConversions(BaseStructuredTest): + def test_raw(self): + self.logger.log_raw({"action": "suite_start", "tests": [1], "time": "1234"}) + self.assert_log_equals( + {"action": "suite_start", "tests": {"default": ["1"]}, "time": 1234} + ) + self.logger.suite_end() + + def test_tuple(self): + self.logger.suite_start([]) + if six.PY3: + self.logger.test_start( + ( + b"\xf0\x90\x8d\x84\xf0\x90\x8c\xb4\xf0\x90" + b"\x8d\x83\xf0\x90\x8d\x84".decode(), + 42, + u"\u16a4", + ) + ) + else: + self.logger.test_start( + ( + "\xf0\x90\x8d\x84\xf0\x90\x8c\xb4\xf0\x90" + "\x8d\x83\xf0\x90\x8d\x84", + 42, + u"\u16a4", + ) + ) + self.assert_log_equals( + { + "action": "test_start", + "test": (u"\U00010344\U00010334\U00010343\U00010344", u"42", u"\u16a4"), + } + ) + self.logger.suite_end() + + def test_non_string_messages(self): + self.logger.suite_start([]) + self.logger.info(1) + self.assert_log_equals({"action": "log", "message": "1", "level": "INFO"}) + self.logger.info([1, (2, "3"), "s", "s" + chr(255)]) + if six.PY3: + self.assert_log_equals( + { + "action": "log", + "message": "[1, (2, '3'), 's', 's\xff']", + "level": "INFO", + } + ) + else: + self.assert_log_equals( + { + "action": "log", + "message": "[1, (2, '3'), 's', 's\\xff']", + "level": "INFO", + } + ) + + self.logger.suite_end() + + def test_utf8str_write(self): + with mozfile.NamedTemporaryFile() as logfile: + _fmt = formatters.TbplFormatter() + _handler = handlers.StreamHandler(logfile, _fmt) + self.logger.add_handler(_handler) + self.logger.suite_start([]) + self.logger.info("☺") + logfile.seek(0) + data = logfile.readlines()[-1].strip() + if six.PY3: + self.assertEqual(data.decode(), "☺") + else: + self.assertEqual(data, "☺") + self.logger.suite_end() + self.logger.remove_handler(_handler) + + def test_arguments(self): + self.logger.info(message="test") + self.assert_log_equals({"action": "log", "message": "test", "level": "INFO"}) + + self.logger.suite_start([], run_info={}) + self.assert_log_equals( + {"action": "suite_start", "tests": {"default": []}, "run_info": {}} + ) + self.logger.test_start(test="test1") + self.logger.test_status("subtest1", "FAIL", test="test1", status="PASS") + self.assert_log_equals( + { + "action": "test_status", + "test": "test1", + "subtest": "subtest1", + "status": "PASS", + "expected": "FAIL", + } + ) + self.logger.process_output(123, "data", "test") + self.assert_log_equals( + { + "action": "process_output", + "process": "123", + "command": "test", + "data": "data", + } + ) + self.assertRaises( + TypeError, + self.logger.test_status, + subtest="subtest2", + status="FAIL", + expected="PASS", + ) + self.assertRaises( + TypeError, + self.logger.test_status, + "test1", + "subtest1", + "PASS", + "FAIL", + "message", + "stack", + {}, + [], + "unexpected", + ) + self.assertRaises(TypeError, self.logger.test_status, "test1", test="test2") + self.logger.suite_end() + + +class TestComponentFilter(BaseStructuredTest): + def test_filter_component(self): + component_logger = structuredlog.StructuredLogger( + self.logger.name, "test_component" + ) + component_logger.component_filter = handlers.LogLevelFilter(lambda x: x, "info") + + self.logger.debug("Test") + self.assertFalse(self.handler.empty) + self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test"}) + self.assertTrue(self.handler.empty) + + component_logger.info("Test 1") + self.assertFalse(self.handler.empty) + self.assert_log_equals( + { + "action": "log", + "level": "INFO", + "message": "Test 1", + "component": "test_component", + } + ) + + component_logger.debug("Test 2") + self.assertTrue(self.handler.empty) + + component_logger.component_filter = None + + component_logger.debug("Test 3") + self.assertFalse(self.handler.empty) + self.assert_log_equals( + { + "action": "log", + "level": "DEBUG", + "message": "Test 3", + "component": "test_component", + } + ) + + def test_filter_default_component(self): + component_logger = structuredlog.StructuredLogger( + self.logger.name, "test_component" + ) + + self.logger.debug("Test") + self.assertFalse(self.handler.empty) + self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test"}) + + self.logger.component_filter = handlers.LogLevelFilter(lambda x: x, "info") + + self.logger.debug("Test 1") + self.assertTrue(self.handler.empty) + + component_logger.debug("Test 2") + self.assertFalse(self.handler.empty) + self.assert_log_equals( + { + "action": "log", + "level": "DEBUG", + "message": "Test 2", + "component": "test_component", + } + ) + + self.logger.component_filter = None + + self.logger.debug("Test 3") + self.assertFalse(self.handler.empty) + self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test 3"}) + + def test_filter_message_mutuate(self): + def filter_mutate(msg): + if msg["action"] == "log": + msg["message"] = "FILTERED! %s" % msg["message"] + return msg + + self.logger.component_filter = filter_mutate + self.logger.debug("Test") + self.assert_log_equals( + {"action": "log", "level": "DEBUG", "message": "FILTERED! Test"} + ) + self.logger.component_filter = None + + +class TestCommandline(unittest.TestCase): + def setUp(self): + self.logfile = mozfile.NamedTemporaryFile() + + @property + def loglines(self): + self.logfile.seek(0) + return [line.rstrip() for line in self.logfile.readlines()] + + def test_setup_logging(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser) + args = parser.parse_args(["--log-raw=-"]) + logger = commandline.setup_logging("test_setup_logging", args, {}) + self.assertEqual(len(logger.handlers), 1) + + def test_setup_logging_optparse(self): + parser = optparse.OptionParser() + commandline.add_logging_group(parser) + args, _ = parser.parse_args(["--log-raw=-"]) + logger = commandline.setup_logging("test_optparse", args, {}) + self.assertEqual(len(logger.handlers), 1) + self.assertIsInstance(logger.handlers[0], handlers.StreamHandler) + + def test_limit_formatters(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser, include_formatters=["raw"]) + other_formatters = [fmt for fmt in commandline.log_formatters if fmt != "raw"] + # check that every formatter except raw is not present + for fmt in other_formatters: + with self.assertRaises(SystemExit): + parser.parse_args(["--log-%s=-" % fmt]) + with self.assertRaises(SystemExit): + parser.parse_args(["--log-%s-level=error" % fmt]) + # raw is still ok + args = parser.parse_args(["--log-raw=-"]) + logger = commandline.setup_logging("test_setup_logging2", args, {}) + self.assertEqual(len(logger.handlers), 1) + + def test_setup_logging_optparse_unicode(self): + parser = optparse.OptionParser() + commandline.add_logging_group(parser) + args, _ = parser.parse_args([u"--log-raw=-"]) + logger = commandline.setup_logging("test_optparse_unicode", args, {}) + self.assertEqual(len(logger.handlers), 1) + self.assertEqual(logger.handlers[0].stream, sys.stdout) + self.assertIsInstance(logger.handlers[0], handlers.StreamHandler) + + def test_logging_defaultlevel(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser) + + args = parser.parse_args(["--log-tbpl=%s" % self.logfile.name]) + logger = commandline.setup_logging("test_fmtopts", args, {}) + logger.info("INFO message") + logger.debug("DEBUG message") + logger.error("ERROR message") + # The debug level is not logged by default. + self.assertEqual([b"INFO message", b"ERROR message"], self.loglines) + + def test_logging_errorlevel(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser) + args = parser.parse_args( + ["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=error"] + ) + logger = commandline.setup_logging("test_fmtopts", args, {}) + logger.info("INFO message") + logger.debug("DEBUG message") + logger.error("ERROR message") + + # Only the error level and above were requested. + self.assertEqual([b"ERROR message"], self.loglines) + + def test_logging_debuglevel(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser) + args = parser.parse_args( + ["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=debug"] + ) + logger = commandline.setup_logging("test_fmtopts", args, {}) + logger.info("INFO message") + logger.debug("DEBUG message") + logger.error("ERROR message") + # Requesting a lower log level than default works as expected. + self.assertEqual( + [b"INFO message", b"DEBUG message", b"ERROR message"], self.loglines + ) + + def test_unused_options(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser) + args = parser.parse_args(["--log-tbpl-level=error"]) + self.assertRaises( + ValueError, commandline.setup_logging, "test_fmtopts", args, {} + ) + + +class TestBuffer(BaseStructuredTest): + def assert_log_equals(self, expected, actual=None): + if actual is None: + actual = self.pop_last_item() + + all_expected = { + "pid": os.getpid(), + "thread": "MainThread", + "source": "testBuffer", + } + specials = set(["time"]) + + all_expected.update(expected) + for key, value in six.iteritems(all_expected): + self.assertEqual(actual[key], value) + + self.assertEqual(set(all_expected.keys()) | specials, set(actual.keys())) + + def setUp(self): + self.logger = structuredlog.StructuredLogger("testBuffer") + self.handler = handlers.BufferHandler(TestHandler(), message_limit=4) + self.logger.add_handler(self.handler) + + def tearDown(self): + self.logger.remove_handler(self.handler) + + def pop_last_item(self): + return self.handler.inner.items.pop() + + def test_buffer_messages(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.send_message("buffer", "off") + self.logger.test_status("test1", "sub1", status="PASS") + # Even for buffered actions, the buffer does not interfere if + # buffering is turned off. + self.assert_log_equals( + { + "action": "test_status", + "test": "test1", + "status": "PASS", + "subtest": "sub1", + } + ) + self.logger.send_message("buffer", "on") + self.logger.test_status("test1", "sub2", status="PASS") + self.logger.test_status("test1", "sub3", status="PASS") + self.logger.test_status("test1", "sub4", status="PASS") + self.logger.test_status("test1", "sub5", status="PASS") + self.logger.test_status("test1", "sub6", status="PASS") + self.logger.test_status("test1", "sub7", status="PASS") + self.logger.test_end("test1", status="OK") + self.logger.send_message("buffer", "clear") + self.assert_log_equals({"action": "test_end", "test": "test1", "status": "OK"}) + self.logger.suite_end() + + def test_buffer_size(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "sub1", status="PASS") + self.logger.test_status("test1", "sub2", status="PASS") + self.logger.test_status("test1", "sub3", status="PASS") + self.logger.test_status("test1", "sub4", status="PASS") + self.logger.test_status("test1", "sub5", status="PASS") + self.logger.test_status("test1", "sub6", status="PASS") + self.logger.test_status("test1", "sub7", status="PASS") + + # No test status messages made it to the underlying handler. + self.assert_log_equals({"action": "test_start", "test": "test1"}) + + # The buffer's actual size never grows beyond the specified limit. + self.assertEqual(len(self.handler._buffer), 4) + + self.logger.test_status("test1", "sub8", status="FAIL") + # The number of messages deleted comes back in a list. + self.assertEqual([4], self.logger.send_message("buffer", "flush")) + + # When the buffer is dumped, the failure is the last thing logged + self.assert_log_equals( + { + "action": "test_status", + "test": "test1", + "subtest": "sub8", + "status": "FAIL", + "expected": "PASS", + } + ) + # Three additional messages should have been retained for context + self.assert_log_equals( + { + "action": "test_status", + "test": "test1", + "status": "PASS", + "subtest": "sub7", + } + ) + self.assert_log_equals( + { + "action": "test_status", + "test": "test1", + "status": "PASS", + "subtest": "sub6", + } + ) + self.assert_log_equals( + { + "action": "test_status", + "test": "test1", + "status": "PASS", + "subtest": "sub5", + } + ) + self.assert_log_equals({"action": "suite_start", "tests": {"default": []}}) + + +class TestReader(unittest.TestCase): + def to_file_like(self, obj): + data_str = "\n".join(json.dumps(item) for item in obj) + return StringIO(data_str) + + def test_read(self): + data = [ + {"action": "action_0", "data": "data_0"}, + {"action": "action_1", "data": "data_1"}, + ] + + f = self.to_file_like(data) + self.assertEqual(data, list(reader.read(f))) + + def test_imap_log(self): + data = [ + {"action": "action_0", "data": "data_0"}, + {"action": "action_1", "data": "data_1"}, + ] + + f = self.to_file_like(data) + + def f_action_0(item): + return ("action_0", item["data"]) + + def f_action_1(item): + return ("action_1", item["data"]) + + res_iter = reader.imap_log( + reader.read(f), {"action_0": f_action_0, "action_1": f_action_1} + ) + self.assertEqual( + [("action_0", "data_0"), ("action_1", "data_1")], list(res_iter) + ) + + def test_each_log(self): + data = [ + {"action": "action_0", "data": "data_0"}, + {"action": "action_1", "data": "data_1"}, + ] + + f = self.to_file_like(data) + + count = {"action_0": 0, "action_1": 0} + + def f_action_0(item): + count[item["action"]] += 1 + + def f_action_1(item): + count[item["action"]] += 2 + + reader.each_log( + reader.read(f), {"action_0": f_action_0, "action_1": f_action_1} + ) + + self.assertEqual({"action_0": 1, "action_1": 2}, count) + + def test_handler(self): + data = [ + {"action": "action_0", "data": "data_0"}, + {"action": "action_1", "data": "data_1"}, + ] + + f = self.to_file_like(data) + + test = self + + class ReaderTestHandler(reader.LogHandler): + def __init__(self): + self.action_0_count = 0 + self.action_1_count = 0 + + def action_0(self, item): + test.assertEqual(item["action"], "action_0") + self.action_0_count += 1 + + def action_1(self, item): + test.assertEqual(item["action"], "action_1") + self.action_1_count += 1 + + handler = ReaderTestHandler() + reader.handle_log(reader.read(f), handler) + + self.assertEqual(handler.action_0_count, 1) + self.assertEqual(handler.action_1_count, 1) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozlog/tests/test_terminal_colors.py b/testing/mozbase/mozlog/tests/test_terminal_colors.py new file mode 100644 index 0000000000..2dd72b7d53 --- /dev/null +++ b/testing/mozbase/mozlog/tests/test_terminal_colors.py @@ -0,0 +1,62 @@ +# encoding: utf-8 + +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import sys +from io import StringIO + +import mozunit +import pytest +from mozterm import Terminal + + +@pytest.fixture +def terminal(): + blessed = pytest.importorskip("blessed") + + kind = "xterm-256color" + try: + term = Terminal(stream=StringIO(), force_styling=True, kind=kind) + except blessed.curses.error: + pytest.skip("terminal '{}' not found".format(kind)) + + return term + + +EXPECTED_DICT = { + "log_test_status_fail": "\x1b[31mlog_test_status_fail\x1b(B\x1b[m", + "log_process_output": "\x1b[34mlog_process_output\x1b(B\x1b[m", + "log_test_status_pass": "\x1b[32mlog_test_status_pass\x1b(B\x1b[m", + "log_test_status_unexpected_fail": "\x1b[31mlog_test_status_unexpected_fail\x1b(B\x1b[m", + "log_test_status_known_intermittent": "\x1b[33mlog_test_status_known_intermittent\x1b(B\x1b[m", + "time": "\x1b[36mtime\x1b(B\x1b[m", + "action": "\x1b[33maction\x1b(B\x1b[m", + "pid": "\x1b[36mpid\x1b(B\x1b[m", + "heading": "\x1b[1m\x1b[33mheading\x1b(B\x1b[m", + "sub_heading": "\x1b[33msub_heading\x1b(B\x1b[m", + "error": "\x1b[31merror\x1b(B\x1b[m", + "warning": "\x1b[33mwarning\x1b(B\x1b[m", + "bold": "\x1b[1mbold\x1b(B\x1b[m", + "grey": "\x1b[38;2;190;190;190mgrey\x1b(B\x1b[m", + "normal": "\x1b[90mnormal\x1b(B\x1b[m", + "bright_black": "\x1b[90mbright_black\x1b(B\x1b[m", +} + + +@pytest.mark.skipif( + not sys.platform.startswith("win"), + reason="Only do ANSI Escape Sequence comparisons on Windows.", +) +def test_terminal_colors(terminal): + from mozlog.formatters.machformatter import TerminalColors, color_dict + + actual_dict = TerminalColors(terminal, color_dict) + + for key in color_dict: + assert getattr(actual_dict, key)(key) == EXPECTED_DICT[key] + + +if __name__ == "__main__": + mozunit.main() |