summaryrefslogtreecommitdiffstats
path: root/testing/mozbase/mozlog/mozlog/formatters/grouping.py
diff options
context:
space:
mode:
Diffstat (limited to 'testing/mozbase/mozlog/mozlog/formatters/grouping.py')
-rw-r--r--testing/mozbase/mozlog/mozlog/formatters/grouping.py391
1 files changed, 391 insertions, 0 deletions
diff --git a/testing/mozbase/mozlog/mozlog/formatters/grouping.py b/testing/mozbase/mozlog/mozlog/formatters/grouping.py
new file mode 100644
index 0000000000..36993b5595
--- /dev/null
+++ b/testing/mozbase/mozlog/mozlog/formatters/grouping.py
@@ -0,0 +1,391 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+import collections
+import os
+import platform
+import subprocess
+import sys
+
+import six
+
+from mozlog.formatters import base
+
+DEFAULT_MOVE_UP_CODE = u"\x1b[A"
+DEFAULT_CLEAR_EOL_CODE = u"\x1b[K"
+
+
+class GroupingFormatter(base.BaseFormatter):
+ """Formatter designed to produce unexpected test results grouped
+ together in a readable format."""
+
+ def __init__(self):
+ super(GroupingFormatter, self).__init__()
+ self.number_of_tests = 0
+ self.completed_tests = 0
+ self.need_to_erase_last_line = False
+ self.current_display = ""
+ self.running_tests = {}
+ self.test_output = collections.defaultdict(str)
+ self.subtest_failures = collections.defaultdict(list)
+ self.test_failure_text = ""
+ self.tests_with_failing_subtests = []
+ self.interactive = os.isatty(sys.stdout.fileno())
+ self.show_logs = False
+
+ self.message_handler.register_message_handlers(
+ "show_logs",
+ {
+ "on": self._enable_show_logs,
+ "off": self._disable_show_logs,
+ },
+ )
+
+ # TODO(mrobinson, 8313): We need to add support for Windows terminals here.
+ if self.interactive:
+ self.move_up, self.clear_eol = self.get_move_up_and_clear_eol_codes()
+ if platform.system() != "Windows":
+ self.line_width = int(
+ subprocess.check_output(["stty", "size"]).split()[1]
+ )
+ else:
+ # Until we figure out proper Windows support,
+ # this makes things work well enough to run.
+ self.line_width = 80
+
+ self.expected = {
+ "OK": 0,
+ "PASS": 0,
+ "FAIL": 0,
+ "PRECONDITION_FAILED": 0,
+ "ERROR": 0,
+ "TIMEOUT": 0,
+ "SKIP": 0,
+ "CRASH": 0,
+ }
+
+ self.unexpected_tests = {
+ "OK": [],
+ "PASS": [],
+ "FAIL": [],
+ "PRECONDITION_FAILED": [],
+ "ERROR": [],
+ "TIMEOUT": [],
+ "CRASH": [],
+ }
+
+ # Follows the format of {(<test>, <subtest>): <data>}, where
+ # (<test>, None) represents a top level test.
+ self.known_intermittent_results = {}
+
+ def _enable_show_logs(self):
+ self.show_logs = True
+
+ def _disable_show_logs(self):
+ self.show_logs = False
+
+ def get_move_up_and_clear_eol_codes(self):
+ try:
+ import blessed
+ except ImportError:
+ return DEFAULT_MOVE_UP_CODE, DEFAULT_CLEAR_EOL_CODE
+
+ try:
+ self.terminal = blessed.Terminal()
+ return self.terminal.move_up, self.terminal.clear_eol
+ except Exception as exception:
+ sys.stderr.write(
+ "GroupingFormatter: Could not get terminal "
+ "control characters: %s\n" % exception
+ )
+ return DEFAULT_MOVE_UP_CODE, DEFAULT_CLEAR_EOL_CODE
+
+ def text_to_erase_display(self):
+ if not self.interactive or not self.current_display:
+ return ""
+ return (self.move_up + self.clear_eol) * self.current_display.count("\n")
+
+ def generate_output(self, text=None, new_display=None):
+ if not self.interactive:
+ return text
+
+ output = self.text_to_erase_display()
+ if text:
+ output += text
+ if new_display is not None:
+ self.current_display = new_display
+ return output + self.current_display
+
+ def build_status_line(self):
+ if self.number_of_tests == 0:
+ new_display = " [%i] " % self.completed_tests
+ else:
+ new_display = " [%i/%i] " % (self.completed_tests, self.number_of_tests)
+
+ if self.running_tests:
+ indent = " " * len(new_display)
+ if self.interactive:
+ max_width = self.line_width - len(new_display)
+ else:
+ max_width = sys.maxsize
+ return (
+ new_display
+ + ("\n%s" % indent).join(
+ val[:max_width] for val in self.running_tests.values()
+ )
+ + "\n"
+ )
+ else:
+ return new_display + "No tests running.\n"
+
+ def suite_start(self, data):
+ self.number_of_tests = sum(
+ len(tests) for tests in six.itervalues(data["tests"])
+ )
+ self.start_time = data["time"]
+
+ if self.number_of_tests == 0:
+ return "Running tests in %s\n\n" % data[u"source"]
+ else:
+ return "Running %i tests in %s\n\n" % (
+ self.number_of_tests,
+ data[u"source"],
+ )
+
+ def test_start(self, data):
+ self.running_tests[data["thread"]] = data["test"]
+ return self.generate_output(text=None, new_display=self.build_status_line())
+
+ def wrap_and_indent_lines(self, lines, indent):
+ assert len(lines) > 0
+
+ output = indent + u"\u25B6 %s\n" % lines[0]
+ for line in lines[1:-1]:
+ output += indent + u"\u2502 %s\n" % line
+ if len(lines) > 1:
+ output += indent + u"\u2514 %s\n" % lines[-1]
+ return output
+
+ def get_lines_for_unexpected_result(
+ self, test_name, status, expected, message, stack
+ ):
+ # Test names sometimes contain control characters, which we want
+ # to be printed in their raw form, and not their interpreted form.
+ test_name = test_name.encode("unicode-escape").decode("utf-8")
+
+ if expected:
+ expected_text = u" [expected %s]" % expected
+ else:
+ expected_text = u""
+
+ lines = [u"%s%s %s" % (status, expected_text, test_name)]
+ if message:
+ lines.append(u" \u2192 %s" % message)
+ if stack:
+ lines.append("")
+ lines += [stackline for stackline in stack.splitlines()]
+ return lines
+
+ def get_lines_for_known_intermittents(self, known_intermittent_results):
+ lines = []
+
+ for (test, subtest), data in six.iteritems(self.known_intermittent_results):
+ status = data["status"]
+ known_intermittent = ", ".join(data["known_intermittent"])
+ expected = " [expected %s, known intermittent [%s]" % (
+ data["expected"],
+ known_intermittent,
+ )
+ lines += [
+ u"%s%s %s%s"
+ % (
+ status,
+ expected,
+ test,
+ (", %s" % subtest) if subtest is not None else "",
+ )
+ ]
+ output = self.wrap_and_indent_lines(lines, " ") + "\n"
+ return output
+
+ def get_output_for_unexpected_subtests(self, test_name, unexpected_subtests):
+ if not unexpected_subtests:
+ return ""
+
+ def add_subtest_failure(lines, subtest, stack=None):
+ lines += self.get_lines_for_unexpected_result(
+ subtest.get("subtest", None),
+ subtest.get("status", None),
+ subtest.get("expected", None),
+ subtest.get("message", None),
+ stack,
+ )
+
+ def make_subtests_failure(test_name, subtests, stack=None):
+ lines = [u"Unexpected subtest result in %s:" % test_name]
+ for subtest in subtests[:-1]:
+ add_subtest_failure(lines, subtest, None)
+ add_subtest_failure(lines, subtests[-1], stack)
+ return self.wrap_and_indent_lines(lines, " ") + "\n"
+
+ # Organize the failures by stack trace so we don't print the same stack trace
+ # more than once. They are really tall and we don't want to flood the screen
+ # with duplicate information.
+ output = ""
+ failures_by_stack = collections.defaultdict(list)
+ for failure in unexpected_subtests:
+ # Print stackless results first. They are all separate.
+ if "stack" not in failure:
+ output += make_subtests_failure(test_name, [failure], None)
+ else:
+ failures_by_stack[failure["stack"]].append(failure)
+
+ for (stack, failures) in six.iteritems(failures_by_stack):
+ output += make_subtests_failure(test_name, failures, stack)
+ return output
+
+ def test_end(self, data):
+ self.completed_tests += 1
+ test_status = data["status"]
+ test_name = data["test"]
+ known_intermittent_statuses = data.get("known_intermittent", [])
+ subtest_failures = self.subtest_failures.pop(test_name, [])
+ if "expected" in data and test_status not in known_intermittent_statuses:
+ had_unexpected_test_result = True
+ else:
+ had_unexpected_test_result = False
+
+ del self.running_tests[data["thread"]]
+ new_display = self.build_status_line()
+
+ if not had_unexpected_test_result and not subtest_failures:
+ self.expected[test_status] += 1
+ if self.interactive:
+ return self.generate_output(text=None, new_display=new_display)
+ else:
+ return self.generate_output(
+ text=" %s\n" % test_name, new_display=new_display
+ )
+
+ if test_status in known_intermittent_statuses:
+ self.known_intermittent_results[(test_name, None)] = data
+
+ # If the test crashed or timed out, we also include any process output,
+ # because there is a good chance that the test produced a stack trace
+ # or other error messages.
+ if test_status in ("CRASH", "TIMEOUT"):
+ stack = self.test_output[test_name] + data.get("stack", "")
+ else:
+ stack = data.get("stack", None)
+
+ output = ""
+ if had_unexpected_test_result:
+ self.unexpected_tests[test_status].append(data)
+ lines = self.get_lines_for_unexpected_result(
+ test_name,
+ test_status,
+ data.get("expected", None),
+ data.get("message", None),
+ stack,
+ )
+ output += self.wrap_and_indent_lines(lines, " ") + "\n"
+
+ if subtest_failures:
+ self.tests_with_failing_subtests.append(test_name)
+ output += self.get_output_for_unexpected_subtests(
+ test_name, subtest_failures
+ )
+ self.test_failure_text += output
+
+ return self.generate_output(text=output, new_display=new_display)
+
+ def test_status(self, data):
+ if "expected" in data and data["status"] not in data.get(
+ "known_intermittent", []
+ ):
+ self.subtest_failures[data["test"]].append(data)
+ elif data["status"] in data.get("known_intermittent", []):
+ self.known_intermittent_results[(data["test"], data["subtest"])] = data
+
+ def suite_end(self, data):
+ self.end_time = data["time"]
+
+ if not self.interactive:
+ output = u"\n"
+ else:
+ output = ""
+
+ output += u"Ran %i tests finished in %.1f seconds.\n" % (
+ self.completed_tests,
+ (self.end_time - self.start_time) / 1000.0,
+ )
+ output += u" \u2022 %i ran as expected. %i tests skipped.\n" % (
+ sum(self.expected.values()),
+ self.expected["SKIP"],
+ )
+ if self.known_intermittent_results:
+ output += u" \u2022 %i known intermittent results.\n" % (
+ len(self.known_intermittent_results)
+ )
+
+ def text_for_unexpected_list(text, section):
+ tests = self.unexpected_tests[section]
+ if not tests:
+ return u""
+ return u" \u2022 %i tests %s\n" % (len(tests), text)
+
+ output += text_for_unexpected_list(u"crashed unexpectedly", "CRASH")
+ output += text_for_unexpected_list(u"had errors unexpectedly", "ERROR")
+ output += text_for_unexpected_list(u"failed unexpectedly", "FAIL")
+ output += text_for_unexpected_list(
+ u"precondition failed unexpectedly", "PRECONDITION_FAILED"
+ )
+ output += text_for_unexpected_list(u"timed out unexpectedly", "TIMEOUT")
+ output += text_for_unexpected_list(u"passed unexpectedly", "PASS")
+ output += text_for_unexpected_list(u"unexpectedly okay", "OK")
+
+ num_with_failing_subtests = len(self.tests_with_failing_subtests)
+ if num_with_failing_subtests:
+ output += (
+ u" \u2022 %i tests had unexpected subtest results\n"
+ % num_with_failing_subtests
+ )
+ output += "\n"
+
+ # Repeat failing test output, so that it is easier to find, since the
+ # non-interactive version prints all the test names.
+ if not self.interactive and self.test_failure_text:
+ output += u"Tests with unexpected results:\n" + self.test_failure_text
+
+ if self.known_intermittent_results:
+ results = self.get_lines_for_known_intermittents(
+ self.known_intermittent_results
+ )
+ output += u"Tests with known intermittent results:\n" + results
+
+ return self.generate_output(text=output, new_display="")
+
+ def process_output(self, data):
+ if data["thread"] not in self.running_tests:
+ return
+ test_name = self.running_tests[data["thread"]]
+ self.test_output[test_name] += data["data"] + "\n"
+
+ def log(self, data):
+ if data.get("component"):
+ message = "%s %s %s" % (data["component"], data["level"], data["message"])
+ else:
+ message = "%s %s" % (data["level"], data["message"])
+ if "stack" in data:
+ message += "\n%s" % data["stack"]
+
+ # We are logging messages that begin with STDERR, because that is how exceptions
+ # in this formatter are indicated.
+ if data["message"].startswith("STDERR"):
+ return self.generate_output(text=message + "\n")
+
+ if data["level"] in ("CRITICAL", "ERROR"):
+ return self.generate_output(text=message + "\n")
+ # Show all messages if show_logs switched on.
+ if self.show_logs:
+ return self.generate_output(text=message + "\n")