diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:04:41 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:04:41 +0000 |
commit | 975f66f2eebe9dadba04f275774d4ab83f74cf25 (patch) | |
tree | 89bd26a93aaae6a25749145b7e4bca4a1e75b2be /ansible_collections/splunk/es/tests/unit/plugins | |
parent | Initial commit. (diff) | |
download | ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.tar.xz ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.zip |
Adding upstream version 7.7.0+dfsg.upstream/7.7.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/splunk/es/tests/unit/plugins')
8 files changed, 1975 insertions, 0 deletions
diff --git a/ansible_collections/splunk/es/tests/unit/plugins/action/__init__.py b/ansible_collections/splunk/es/tests/unit/plugins/action/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/splunk/es/tests/unit/plugins/action/__init__.py diff --git a/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_adaptive_response_notable_events.py b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_adaptive_response_notable_events.py new file mode 100644 index 000000000..b6a84fc78 --- /dev/null +++ b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_adaptive_response_notable_events.py @@ -0,0 +1,443 @@ +# Copyright (c) 2022 Red Hat +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +from ansible.module_utils.six import PY2 + +builtin_import = "builtins.__import__" +if PY2: + builtin_import = "__builtin__.__import__" + +import tempfile +from ansible.playbook.task import Task +from ansible.template import Templar +from ansible_collections.splunk.es.plugins.action.splunk_adaptive_response_notable_events import ( + ActionModule, +) +from ansible_collections.splunk.es.plugins.module_utils.splunk import ( + SplunkRequest, +) +from ansible_collections.ansible.utils.tests.unit.compat.mock import ( + MagicMock, + patch, +) + +RESPONSE_PAYLOAD = [ + { + "entry": [ + { + "content": { + "action.notable.param.default_owner": "", + "action.notable.param.default_status": "0", + "action.notable.param.drilldown_name": "test_drill_name", + "action.notable.param.drilldown_search": "test_drill", + "action.notable.param.drilldown_earliest_offset": "$info_min_time$", + "action.notable.param.drilldown_latest_offset": "$info_max_time$", + "action.notable.param.extract_artifacts": '{"asset": ["src", "dest", "dvc", "orig_host"],"identity": ' + '["src_user", "user", "src_user_id", "src_user_role", "user_id", "user_role", "vendor_account"]}', + "action.notable.param.investigation_profiles": '{"profile://test profile 1":{}, "profile://test profile 2":{}, ' + '"profile://test profile 3":{}}', + "action.notable.param.next_steps": '{"version": 1, "data": "[[action|makestreams]][[action|nbtstat]][[action|nslookup]]"}', + "action.notable.param.recommended_actions": "email,logevent,makestreams,nbtstat", + "action.notable.param.rule_description": "test notable event", + "action.notable.param.rule_title": "ansible_test_notable", + "action.notable.param.security_domain": "threat", + "action.notable.param.severity": "high", + "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent' + 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai' + 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio' + 'n.src" as "src" | where "count">=6', + "actions": "notable", + }, + "name": "Ansible Test", + } + ] + }, + { + "entry": [ + { + "content": { + "action.notable.param.default_owner": "", + "action.notable.param.default_status": "", + "action.notable.param.drilldown_name": "test_drill_name", + "action.notable.param.drilldown_search": "test_drill", + "action.notable.param.drilldown_earliest_offset": "$info_min_time$", + "action.notable.param.drilldown_latest_offset": "$info_max_time$", + "action.notable.param.extract_artifacts": '{"asset": ["src", "dest"],"identity": ["src_user", "user", "src_user_id"]}', + "action.notable.param.investigation_profiles": '{"profile://test profile 1":{}, "profile://test profile 2":{}, ' + '"profile://test profile 3":{}}', + "action.notable.param.next_steps": '{"version": 1, "data": "[[action|makestreams]]"}', + "action.notable.param.recommended_actions": "email,logevent", + "action.notable.param.rule_description": "test notable event", + "action.notable.param.rule_title": "ansible_test_notable", + "action.notable.param.security_domain": "threat", + "action.notable.param.severity": "high", + "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent' + 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai' + 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio' + 'n.src" as "src" | where "count">=6', + "actions": "notable", + }, + "name": "Ansible Test", + } + ] + }, +] + +REQUEST_PAYLOAD = [ + { + "correlation_search_name": "Ansible Test", + "default_status": "unassigned", + "description": "test notable event", + "drilldown_earliest_offset": "$info_min_time$", + "drilldown_latest_offset": "$info_max_time$", + "drilldown_name": "test_drill_name", + "drilldown_search": "test_drill", + "extract_artifacts": { + "asset": ["src", "dest", "dvc", "orig_host"], + "identity": [ + "src_user", + "user", + "src_user_id", + "src_user_role", + "user_id", + "user_role", + "vendor_account", + ], + }, + "investigation_profiles": [ + "test profile 1", + "test profile 2", + "test profile 3", + ], + "next_steps": ["makestreams", "nbtstat", "nslookup"], + "name": "ansible_test_notable", + "recommended_actions": ["email", "logevent", "makestreams", "nbtstat"], + "security_domain": "threat", + "severity": "high", + }, + { + "correlation_search_name": "Ansible Test", + "description": "test notable event", + "drilldown_earliest_offset": "$info_min_time$", + "drilldown_latest_offset": "$info_max_time$", + "extract_artifacts": { + "asset": ["src", "dest"], + "identity": ["src_user", "user", "src_user_id"], + }, + "next_steps": ["makestreams"], + "name": "ansible_test_notable", + "recommended_actions": ["email", "logevent"], + "security_domain": "threat", + "severity": "high", + }, +] + + +class TestSplunkEsAdaptiveResponseNotableEvents: + def setup(self): + task = MagicMock(Task) + # Ansible > 2.13 looks for check_mode in task + task.check_mode = False + play_context = MagicMock() + # Ansible <= 2.13 looks for check_mode in play_context + play_context.check_mode = False + connection = patch( + "ansible_collections.splunk.es.plugins.module_utils.splunk.Connection" + ) + connection._socket_path = tempfile.NamedTemporaryFile().name + fake_loader = {} + templar = Templar(loader=fake_loader) + self._plugin = ActionModule( + task=task, + connection=connection, + play_context=play_context, + loader=fake_loader, + templar=templar, + shared_loader_obj=None, + ) + self._plugin._task.action = "adaptive_response_notable_events" + self._plugin._task.async_val = False + self._task_vars = {} + self.metadata = { + "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent' + 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai' + 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio' + 'n.src" as "src" | where "count">=6', + "actions": "notable", + } + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_adaptive_response_notable_events_merged_01( + self, connection, monkeypatch + ): + metadata = { + "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent' + 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai' + 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio' + 'n.src" as "src" | where "count">=6', + "actions": "", + } + self._plugin.api_response = RESPONSE_PAYLOAD[0] + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = {}, metadata + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD[0] + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD[0]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_adaptive_response_notable_events_merged_02( + self, connection, monkeypatch + ): + self._plugin.api_response = RESPONSE_PAYLOAD[0] + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = ( + RESPONSE_PAYLOAD[0], + self.metadata, + ) + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD[1] + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD[1]], + } + result = self._plugin.run(task_vars=self._task_vars) + + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_adaptive_response_notable_events_merged_idempotent( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD[0] + + def get_by_path(self, path): + return RESPONSE_PAYLOAD[0] + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD[0]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_adaptive_response_notable_events_replaced_01( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = ( + RESPONSE_PAYLOAD[0], + self.metadata, + ) + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD[0] + + def get_by_path(self, path): + return RESPONSE_PAYLOAD[0] + + def delete_by_path(self, path): + return {} + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path) + + self._plugin._task.args = { + "state": "replaced", + "config": [REQUEST_PAYLOAD[1]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_adaptive_response_notable_events_replaced_02( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = ( + RESPONSE_PAYLOAD[0], + self.metadata, + ) + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD[0] + + def get_by_path(self, path): + return RESPONSE_PAYLOAD[0] + + def delete_by_path(self, path): + return {} + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path) + + self._plugin._task.args = { + "state": "replaced", + "config": [REQUEST_PAYLOAD[1]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_adaptive_response_notable_events_replaced_idempotent( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD[0] + + def get_by_path(self, path): + return RESPONSE_PAYLOAD[0] + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + self._plugin._task.args = { + "state": "replaced", + "config": [REQUEST_PAYLOAD[0]], + } + result = self._plugin.run(task_vars=self._task_vars) + + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_adaptive_response_notable_events_deleted( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = ( + RESPONSE_PAYLOAD[0], + self.metadata, + ) + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD[0] + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + + self._plugin._task.args = { + "state": "deleted", + "config": [ + { + "correlation_search_name": "Ansible Test", + } + ], + } + result = self._plugin.run(task_vars=self._task_vars) + + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_adaptive_response_notable_events_deleted_idempotent( + self, connection + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = {}, {} + + self._plugin._task.args = { + "state": "deleted", + "config": [ + { + "correlation_search_name": "Ansible Test", + } + ], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_adaptive_response_notable_events_gathered( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = ( + RESPONSE_PAYLOAD[0], + self.metadata, + ) + + self._plugin._task.args = { + "state": "gathered", + "config": [ + { + "correlation_search_name": "Ansible Test", + } + ], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False diff --git a/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_correlation_searches.py b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_correlation_searches.py new file mode 100644 index 000000000..fca268c98 --- /dev/null +++ b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_correlation_searches.py @@ -0,0 +1,373 @@ +# Copyright (c) 2022 Red Hat +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +from ansible.module_utils.six import PY2 + +builtin_import = "builtins.__import__" +if PY2: + builtin_import = "__builtin__.__import__" + +import tempfile +from ansible.playbook.task import Task +from ansible.template import Templar +from ansible_collections.splunk.es.plugins.action.splunk_correlation_searches import ( + ActionModule, +) +from ansible_collections.splunk.es.plugins.module_utils.splunk import ( + SplunkRequest, +) +from ansible_collections.ansible.utils.tests.unit.compat.mock import ( + MagicMock, + patch, +) + +RESPONSE_PAYLOAD = { + "entry": [ + { + "acl": {"app": "DA-ESS-EndpointProtection"}, + "content": { + "action.correlationsearch.annotations": '{"cis20": ["test1"], "mitre_attack": ["test2"], "kill_chain_phases": ["test3"], ' + '"nist": ["test4"], "test_framework": ["test5"]}', + "action.correlationsearch.enabled": "1", + "action.correlationsearch.label": "Ansible Test", + "alert.digest_mode": True, + "alert.suppress": False, + "alert.suppress.fields": "test_field1", + "alert.suppress.period": "5s", + "alert_comparator": "greater than", + "alert_threshold": "10", + "alert_type": "number of events", + "cron_schedule": "*/5 * * * *", + "description": "test description", + "disabled": False, + "dispatch.earliest_time": "-24h", + "dispatch.latest_time": "now", + "dispatch.rt_backfill": True, + "is_scheduled": True, + "realtime_schedule": True, + "request.ui_dispatch_app": "SplunkEnterpriseSecuritySuite", + "schedule_priority": "default", + "schedule_window": "0", + "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent' + 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai' + 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio' + 'n.src" as "src" | where "count">=6', + }, + "name": "Ansible Test", + } + ] +} + +REQUEST_PAYLOAD = [ + { + "name": "Ansible Test", + "disabled": False, + "description": "test description", + "app": "DA-ESS-EndpointProtection", + "annotations": { + "cis20": ["test1"], + "mitre_attack": ["test2"], + "kill_chain_phases": ["test3"], + "nist": ["test4"], + "custom": [ + { + "framework": "test_framework", + "custom_annotations": ["test5"], + } + ], + }, + "ui_dispatch_context": "SplunkEnterpriseSecuritySuite", + "time_earliest": "-24h", + "time_latest": "now", + "cron_schedule": "*/5 * * * *", + "scheduling": "realtime", + "schedule_window": "0", + "schedule_priority": "default", + "trigger_alert": "once", + "trigger_alert_when": "number of events", + "trigger_alert_when_condition": "greater than", + "trigger_alert_when_value": "10", + "throttle_window_duration": "5s", + "throttle_fields_to_group_by": ["test_field1"], + "suppress_alerts": False, + "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent' + 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai' + 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio' + 'n.src" as "src" | where "count">=6', + }, + { + "name": "Ansible Test", + "disabled": False, + "description": "test description", + "app": "SplunkEnterpriseSecuritySuite", + "annotations": { + "cis20": ["test1", "test2"], + "mitre_attack": ["test3", "test4"], + "kill_chain_phases": ["test5", "test6"], + "nist": ["test7", "test8"], + "custom": [ + { + "framework": "test_framework2", + "custom_annotations": ["test9", "test10"], + } + ], + }, + "ui_dispatch_context": "SplunkEnterpriseSecuritySuite", + "time_earliest": "-24h", + "time_latest": "now", + "cron_schedule": "*/5 * * * *", + "scheduling": "continuous", + "schedule_window": "auto", + "schedule_priority": "default", + "trigger_alert": "once", + "trigger_alert_when": "number of events", + "trigger_alert_when_condition": "greater than", + "trigger_alert_when_value": "10", + "throttle_window_duration": "5s", + "throttle_fields_to_group_by": ["test_field1", "test_field2"], + "suppress_alerts": True, + "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent' + 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai' + 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio' + 'n.src" as "src" | where "count">=6', + }, +] + + +class TestSplunkEsCorrelationSearches: + def setup(self): + task = MagicMock(Task) + # Ansible > 2.13 looks for check_mode in task + task.check_mode = False + play_context = MagicMock() + # Ansible <= 2.13 looks for check_mode in play_context + play_context.check_mode = False + connection = patch( + "ansible_collections.splunk.es.plugins.module_utils.splunk.Connection" + ) + connection._socket_path = tempfile.NamedTemporaryFile().name + fake_loader = {} + templar = Templar(loader=fake_loader) + self._plugin = ActionModule( + task=task, + connection=connection, + play_context=play_context, + loader=fake_loader, + templar=templar, + shared_loader_obj=None, + ) + self._plugin._task.action = "correlation_searches" + self._plugin._task.async_val = False + self._task_vars = {} + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_correlation_searches_merged(self, connection, monkeypatch): + self._plugin.api_response = RESPONSE_PAYLOAD + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = {} + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD[0]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_correlation_searches_merged_idempotent( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD + + def get_by_path(self, path): + return RESPONSE_PAYLOAD + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD[0]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_correlation_searches_replaced_01(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = RESPONSE_PAYLOAD + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD + + def get_by_path(self, path): + return RESPONSE_PAYLOAD + + def delete_by_path(self, path): + return {} + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path) + + self._plugin._task.args = { + "state": "replaced", + "config": [REQUEST_PAYLOAD[1]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_correlation_searches_replaced_02(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = RESPONSE_PAYLOAD + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD + + def get_by_path(self, path): + return RESPONSE_PAYLOAD + + def delete_by_path(self, path): + return {} + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path) + + self._plugin._task.args = { + "state": "replaced", + "config": [REQUEST_PAYLOAD[1]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_correlation_searches_replaced_idempotent( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def create_update(self, rest_path, data=None): + return RESPONSE_PAYLOAD + + def get_by_path(self, path): + return RESPONSE_PAYLOAD + + def delete_by_path(self, path): + return {} + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path) + + self._plugin._task.args = { + "state": "replaced", + "config": [REQUEST_PAYLOAD[0]], + } + result = self._plugin.run(task_vars=self._task_vars) + + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_correlation_searches_deleted(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def get_by_path(self, path): + return RESPONSE_PAYLOAD + + def delete_by_path(self, path): + return {} + + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path) + + self._plugin._task.args = { + "state": "deleted", + "config": [{"name": "Ansible Test"}], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_correlation_searches_deleted_idempotent(self, connection): + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = {} + + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin._task.args = { + "state": "deleted", + "config": [{"name": "Ansible Test"}], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_correlation_searches_gathered(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def get_by_path(self, path): + return RESPONSE_PAYLOAD + + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + self._plugin._task.args = { + "state": "gathered", + "config": [{"name": "Ansible Test"}], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False diff --git a/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_monitors.py b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_monitors.py new file mode 100644 index 000000000..068fe638d --- /dev/null +++ b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_monitors.py @@ -0,0 +1,357 @@ +# Copyright (c) 2022 Red Hat +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +from ansible.module_utils.six import PY2 + +builtin_import = "builtins.__import__" +if PY2: + builtin_import = "__builtin__.__import__" + +import tempfile +from ansible.playbook.task import Task +from ansible.template import Templar +from ansible_collections.splunk.es.plugins.action.splunk_data_inputs_monitor import ( + ActionModule, +) +from ansible_collections.splunk.es.plugins.module_utils.splunk import ( + SplunkRequest, +) +from ansible_collections.ansible.utils.tests.unit.compat.mock import ( + MagicMock, + patch, +) + +RESPONSE_PAYLOAD = { + "entry": [ + { + "content": { + "_rcvbuf": 1572864, + "blacklist": "//var/log/[a-z]/gm", + "check-index": None, + "crcSalt": "<SOURCE>", + "disabled": False, + "eai:acl": None, + "filecount": 74, + "filestatecount": 82, + "followTail": False, + "host": "$decideOnStartup", + "host_regex": "/(test_host)/gm", + "host_resolved": "ip-172-31-52-131.us-west-2.compute.internal", + "host_segment": 3, + "ignoreOlderThan": "5d", + "index": "default", + "recursive": True, + "source": "test", + "sourcetype": "test_source_type", + "time_before_close": 4, + "whitelist": "//var/log/[0-9]/gm", + }, + "name": "/var/log", + } + ] +} + +REQUEST_PAYLOAD = [ + { + "blacklist": "//var/log/[a-z]/gm", + "crc_salt": "<SOURCE>", + "disabled": False, + "follow_tail": False, + "host": "$decideOnStartup", + "host_regex": "/(test_host)/gm", + "host_segment": 3, + "index": "default", + "name": "/var/log", + "recursive": True, + "sourcetype": "test_source_type", + "whitelist": "//var/log/[0-9]/gm", + }, + { + "blacklist": "//var/log/[a-z0-9]/gm", + "crc_salt": "<SOURCE>", + "disabled": False, + "follow_tail": False, + "host": "$decideOnStartup", + "index": "default", + "name": "/var/log", + "recursive": True, + }, +] + + +class TestSplunkEsDataInputsMonitorRules: + def setup(self): + task = MagicMock(Task) + # Ansible > 2.13 looks for check_mode in task + task.check_mode = False + play_context = MagicMock() + # Ansible <= 2.13 looks for check_mode in play_context + play_context.check_mode = False + connection = patch( + "ansible_collections.splunk.es.plugins.module_utils.splunk.Connection" + ) + connection._socket_path = tempfile.NamedTemporaryFile().name + fake_loader = {} + templar = Templar(loader=fake_loader) + self._plugin = ActionModule( + task=task, + connection=connection, + play_context=play_context, + loader=fake_loader, + templar=templar, + shared_loader_obj=None, + ) + self._plugin._task.action = "data_inputs_monitor" + self._plugin._task.async_val = False + self._task_vars = {} + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_monitor_merged(self, connection, monkeypatch): + self._plugin.api_response = RESPONSE_PAYLOAD + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = {} + + def create_update( + self, rest_path, data=None, mock=None, mock_data=None + ): + return RESPONSE_PAYLOAD + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD[0]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_monitor_merged_idempotent(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def create_update( + self, rest_path, data=None, mock=None, mock_data=None + ): + return RESPONSE_PAYLOAD + + def get_by_path(self, path): + return RESPONSE_PAYLOAD + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + self._plugin._task.args = { + "state": "merged", + "config": [ + { + "blacklist": "//var/log/[a-z]/gm", + "crc_salt": "<SOURCE>", + "disabled": False, + "follow_tail": False, + "host": "$decideOnStartup", + "host_regex": "/(test_host)/gm", + "host_segment": 3, + "index": "default", + "name": "/var/log", + "recursive": True, + "sourcetype": "test_source_type", + "whitelist": "//var/log/[0-9]/gm", + } + ], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_monitor_replaced(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = RESPONSE_PAYLOAD + + def create_update( + self, rest_path, data=None, mock=None, mock_data=None + ): + return RESPONSE_PAYLOAD + + def get_by_path(self, path): + return RESPONSE_PAYLOAD + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + self._plugin._task.args = { + "state": "replaced", + "config": [ + { + "blacklist": "//var/log/[a-z0-9]/gm", + "crc_salt": "<SOURCE>", + "disabled": False, + "follow_tail": False, + "host": "$decideOnStartup", + "index": "default", + "name": "/var/log", + "recursive": True, + } + ], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_monitor_replaced_idempotent( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def create_update( + self, rest_path, data=None, mock=None, mock_data=None + ): + return RESPONSE_PAYLOAD + + def get_by_path(self, path): + return { + "entry": [ + { + "content": { + "_rcvbuf": 1572864, + "blacklist": "//var/log/[a-z]/gm", + "check-index": None, + "crcSalt": "<SOURCE>", + "disabled": False, + "eai:acl": None, + "filecount": 74, + "filestatecount": 82, + "followTail": False, + "host": "$decideOnStartup", + "host_regex": "/(test_host)/gm", + "host_resolved": "ip-172-31-52-131.us-west-2.compute.internal", + "host_segment": 3, + "ignoreOlderThan": "5d", + "index": "default", + "recursive": True, + "source": "test", + "sourcetype": "test_source_type", + "time_before_close": 4, + "whitelist": "//var/log/[0-9]/gm", + }, + "name": "/var/log", + } + ] + } + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + self._plugin._task.args = { + "state": "replaced", + "config": [ + { + "blacklist": "//var/log/[a-z]/gm", + "crc_salt": "<SOURCE>", + "disabled": False, + "follow_tail": False, + "host": "$decideOnStartup", + "host_regex": "/(test_host)/gm", + "host_segment": 3, + "index": "default", + "name": "/var/log", + "recursive": True, + "sourcetype": "test_source_type", + "whitelist": "//var/log/[0-9]/gm", + } + ], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_monitor_deleted(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def create_update( + self, rest_path, data=None, mock=None, mock_data=None + ): + return RESPONSE_PAYLOAD + + def get_by_path(self, path): + return RESPONSE_PAYLOAD + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + self._plugin._task.args = { + "state": "deleted", + "config": [{"name": "/var/log"}], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_monitor_deleted_idempotent(self, connection): + self._plugin.search_for_resource_name = MagicMock() + self._plugin.search_for_resource_name.return_value = {} + + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + self._plugin._task.args = { + "state": "deleted", + "config": [{"name": "/var/log"}], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_monitor_gathered(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def get_by_path(self, path): + return RESPONSE_PAYLOAD + + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + self._plugin._task.args = { + "state": "gathered", + "config": [{"name": "/var/log"}], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False diff --git a/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_network.py b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_network.py new file mode 100644 index 000000000..dbadf9052 --- /dev/null +++ b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_network.py @@ -0,0 +1,711 @@ +# Copyright (c) 2022 Red Hat +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +from ansible.module_utils.six import PY2 + +builtin_import = "builtins.__import__" +if PY2: + builtin_import = "__builtin__.__import__" + +import tempfile +from ansible.playbook.task import Task +from ansible.template import Templar +from ansible_collections.splunk.es.plugins.action.splunk_data_inputs_network import ( + ActionModule, +) +from ansible_collections.splunk.es.plugins.module_utils.splunk import ( + SplunkRequest, +) +from ansible_collections.ansible.utils.tests.unit.compat.mock import ( + MagicMock, + patch, +) + +RESPONSE_PAYLOAD = { + "tcp_cooked": { + "entry": [ + { + "name": "default:8100", + "content": { + "connection_host": "ip", + "disabled": False, + "host": "$decideOnStartup", + "restrictToHost": "default", + }, + } + ], + }, + "tcp_raw": { + "entry": [ + { + "name": "default:8101", + "content": { + "connection_host": "ip", + "disabled": True, + "host": "$decideOnStartup", + "index": "default", + "queue": "parsingQueue", + "rawTcpDoneTimeout": 9, + "restrictToHost": "default", + "source": "test_source", + "sourcetype": "test_source_type", + }, + } + ], + }, + "udp": { + "entry": [ + { + "name": "default:7890", + "content": { + "connection_host": "ip", + "disabled": True, + "host": "$decideOnStartup", + "index": "default", + "no_appending_timestamp": True, + "no_priority_stripping": True, + "queue": "parsingQueue", + "restrictToHost": "default", + "source": "test_source", + "sourcetype": "test_source_type", + }, + } + ], + }, + "splunktcptoken": { + "entry": [ + { + "name": "splunktcptoken://test_token", + "content": { + "token": "01234567-0123-0123-0123-012345678901", + }, + } + ], + }, + "ssl": { + "entry": [ + { + "name": "test_host", + "content": {}, + } + ], + }, +} + +REQUEST_PAYLOAD = { + "tcp_cooked": { + "protocol": "tcp", + "datatype": "cooked", + "name": 8100, + "connection_host": "ip", + "disabled": False, + "host": "$decideOnStartup", + "restrict_to_host": "default", + }, + "tcp_raw": { + "protocol": "tcp", + "datatype": "raw", + "name": 8101, + "connection_host": "ip", + "disabled": True, + "host": "$decideOnStartup", + "index": "default", + "queue": "parsingQueue", + "raw_tcp_done_timeout": 9, + "restrict_to_host": "default", + "source": "test_source", + "sourcetype": "test_source_type", + }, + "udp": { + "protocol": "udp", + "name": 7890, + "connection_host": "ip", + "disabled": True, + "host": "$decideOnStartup", + "index": "default", + "no_appending_timestamp": True, + "no_priority_stripping": True, + "queue": "parsingQueue", + "restrict_to_host": "default", + "source": "test_source", + "sourcetype": "test_source_type", + }, + "splunktcptoken": { + "protocol": "tcp", + "datatype": "splunktcptoken", + "name": "test_token", + "token": "01234567-0123-0123-0123-012345678901", + }, + "ssl": { + "protocol": "tcp", + "datatype": "ssl", + "name": "test_host", + }, +} + +REPLACED_RESPONSE_PAYLOAD = { + "tcp_cooked": { + "entry": [ + { + "name": "default:8100", + "content": { + "connection_host": "ip", + "disabled": True, + "host": "$decideOnStartup", + "restrictToHost": "default", + }, + } + ], + }, + "tcp_raw": { + "entry": [ + { + "name": "default:8101", + "content": { + "connection_host": "ip", + "disabled": True, + "host": "$decideOnStartup", + "index": "default", + "queue": "parsingQueue", + "rawTcpDoneTimeout": 10, + "restrictToHost": "default", + "source": "test_source", + "sourcetype": "test_source_type", + }, + } + ], + }, + "udp": { + "entry": [ + { + "name": "default:7890", + "content": { + "connection_host": "ip", + "disabled": True, + "host": "$decideOnStartup", + "index": "default", + "no_appending_timestamp": False, + "no_priority_stripping": False, + "queue": "parsingQueue", + "restrictToHost": "default", + "source": "test_source", + "sourcetype": "test_source_type", + }, + } + ], + }, + "splunktcptoken": { + "entry": [ + { + "name": "splunktcptoken://test_token", + "content": { + "token": "01234567-0123-0123-0123-012345678900", + }, + } + ], + }, +} + +REPLACED_REQUEST_PAYLOAD = { + "tcp_cooked": { + "protocol": "tcp", + "datatype": "cooked", + "name": "default:8100", + "connection_host": "ip", + "disabled": True, + "host": "$decideOnStartup", + "restrict_to_host": "default", + }, + "tcp_raw": { + "protocol": "tcp", + "datatype": "raw", + "name": "default:8101", + "connection_host": "ip", + "disabled": True, + "host": "$decideOnStartup", + "index": "default", + "queue": "parsingQueue", + "raw_tcp_done_timeout": 10, + "restrict_to_host": "default", + "source": "test_source", + "sourcetype": "test_source_type", + }, + "udp": { + "protocol": "udp", + "name": "default:7890", + "connection_host": "ip", + "disabled": True, + "host": "$decideOnStartup", + "index": "default", + "no_appending_timestamp": False, + "no_priority_stripping": False, + "queue": "parsingQueue", + "restrict_to_host": "default", + "source": "test_source", + "sourcetype": "test_source_type", + }, + "splunktcptoken": { + "protocol": "tcp", + "datatype": "splunktcptoken", + "name": "splunktcptoken://test_token", + "token": "01234567-0123-0123-0123-012345678900", + }, +} + + +class TestSplunkEsDataInputsNetworksRules: + def setup(self): + task = MagicMock(Task) + # Ansible > 2.13 looks for check_mode in task + task.check_mode = False + play_context = MagicMock() + # Ansible <= 2.13 looks for check_mode in play_context + play_context.check_mode = False + connection = patch( + "ansible_collections.splunk.es.plugins.module_utils.splunk.Connection" + ) + connection._socket_path = tempfile.NamedTemporaryFile().name + fake_loader = {} + templar = Templar(loader=fake_loader) + self._plugin = ActionModule( + task=task, + connection=connection, + play_context=play_context, + loader=fake_loader, + templar=templar, + shared_loader_obj=None, + ) + self._plugin._task.action = "data_inputs_network" + self._plugin._task.async_val = False + self._task_vars = {} + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_network_merged(self, connection, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + # patch update operation + update_response = RESPONSE_PAYLOAD["tcp_cooked"] + + def get_by_path(self, path): + return {} + + def create_update( + self, rest_path, data=None, mock=None, mock_data=None + ): + return update_response + + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + + # tcp_cooked + update_response = RESPONSE_PAYLOAD["tcp_cooked"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["tcp_cooked"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + # tcp_raw + update_response = RESPONSE_PAYLOAD["tcp_raw"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["tcp_raw"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + # udp + update_response = RESPONSE_PAYLOAD["udp"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["udp"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + # splunktcptoken + update_response = RESPONSE_PAYLOAD["splunktcptoken"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["splunktcptoken"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + # ssl + update_response = RESPONSE_PAYLOAD["ssl"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["ssl"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_network_merged_idempotent(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + # patch get operation + get_response = RESPONSE_PAYLOAD["tcp_cooked"] + + def get_by_path(self, path): + return get_response + + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + # tcp_cooked + get_response = RESPONSE_PAYLOAD["tcp_cooked"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["tcp_cooked"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # tcp_raw + get_response = RESPONSE_PAYLOAD["tcp_raw"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["tcp_raw"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # udp + get_response = RESPONSE_PAYLOAD["udp"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["udp"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # splunktcptoken + get_response = RESPONSE_PAYLOAD["splunktcptoken"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["splunktcptoken"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # ssl + get_response = RESPONSE_PAYLOAD["ssl"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["ssl"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_network_replaced(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + # patch get operation + get_response = RESPONSE_PAYLOAD["tcp_cooked"] + # patch update operation + update_response = REPLACED_RESPONSE_PAYLOAD["tcp_cooked"] + + get_response = RESPONSE_PAYLOAD["tcp_cooked"] + + def delete_by_path( + self, rest_path, data=None, mock=None, mock_data=None + ): + return {} + + def create_update( + self, rest_path, data=None, mock=None, mock_data=None + ): + return update_response + + def get_by_path(self, path): + return get_response + + monkeypatch.setattr(SplunkRequest, "create_update", create_update) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path) + + # tcp_cooked + get_response = RESPONSE_PAYLOAD["tcp_cooked"] + update_response = REPLACED_RESPONSE_PAYLOAD["tcp_cooked"] + self._plugin._task.args = { + "state": "replaced", + "config": [REPLACED_REQUEST_PAYLOAD["tcp_cooked"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + # tcp_raw + get_response = RESPONSE_PAYLOAD["tcp_raw"] + update_response = REPLACED_RESPONSE_PAYLOAD["tcp_raw"] + self._plugin._task.args = { + "state": "replaced", + "config": [REPLACED_REQUEST_PAYLOAD["tcp_raw"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + # udp + get_response = RESPONSE_PAYLOAD["udp"] + update_response = REPLACED_RESPONSE_PAYLOAD["udp"] + self._plugin._task.args = { + "state": "replaced", + "config": [REPLACED_REQUEST_PAYLOAD["udp"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + # splunktcptoken + get_response = RESPONSE_PAYLOAD["splunktcptoken"] + update_response = REPLACED_RESPONSE_PAYLOAD["splunktcptoken"] + self._plugin._task.args = { + "state": "replaced", + "config": [REPLACED_REQUEST_PAYLOAD["splunktcptoken"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_network_replaced_idempotent( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + # patch get operation + get_response = RESPONSE_PAYLOAD["tcp_cooked"] + + def get_by_path(self, path): + return get_response + + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + # tcp_cooked + get_response = REPLACED_RESPONSE_PAYLOAD["tcp_cooked"] + self._plugin._task.args = { + "state": "replaced", + "config": [REPLACED_REQUEST_PAYLOAD["tcp_cooked"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # tcp_raw + get_response = REPLACED_RESPONSE_PAYLOAD["tcp_raw"] + self._plugin._task.args = { + "state": "replaced", + "config": [REPLACED_REQUEST_PAYLOAD["tcp_raw"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # udp + get_response = REPLACED_RESPONSE_PAYLOAD["udp"] + self._plugin._task.args = { + "state": "replaced", + "config": [REPLACED_REQUEST_PAYLOAD["udp"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # splunktcptoken + get_response = REPLACED_RESPONSE_PAYLOAD["splunktcptoken"] + self._plugin._task.args = { + "state": "replaced", + "config": [REPLACED_REQUEST_PAYLOAD["splunktcptoken"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_network_deleted(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def delete_by_path( + self, rest_path, data=None, mock=None, mock_data=None + ): + return {} + + get_response = RESPONSE_PAYLOAD["tcp_cooked"] + + def get_by_path(self, path): + return get_response + + monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path) + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + # tcp_cooked + get_response = RESPONSE_PAYLOAD["tcp_cooked"] + self._plugin._task.args = { + "state": "deleted", + "config": [REQUEST_PAYLOAD["tcp_cooked"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + # tcp_raw + get_response = RESPONSE_PAYLOAD["tcp_raw"] + self._plugin._task.args = { + "state": "deleted", + "config": [REQUEST_PAYLOAD["tcp_raw"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + # udp + get_response = RESPONSE_PAYLOAD["udp"] + self._plugin._task.args = { + "state": "deleted", + "config": [REQUEST_PAYLOAD["udp"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + # splunktcptoken + get_response = RESPONSE_PAYLOAD["splunktcptoken"] + self._plugin._task.args = { + "state": "deleted", + "config": [REQUEST_PAYLOAD["splunktcptoken"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is True + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_network_deleted_idempotent( + self, conn, monkeypatch + ): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + def get_by_path(self, path): + return {} + + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + # tcp_cooked + self._plugin._task.args = { + "state": "deleted", + "config": [REQUEST_PAYLOAD["tcp_cooked"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # tcp_raw + self._plugin._task.args = { + "state": "deleted", + "config": [REQUEST_PAYLOAD["tcp_raw"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # udp + self._plugin._task.args = { + "state": "deleted", + "config": [REQUEST_PAYLOAD["udp"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # splunktcptoken + self._plugin._task.args = { + "state": "deleted", + "config": [REQUEST_PAYLOAD["splunktcptoken"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + @patch("ansible.module_utils.connection.Connection.__rpc__") + def test_es_data_inputs_network_gathered(self, conn, monkeypatch): + self._plugin._connection.socket_path = ( + tempfile.NamedTemporaryFile().name + ) + self._plugin._connection._shell = MagicMock() + + # patch get operation + get_response = RESPONSE_PAYLOAD["tcp_cooked"] + + def get_by_path(self, path): + return get_response + + monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path) + + # tcp_cooked + get_response = RESPONSE_PAYLOAD["tcp_cooked"] + self._plugin._task.args = { + "state": "gathered", + "config": [REQUEST_PAYLOAD["tcp_cooked"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # tcp_raw + get_response = RESPONSE_PAYLOAD["tcp_raw"] + self._plugin._task.args = { + "state": "gathered", + "config": [REQUEST_PAYLOAD["tcp_raw"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # udp + get_response = RESPONSE_PAYLOAD["udp"] + self._plugin._task.args = { + "state": "gathered", + "config": [REQUEST_PAYLOAD["udp"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # splunktcptoken + get_response = RESPONSE_PAYLOAD["splunktcptoken"] + self._plugin._task.args = { + "state": "gathered", + "config": [REQUEST_PAYLOAD["splunktcptoken"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False + + # ssl + get_response = RESPONSE_PAYLOAD["ssl"] + self._plugin._task.args = { + "state": "merged", + "config": [REQUEST_PAYLOAD["ssl"]], + } + result = self._plugin.run(task_vars=self._task_vars) + assert result["changed"] is False diff --git a/ansible_collections/splunk/es/tests/unit/plugins/modules/__init__.py b/ansible_collections/splunk/es/tests/unit/plugins/modules/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/splunk/es/tests/unit/plugins/modules/__init__.py diff --git a/ansible_collections/splunk/es/tests/unit/plugins/modules/conftest.py b/ansible_collections/splunk/es/tests/unit/plugins/modules/conftest.py new file mode 100644 index 000000000..e19a1e04c --- /dev/null +++ b/ansible_collections/splunk/es/tests/unit/plugins/modules/conftest.py @@ -0,0 +1,40 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import json + +import pytest + +from ansible.module_utils.six import string_types +from ansible.module_utils._text import to_bytes +from ansible.module_utils.common._collections_compat import MutableMapping + + +@pytest.fixture +def patch_ansible_module(request, mocker): + if isinstance(request.param, string_types): + args = request.param + elif isinstance(request.param, MutableMapping): + if "ANSIBLE_MODULE_ARGS" not in request.param: + request.param = {"ANSIBLE_MODULE_ARGS": request.param} + if "_ansible_remote_tmp" not in request.param["ANSIBLE_MODULE_ARGS"]: + request.param["ANSIBLE_MODULE_ARGS"][ + "_ansible_remote_tmp" + ] = "/tmp" + if ( + "_ansible_keep_remote_files" + not in request.param["ANSIBLE_MODULE_ARGS"] + ): + request.param["ANSIBLE_MODULE_ARGS"][ + "_ansible_keep_remote_files" + ] = False + args = json.dumps(request.param) + else: + raise Exception( + "Malformed data to the patch_ansible_module pytest fixture" + ) + + mocker.patch("ansible.module_utils.basic._ANSIBLE_ARGS", to_bytes(args)) diff --git a/ansible_collections/splunk/es/tests/unit/plugins/modules/utils.py b/ansible_collections/splunk/es/tests/unit/plugins/modules/utils.py new file mode 100644 index 000000000..d55afc0b3 --- /dev/null +++ b/ansible_collections/splunk/es/tests/unit/plugins/modules/utils.py @@ -0,0 +1,51 @@ +from __future__ import absolute_import, division, print_function + +__metaclass__ = type +import json + +from ansible_collections.trendmicro.deepsec.tests.unit.compat import unittest +from ansible_collections.trendmicro.deepsec.tests.unit.compat.mock import patch +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes + + +def set_module_args(args): + if "_ansible_remote_tmp" not in args: + args["_ansible_remote_tmp"] = "/tmp" + if "_ansible_keep_remote_files" not in args: + args["_ansible_keep_remote_files"] = False + + args = json.dumps({"ANSIBLE_MODULE_ARGS": args}) + basic._ANSIBLE_ARGS = to_bytes(args) + + +class AnsibleExitJson(Exception): + pass + + +class AnsibleFailJson(Exception): + pass + + +def exit_json(*args, **kwargs): + if "changed" not in kwargs: + kwargs["changed"] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): + kwargs["failed"] = True + raise AnsibleFailJson(kwargs) + + +class ModuleTestCase(unittest.TestCase): + def setUp(self): + self.mock_module = patch.multiple( + basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json + ) + self.mock_module.start() + self.mock_sleep = patch("time.sleep") + self.mock_sleep.start() + set_module_args({}) + self.addCleanup(self.mock_module.stop) + self.addCleanup(self.mock_sleep.stop) |