From 92240acb5cc600eec60624ece9ed4b9ec43b386f Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 23 May 2024 07:06:46 +0200 Subject: Adding upstream version 0.15.0. Signed-off-by: Daniel Baumann --- tests/units/test_runner.py | 140 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 137 insertions(+), 3 deletions(-) (limited to 'tests/units/test_runner.py') diff --git a/tests/units/test_runner.py b/tests/units/test_runner.py index d5bb892..955149d 100644 --- a/tests/units/test_runner.py +++ b/tests/units/test_runner.py @@ -6,6 +6,9 @@ from __future__ import annotations import logging +import resource +from pathlib import Path +from unittest.mock import patch import pytest @@ -13,10 +16,11 @@ from anta import logger from anta.catalog import AntaCatalog from anta.inventory import AntaInventory from anta.result_manager import ResultManager -from anta.runner import main +from anta.runner import adjust_rlimit_nofile, main, prepare_tests from .test_models import FakeTest +DATA_DIR: Path = Path(__file__).parent.parent.resolve() / "data" FAKE_CATALOG: AntaCatalog = AntaCatalog.from_list([(FakeTest, None)]) @@ -47,8 +51,8 @@ async def test_runner_empty_inventory(caplog: pytest.LogCaptureFixture) -> None: manager = ResultManager() inventory = AntaInventory() await main(manager, inventory, FAKE_CATALOG) - assert len(caplog.record_tuples) == 1 - assert "The inventory is empty, exiting" in caplog.records[0].message + assert len(caplog.record_tuples) == 3 + assert "The inventory is empty, exiting" in caplog.records[1].message @pytest.mark.asyncio() @@ -70,3 +74,133 @@ async def test_runner_no_selected_device(caplog: pytest.LogCaptureFixture, test_ await main(manager, test_inventory, FAKE_CATALOG, tags={"toto"}) assert "No reachable device matching the tags {'toto'} was found." in [record.message for record in caplog.records] + + +def test_adjust_rlimit_nofile_valid_env(caplog: pytest.LogCaptureFixture) -> None: + """Test adjust_rlimit_nofile with valid environment variables.""" + with ( + caplog.at_level(logging.DEBUG), + patch.dict("os.environ", {"ANTA_NOFILE": "20480"}), + patch("anta.runner.resource.getrlimit") as getrlimit_mock, + patch("anta.runner.resource.setrlimit") as setrlimit_mock, + ): + # Simulate the default system limits + system_limits = (8192, 1048576) + + # Setup getrlimit mock return value + getrlimit_mock.return_value = system_limits + + # Simulate setrlimit behavior + def side_effect_setrlimit(resource_id: int, limits: tuple[int, int]) -> None: + _ = resource_id + getrlimit_mock.return_value = (limits[0], limits[1]) + + setrlimit_mock.side_effect = side_effect_setrlimit + + result = adjust_rlimit_nofile() + + # Assert the limits were updated as expected + assert result == (20480, 1048576) + assert "Initial limit numbers for open file descriptors for the current ANTA process: Soft Limit: 8192 | Hard Limit: 1048576" in caplog.text + assert "Setting soft limit for open file descriptors for the current ANTA process to 20480" in caplog.text + + setrlimit_mock.assert_called_once_with(resource.RLIMIT_NOFILE, (20480, 1048576)) + + +def test_adjust_rlimit_nofile_invalid_env(caplog: pytest.LogCaptureFixture) -> None: + """Test adjust_rlimit_nofile with valid environment variables.""" + with ( + caplog.at_level(logging.DEBUG), + patch.dict("os.environ", {"ANTA_NOFILE": "invalid"}), + patch("anta.runner.resource.getrlimit") as getrlimit_mock, + patch("anta.runner.resource.setrlimit") as setrlimit_mock, + ): + # Simulate the default system limits + system_limits = (8192, 1048576) + + # Setup getrlimit mock return value + getrlimit_mock.return_value = system_limits + + # Simulate setrlimit behavior + def side_effect_setrlimit(resource_id: int, limits: tuple[int, int]) -> None: + _ = resource_id + getrlimit_mock.return_value = (limits[0], limits[1]) + + setrlimit_mock.side_effect = side_effect_setrlimit + + result = adjust_rlimit_nofile() + + # Assert the limits were updated as expected + assert result == (16384, 1048576) + assert "The ANTA_NOFILE environment variable value is invalid" in caplog.text + assert caplog.records[0].levelname == "WARNING" + assert "Initial limit numbers for open file descriptors for the current ANTA process: Soft Limit: 8192 | Hard Limit: 1048576" in caplog.text + assert "Setting soft limit for open file descriptors for the current ANTA process to 16384" in caplog.text + + setrlimit_mock.assert_called_once_with(resource.RLIMIT_NOFILE, (16384, 1048576)) + + +@pytest.mark.asyncio() +@pytest.mark.parametrize( + ("tags", "expected_tests_count", "expected_devices_count"), + [ + (None, 22, 3), + ({"leaf"}, 9, 3), + ({"invalid_tag"}, 0, 0), + ], + ids=["no_tags", "leaf_tag", "invalid_tag"], +) +async def test_prepare_tests( + caplog: pytest.LogCaptureFixture, + test_inventory: AntaInventory, + tags: set[str] | None, + expected_tests_count: int, + expected_devices_count: int, +) -> None: + """Test the runner prepare_tests function.""" + logger.setup_logging(logger.Log.INFO) + caplog.set_level(logging.INFO) + + catalog: AntaCatalog = AntaCatalog.parse(str(DATA_DIR / "test_catalog_with_tags.yml")) + selected_tests = prepare_tests(inventory=test_inventory, catalog=catalog, tags=tags, tests=None) + + if selected_tests is None: + assert expected_tests_count == 0 + expected_log = f"There are no tests matching the tags {tags} to run in the current test catalog and device inventory, please verify your inputs." + assert expected_log in caplog.text + else: + assert len(selected_tests) == expected_devices_count + assert sum(len(tests) for tests in selected_tests.values()) == expected_tests_count + + +@pytest.mark.asyncio() +async def test_prepare_tests_with_specific_tests(caplog: pytest.LogCaptureFixture, test_inventory: AntaInventory) -> None: + """Test the runner prepare_tests function with specific tests.""" + logger.setup_logging(logger.Log.INFO) + caplog.set_level(logging.INFO) + + catalog: AntaCatalog = AntaCatalog.parse(str(DATA_DIR / "test_catalog_with_tags.yml")) + selected_tests = prepare_tests(inventory=test_inventory, catalog=catalog, tags=None, tests={"VerifyMlagStatus", "VerifyUptime"}) + + assert selected_tests is not None + assert len(selected_tests) == 3 + assert sum(len(tests) for tests in selected_tests.values()) == 5 + + +@pytest.mark.asyncio() +async def test_runner_dry_run(caplog: pytest.LogCaptureFixture, test_inventory: AntaInventory) -> None: + """Test that when dry_run is True, no tests are run. + + caplog is the pytest fixture to capture logs + test_inventory is a fixture that gives a default inventory for tests + """ + logger.setup_logging(logger.Log.INFO) + caplog.set_level(logging.INFO) + manager = ResultManager() + catalog_path = Path(__file__).parent.parent / "data" / "test_catalog.yml" + catalog = AntaCatalog.parse(catalog_path) + + await main(manager, test_inventory, catalog, dry_run=True) + + # Check that the last log contains Dry-run + assert "Dry-run" in caplog.records[-1].message -- cgit v1.2.3