summaryrefslogtreecommitdiffstats
path: root/tests/deckard/deckard_pytest.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/deckard/deckard_pytest.py')
-rwxr-xr-xtests/deckard/deckard_pytest.py67
1 files changed, 67 insertions, 0 deletions
diff --git a/tests/deckard/deckard_pytest.py b/tests/deckard/deckard_pytest.py
new file mode 100755
index 0000000..3c5f4b8
--- /dev/null
+++ b/tests/deckard/deckard_pytest.py
@@ -0,0 +1,67 @@
+import logging
+import os
+import subprocess
+import random
+import sys
+import time
+
+import pytest
+
+import deckard
+
+
+def set_coverage_env(path, qmin):
+ """Sets up enviroment variables so code coverage utility can work."""
+ if os.environ.get("COVERAGE"):
+ exports = subprocess.check_output([os.environ["COVERAGE_ENV_SCRIPT"],
+ os.environ["DAEMONSRCDIR"],
+ os.environ["COVERAGE_STATSDIR"],
+ path + "-qmin-" + str(qmin)]).decode()
+ for export in exports.split():
+ key, value = export.split("=", 1)
+ value = value.strip('"')
+ os.environ[key] = value
+
+
+def check_platform():
+ if sys.platform == 'windows':
+ pytest.exit('Not supported at all on Windows')
+
+
+# Suppress extensive Augeas logging
+logging.getLogger("augeas").setLevel(logging.ERROR)
+
+
+check_platform()
+
+
+def run_test(path, qmin, config, max_retries, retries=0):
+ set_coverage_env(path, qmin)
+ try:
+ del os.environ["SOCKET_WRAPPER_DIR"]
+ except KeyError:
+ pass
+ try:
+ deckard.process_file(path, qmin, config)
+ except deckard.DeckardUnderLoadError as e:
+ if retries < max_retries:
+ logging.error("Deckard under load. Retrying…")
+ # Exponential backoff
+ time.sleep((2 ** retries) + random.random())
+ run_test(path, qmin, config, max_retries, retries + 1)
+ else:
+ raise e
+
+
+def test_passes_qmin_on(scenario, max_retries):
+ if scenario.qmin is True or scenario.qmin is None:
+ run_test(scenario.path, True, scenario.config, max_retries)
+ else:
+ pytest.skip("Query minimization is off in test config")
+
+
+def test_passes_qmin_off(scenario, max_retries):
+ if scenario.qmin is False or scenario.qmin is None:
+ run_test(scenario.path, False, scenario.config, max_retries)
+ else:
+ pytest.skip("Query minimization is on in test config")