summaryrefslogtreecommitdiffstats
path: root/tests/units/test_runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/units/test_runner.py')
-rw-r--r--tests/units/test_runner.py140
1 files changed, 137 insertions, 3 deletions
diff --git a/tests/units/test_runner.py b/tests/units/test_runner.py
index d5bb892..955149d 100644
--- a/tests/units/test_runner.py
+++ b/tests/units/test_runner.py
@@ -6,6 +6,9 @@
from __future__ import annotations
import logging
+import resource
+from pathlib import Path
+from unittest.mock import patch
import pytest
@@ -13,10 +16,11 @@ from anta import logger
from anta.catalog import AntaCatalog
from anta.inventory import AntaInventory
from anta.result_manager import ResultManager
-from anta.runner import main
+from anta.runner import adjust_rlimit_nofile, main, prepare_tests
from .test_models import FakeTest
+DATA_DIR: Path = Path(__file__).parent.parent.resolve() / "data"
FAKE_CATALOG: AntaCatalog = AntaCatalog.from_list([(FakeTest, None)])
@@ -47,8 +51,8 @@ async def test_runner_empty_inventory(caplog: pytest.LogCaptureFixture) -> None:
manager = ResultManager()
inventory = AntaInventory()
await main(manager, inventory, FAKE_CATALOG)
- assert len(caplog.record_tuples) == 1
- assert "The inventory is empty, exiting" in caplog.records[0].message
+ assert len(caplog.record_tuples) == 3
+ assert "The inventory is empty, exiting" in caplog.records[1].message
@pytest.mark.asyncio()
@@ -70,3 +74,133 @@ async def test_runner_no_selected_device(caplog: pytest.LogCaptureFixture, test_
await main(manager, test_inventory, FAKE_CATALOG, tags={"toto"})
assert "No reachable device matching the tags {'toto'} was found." in [record.message for record in caplog.records]
+
+
+def test_adjust_rlimit_nofile_valid_env(caplog: pytest.LogCaptureFixture) -> None:
+ """Test adjust_rlimit_nofile with valid environment variables."""
+ with (
+ caplog.at_level(logging.DEBUG),
+ patch.dict("os.environ", {"ANTA_NOFILE": "20480"}),
+ patch("anta.runner.resource.getrlimit") as getrlimit_mock,
+ patch("anta.runner.resource.setrlimit") as setrlimit_mock,
+ ):
+ # Simulate the default system limits
+ system_limits = (8192, 1048576)
+
+ # Setup getrlimit mock return value
+ getrlimit_mock.return_value = system_limits
+
+ # Simulate setrlimit behavior
+ def side_effect_setrlimit(resource_id: int, limits: tuple[int, int]) -> None:
+ _ = resource_id
+ getrlimit_mock.return_value = (limits[0], limits[1])
+
+ setrlimit_mock.side_effect = side_effect_setrlimit
+
+ result = adjust_rlimit_nofile()
+
+ # Assert the limits were updated as expected
+ assert result == (20480, 1048576)
+ assert "Initial limit numbers for open file descriptors for the current ANTA process: Soft Limit: 8192 | Hard Limit: 1048576" in caplog.text
+ assert "Setting soft limit for open file descriptors for the current ANTA process to 20480" in caplog.text
+
+ setrlimit_mock.assert_called_once_with(resource.RLIMIT_NOFILE, (20480, 1048576))
+
+
+def test_adjust_rlimit_nofile_invalid_env(caplog: pytest.LogCaptureFixture) -> None:
+ """Test adjust_rlimit_nofile with valid environment variables."""
+ with (
+ caplog.at_level(logging.DEBUG),
+ patch.dict("os.environ", {"ANTA_NOFILE": "invalid"}),
+ patch("anta.runner.resource.getrlimit") as getrlimit_mock,
+ patch("anta.runner.resource.setrlimit") as setrlimit_mock,
+ ):
+ # Simulate the default system limits
+ system_limits = (8192, 1048576)
+
+ # Setup getrlimit mock return value
+ getrlimit_mock.return_value = system_limits
+
+ # Simulate setrlimit behavior
+ def side_effect_setrlimit(resource_id: int, limits: tuple[int, int]) -> None:
+ _ = resource_id
+ getrlimit_mock.return_value = (limits[0], limits[1])
+
+ setrlimit_mock.side_effect = side_effect_setrlimit
+
+ result = adjust_rlimit_nofile()
+
+ # Assert the limits were updated as expected
+ assert result == (16384, 1048576)
+ assert "The ANTA_NOFILE environment variable value is invalid" in caplog.text
+ assert caplog.records[0].levelname == "WARNING"
+ assert "Initial limit numbers for open file descriptors for the current ANTA process: Soft Limit: 8192 | Hard Limit: 1048576" in caplog.text
+ assert "Setting soft limit for open file descriptors for the current ANTA process to 16384" in caplog.text
+
+ setrlimit_mock.assert_called_once_with(resource.RLIMIT_NOFILE, (16384, 1048576))
+
+
+@pytest.mark.asyncio()
+@pytest.mark.parametrize(
+ ("tags", "expected_tests_count", "expected_devices_count"),
+ [
+ (None, 22, 3),
+ ({"leaf"}, 9, 3),
+ ({"invalid_tag"}, 0, 0),
+ ],
+ ids=["no_tags", "leaf_tag", "invalid_tag"],
+)
+async def test_prepare_tests(
+ caplog: pytest.LogCaptureFixture,
+ test_inventory: AntaInventory,
+ tags: set[str] | None,
+ expected_tests_count: int,
+ expected_devices_count: int,
+) -> None:
+ """Test the runner prepare_tests function."""
+ logger.setup_logging(logger.Log.INFO)
+ caplog.set_level(logging.INFO)
+
+ catalog: AntaCatalog = AntaCatalog.parse(str(DATA_DIR / "test_catalog_with_tags.yml"))
+ selected_tests = prepare_tests(inventory=test_inventory, catalog=catalog, tags=tags, tests=None)
+
+ if selected_tests is None:
+ assert expected_tests_count == 0
+ expected_log = f"There are no tests matching the tags {tags} to run in the current test catalog and device inventory, please verify your inputs."
+ assert expected_log in caplog.text
+ else:
+ assert len(selected_tests) == expected_devices_count
+ assert sum(len(tests) for tests in selected_tests.values()) == expected_tests_count
+
+
+@pytest.mark.asyncio()
+async def test_prepare_tests_with_specific_tests(caplog: pytest.LogCaptureFixture, test_inventory: AntaInventory) -> None:
+ """Test the runner prepare_tests function with specific tests."""
+ logger.setup_logging(logger.Log.INFO)
+ caplog.set_level(logging.INFO)
+
+ catalog: AntaCatalog = AntaCatalog.parse(str(DATA_DIR / "test_catalog_with_tags.yml"))
+ selected_tests = prepare_tests(inventory=test_inventory, catalog=catalog, tags=None, tests={"VerifyMlagStatus", "VerifyUptime"})
+
+ assert selected_tests is not None
+ assert len(selected_tests) == 3
+ assert sum(len(tests) for tests in selected_tests.values()) == 5
+
+
+@pytest.mark.asyncio()
+async def test_runner_dry_run(caplog: pytest.LogCaptureFixture, test_inventory: AntaInventory) -> None:
+ """Test that when dry_run is True, no tests are run.
+
+ caplog is the pytest fixture to capture logs
+ test_inventory is a fixture that gives a default inventory for tests
+ """
+ logger.setup_logging(logger.Log.INFO)
+ caplog.set_level(logging.INFO)
+ manager = ResultManager()
+ catalog_path = Path(__file__).parent.parent / "data" / "test_catalog.yml"
+ catalog = AntaCatalog.parse(catalog_path)
+
+ await main(manager, test_inventory, catalog, dry_run=True)
+
+ # Check that the last log contains Dry-run
+ assert "Dry-run" in caplog.records[-1].message