summaryrefslogtreecommitdiffstats
path: root/testing/mozbase/moztest/moztest/results.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--testing/mozbase/moztest/moztest/results.py366
1 files changed, 366 insertions, 0 deletions
diff --git a/testing/mozbase/moztest/moztest/results.py b/testing/mozbase/moztest/moztest/results.py
new file mode 100644
index 0000000000..5193a9db2b
--- /dev/null
+++ b/testing/mozbase/moztest/moztest/results.py
@@ -0,0 +1,366 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import os
+import time
+
+import mozinfo
+import six
+
+
+class TestContext(object):
+ """Stores context data about the test"""
+
+ attrs = [
+ "hostname",
+ "arch",
+ "env",
+ "os",
+ "os_version",
+ "tree",
+ "revision",
+ "product",
+ "logfile",
+ "testgroup",
+ "harness",
+ "buildtype",
+ ]
+
+ def __init__(
+ self,
+ hostname="localhost",
+ tree="",
+ revision="",
+ product="",
+ logfile=None,
+ arch="",
+ operating_system="",
+ testgroup="",
+ harness="moztest",
+ buildtype="",
+ ):
+ self.hostname = hostname
+ self.arch = arch or mozinfo.processor
+ self.env = os.environ.copy()
+ self.os = operating_system or mozinfo.os
+ self.os_version = mozinfo.version
+ self.tree = tree
+ self.revision = revision
+ self.product = product
+ self.logfile = logfile
+ self.testgroup = testgroup
+ self.harness = harness
+ self.buildtype = buildtype
+
+ def __str__(self):
+ return "%s (%s, %s)" % (self.hostname, self.os, self.arch)
+
+ def __repr__(self):
+ return "<%s>" % self.__str__()
+
+ def __eq__(self, other):
+ if not isinstance(other, TestContext):
+ return False
+ diffs = [a for a in self.attrs if getattr(self, a) != getattr(other, a)]
+ return len(diffs) == 0
+
+ def __hash__(self):
+ def get(attr):
+ value = getattr(self, attr)
+ if isinstance(value, dict):
+ value = frozenset(six.iteritems(value))
+ return value
+
+ return hash(frozenset([get(a) for a in self.attrs]))
+
+
+class TestResult(object):
+ """Stores test result data"""
+
+ FAIL_RESULTS = [
+ "UNEXPECTED-PASS",
+ "UNEXPECTED-FAIL",
+ "ERROR",
+ ]
+ COMPUTED_RESULTS = FAIL_RESULTS + [
+ "PASS",
+ "KNOWN-FAIL",
+ "SKIPPED",
+ ]
+ POSSIBLE_RESULTS = [
+ "PASS",
+ "FAIL",
+ "SKIP",
+ "ERROR",
+ ]
+
+ def __init__(
+ self, name, test_class="", time_start=None, context=None, result_expected="PASS"
+ ):
+ """Create a TestResult instance.
+ name = name of the test that is running
+ test_class = the class that the test belongs to
+ time_start = timestamp (seconds since UNIX epoch) of when the test started
+ running; if not provided, defaults to the current time
+ ! Provide 0 if you only have the duration
+ context = TestContext instance; can be None
+ result_expected = string representing the expected outcome of the test"""
+
+ msg = "Result '%s' not in possible results: %s" % (
+ result_expected,
+ ", ".join(self.POSSIBLE_RESULTS),
+ )
+ assert isinstance(name, six.string_types), "name has to be a string"
+ assert result_expected in self.POSSIBLE_RESULTS, msg
+
+ self.name = name
+ self.test_class = test_class
+ self.context = context
+ self.time_start = time_start if time_start is not None else time.time()
+ self.time_end = None
+ self._result_expected = result_expected
+ self._result_actual = None
+ self.result = None
+ self.filename = None
+ self.description = None
+ self.output = []
+ self.reason = None
+
+ @property
+ def test_name(self):
+ return "%s.py %s.%s" % (
+ self.test_class.split(".")[0],
+ self.test_class,
+ self.name,
+ )
+
+ def __str__(self):
+ return "%s | %s (%s) | %s" % (
+ self.result or "PENDING",
+ self.name,
+ self.test_class,
+ self.reason,
+ )
+
+ def __repr__(self):
+ return "<%s>" % self.__str__()
+
+ def calculate_result(self, expected, actual):
+ if actual == "ERROR":
+ return "ERROR"
+ if actual == "SKIP":
+ return "SKIPPED"
+
+ if expected == "PASS":
+ if actual == "PASS":
+ return "PASS"
+ if actual == "FAIL":
+ return "UNEXPECTED-FAIL"
+
+ if expected == "FAIL":
+ if actual == "PASS":
+ return "UNEXPECTED-PASS"
+ if actual == "FAIL":
+ return "KNOWN-FAIL"
+
+ # if actual is skip or error, we return at the beginning, so if we get
+ # here it is definitely some kind of error
+ return "ERROR"
+
+ def infer_results(self, computed_result):
+ assert computed_result in self.COMPUTED_RESULTS
+ if computed_result == "UNEXPECTED-PASS":
+ expected = "FAIL"
+ actual = "PASS"
+ elif computed_result == "UNEXPECTED-FAIL":
+ expected = "PASS"
+ actual = "FAIL"
+ elif computed_result == "KNOWN-FAIL":
+ expected = actual = "FAIL"
+ elif computed_result == "SKIPPED":
+ expected = actual = "SKIP"
+ else:
+ return
+ self._result_expected = expected
+ self._result_actual = actual
+
+ def finish(self, result, time_end=None, output=None, reason=None):
+ """Marks the test as finished, storing its end time and status
+ ! Provide the duration as time_end if you only have that."""
+
+ if result in self.POSSIBLE_RESULTS:
+ self._result_actual = result
+ self.result = self.calculate_result(
+ self._result_expected, self._result_actual
+ )
+ elif result in self.COMPUTED_RESULTS:
+ self.infer_results(result)
+ self.result = result
+ else:
+ valid = self.POSSIBLE_RESULTS + self.COMPUTED_RESULTS
+ msg = "Result '%s' not valid. Need one of: %s" % (result, ", ".join(valid))
+ raise ValueError(msg)
+
+ # use lists instead of multiline strings
+ if isinstance(output, six.string_types):
+ output = output.splitlines()
+
+ self.time_end = time_end if time_end is not None else time.time()
+ self.output = output or self.output
+ self.reason = reason
+
+ @property
+ def finished(self):
+ """Boolean saying if the test is finished or not"""
+ return self.result is not None
+
+ @property
+ def duration(self):
+ """Returns the time it took for the test to finish. If the test is
+ not finished, returns the elapsed time so far"""
+ if self.result is not None:
+ return self.time_end - self.time_start
+ else:
+ # returns the elapsed time
+ return time.time() - self.time_start
+
+
+class TestResultCollection(list):
+ """Container class that stores test results"""
+
+ resultClass = TestResult
+
+ def __init__(self, suite_name, time_taken=0, resultClass=None):
+ list.__init__(self)
+ self.suite_name = suite_name
+ self.time_taken = time_taken
+ if resultClass is not None:
+ self.resultClass = resultClass
+
+ def __str__(self):
+ return "%s (%.2fs)\n%s" % (self.suite_name, self.time_taken, list.__str__(self))
+
+ def subset(self, predicate):
+ tests = self.filter(predicate)
+ duration = 0
+ sub = TestResultCollection(self.suite_name)
+ for t in tests:
+ sub.append(t)
+ duration += t.duration
+ sub.time_taken = duration
+ return sub
+
+ @property
+ def contexts(self):
+ """List of unique contexts for the test results contained"""
+ cs = [tr.context for tr in self]
+ return list(set(cs))
+
+ def filter(self, predicate):
+ """Returns a generator of TestResults that satisfy a given predicate"""
+ return (tr for tr in self if predicate(tr))
+
+ def tests_with_result(self, result):
+ """Returns a generator of TestResults with the given result"""
+ msg = "Result '%s' not in possible results: %s" % (
+ result,
+ ", ".join(self.resultClass.COMPUTED_RESULTS),
+ )
+ assert result in self.resultClass.COMPUTED_RESULTS, msg
+ return self.filter(lambda t: t.result == result)
+
+ @property
+ def tests(self):
+ """Generator of all tests in the collection"""
+ return (t for t in self)
+
+ def add_result(
+ self,
+ test,
+ result_expected="PASS",
+ result_actual="PASS",
+ output="",
+ context=None,
+ ):
+ def get_class(test):
+ return test.__class__.__module__ + "." + test.__class__.__name__
+
+ t = self.resultClass(
+ name=str(test).split()[0],
+ test_class=get_class(test),
+ time_start=0,
+ result_expected=result_expected,
+ context=context,
+ )
+ t.finish(result_actual, time_end=0, reason=relevant_line(output), output=output)
+ self.append(t)
+
+ @property
+ def num_failures(self):
+ fails = 0
+ for t in self:
+ if t.result in self.resultClass.FAIL_RESULTS:
+ fails += 1
+ return fails
+
+ def add_unittest_result(self, result, context=None):
+ """Adds the python unittest result provided to the collection"""
+ if hasattr(result, "time_taken"):
+ self.time_taken += result.time_taken
+
+ for test, output in result.errors:
+ self.add_result(test, result_actual="ERROR", output=output)
+
+ for test, output in result.failures:
+ self.add_result(test, result_actual="FAIL", output=output)
+
+ if hasattr(result, "unexpectedSuccesses"):
+ for test in result.unexpectedSuccesses:
+ self.add_result(test, result_expected="FAIL", result_actual="PASS")
+
+ if hasattr(result, "skipped"):
+ for test, output in result.skipped:
+ self.add_result(
+ test, result_expected="SKIP", result_actual="SKIP", output=output
+ )
+
+ if hasattr(result, "expectedFailures"):
+ for test, output in result.expectedFailures:
+ self.add_result(
+ test, result_expected="FAIL", result_actual="FAIL", output=output
+ )
+
+ # unittest does not store these by default
+ if hasattr(result, "tests_passed"):
+ for test in result.tests_passed:
+ self.add_result(test)
+
+ @classmethod
+ def from_unittest_results(cls, context, *results):
+ """Creates a TestResultCollection containing the given python
+ unittest results"""
+
+ if not results:
+ return cls("from unittest")
+
+ # all the TestResult instances share the same context
+ context = context or TestContext()
+
+ collection = cls("from %s" % results[0].__class__.__name__)
+
+ for result in results:
+ collection.add_unittest_result(result, context)
+
+ return collection
+
+
+# used to get exceptions/errors from tracebacks
+def relevant_line(s):
+ KEYWORDS = ("Error:", "Exception:", "error:", "exception:")
+ lines = s.splitlines()
+ for line in lines:
+ for keyword in KEYWORDS:
+ if keyword in line:
+ return line
+ return "N/A"