summaryrefslogtreecommitdiffstats
path: root/ansible_collections/splunk/es/tests/unit/plugins
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:04:41 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:04:41 +0000
commit975f66f2eebe9dadba04f275774d4ab83f74cf25 (patch)
tree89bd26a93aaae6a25749145b7e4bca4a1e75b2be /ansible_collections/splunk/es/tests/unit/plugins
parentInitial commit. (diff)
downloadansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.tar.xz
ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.zip
Adding upstream version 7.7.0+dfsg.upstream/7.7.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/splunk/es/tests/unit/plugins')
-rw-r--r--ansible_collections/splunk/es/tests/unit/plugins/action/__init__.py0
-rw-r--r--ansible_collections/splunk/es/tests/unit/plugins/action/test_es_adaptive_response_notable_events.py443
-rw-r--r--ansible_collections/splunk/es/tests/unit/plugins/action/test_es_correlation_searches.py373
-rw-r--r--ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_monitors.py357
-rw-r--r--ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_network.py711
-rw-r--r--ansible_collections/splunk/es/tests/unit/plugins/modules/__init__.py0
-rw-r--r--ansible_collections/splunk/es/tests/unit/plugins/modules/conftest.py40
-rw-r--r--ansible_collections/splunk/es/tests/unit/plugins/modules/utils.py51
8 files changed, 1975 insertions, 0 deletions
diff --git a/ansible_collections/splunk/es/tests/unit/plugins/action/__init__.py b/ansible_collections/splunk/es/tests/unit/plugins/action/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/splunk/es/tests/unit/plugins/action/__init__.py
diff --git a/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_adaptive_response_notable_events.py b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_adaptive_response_notable_events.py
new file mode 100644
index 000000000..b6a84fc78
--- /dev/null
+++ b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_adaptive_response_notable_events.py
@@ -0,0 +1,443 @@
+# Copyright (c) 2022 Red Hat
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+from ansible.module_utils.six import PY2
+
+builtin_import = "builtins.__import__"
+if PY2:
+ builtin_import = "__builtin__.__import__"
+
+import tempfile
+from ansible.playbook.task import Task
+from ansible.template import Templar
+from ansible_collections.splunk.es.plugins.action.splunk_adaptive_response_notable_events import (
+ ActionModule,
+)
+from ansible_collections.splunk.es.plugins.module_utils.splunk import (
+ SplunkRequest,
+)
+from ansible_collections.ansible.utils.tests.unit.compat.mock import (
+ MagicMock,
+ patch,
+)
+
+RESPONSE_PAYLOAD = [
+ {
+ "entry": [
+ {
+ "content": {
+ "action.notable.param.default_owner": "",
+ "action.notable.param.default_status": "0",
+ "action.notable.param.drilldown_name": "test_drill_name",
+ "action.notable.param.drilldown_search": "test_drill",
+ "action.notable.param.drilldown_earliest_offset": "$info_min_time$",
+ "action.notable.param.drilldown_latest_offset": "$info_max_time$",
+ "action.notable.param.extract_artifacts": '{"asset": ["src", "dest", "dvc", "orig_host"],"identity": '
+ '["src_user", "user", "src_user_id", "src_user_role", "user_id", "user_role", "vendor_account"]}',
+ "action.notable.param.investigation_profiles": '{"profile://test profile 1":{}, "profile://test profile 2":{}, '
+ '"profile://test profile 3":{}}',
+ "action.notable.param.next_steps": '{"version": 1, "data": "[[action|makestreams]][[action|nbtstat]][[action|nslookup]]"}',
+ "action.notable.param.recommended_actions": "email,logevent,makestreams,nbtstat",
+ "action.notable.param.rule_description": "test notable event",
+ "action.notable.param.rule_title": "ansible_test_notable",
+ "action.notable.param.security_domain": "threat",
+ "action.notable.param.severity": "high",
+ "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent'
+ 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai'
+ 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio'
+ 'n.src" as "src" | where "count">=6',
+ "actions": "notable",
+ },
+ "name": "Ansible Test",
+ }
+ ]
+ },
+ {
+ "entry": [
+ {
+ "content": {
+ "action.notable.param.default_owner": "",
+ "action.notable.param.default_status": "",
+ "action.notable.param.drilldown_name": "test_drill_name",
+ "action.notable.param.drilldown_search": "test_drill",
+ "action.notable.param.drilldown_earliest_offset": "$info_min_time$",
+ "action.notable.param.drilldown_latest_offset": "$info_max_time$",
+ "action.notable.param.extract_artifacts": '{"asset": ["src", "dest"],"identity": ["src_user", "user", "src_user_id"]}',
+ "action.notable.param.investigation_profiles": '{"profile://test profile 1":{}, "profile://test profile 2":{}, '
+ '"profile://test profile 3":{}}',
+ "action.notable.param.next_steps": '{"version": 1, "data": "[[action|makestreams]]"}',
+ "action.notable.param.recommended_actions": "email,logevent",
+ "action.notable.param.rule_description": "test notable event",
+ "action.notable.param.rule_title": "ansible_test_notable",
+ "action.notable.param.security_domain": "threat",
+ "action.notable.param.severity": "high",
+ "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent'
+ 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai'
+ 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio'
+ 'n.src" as "src" | where "count">=6',
+ "actions": "notable",
+ },
+ "name": "Ansible Test",
+ }
+ ]
+ },
+]
+
+REQUEST_PAYLOAD = [
+ {
+ "correlation_search_name": "Ansible Test",
+ "default_status": "unassigned",
+ "description": "test notable event",
+ "drilldown_earliest_offset": "$info_min_time$",
+ "drilldown_latest_offset": "$info_max_time$",
+ "drilldown_name": "test_drill_name",
+ "drilldown_search": "test_drill",
+ "extract_artifacts": {
+ "asset": ["src", "dest", "dvc", "orig_host"],
+ "identity": [
+ "src_user",
+ "user",
+ "src_user_id",
+ "src_user_role",
+ "user_id",
+ "user_role",
+ "vendor_account",
+ ],
+ },
+ "investigation_profiles": [
+ "test profile 1",
+ "test profile 2",
+ "test profile 3",
+ ],
+ "next_steps": ["makestreams", "nbtstat", "nslookup"],
+ "name": "ansible_test_notable",
+ "recommended_actions": ["email", "logevent", "makestreams", "nbtstat"],
+ "security_domain": "threat",
+ "severity": "high",
+ },
+ {
+ "correlation_search_name": "Ansible Test",
+ "description": "test notable event",
+ "drilldown_earliest_offset": "$info_min_time$",
+ "drilldown_latest_offset": "$info_max_time$",
+ "extract_artifacts": {
+ "asset": ["src", "dest"],
+ "identity": ["src_user", "user", "src_user_id"],
+ },
+ "next_steps": ["makestreams"],
+ "name": "ansible_test_notable",
+ "recommended_actions": ["email", "logevent"],
+ "security_domain": "threat",
+ "severity": "high",
+ },
+]
+
+
+class TestSplunkEsAdaptiveResponseNotableEvents:
+ def setup(self):
+ task = MagicMock(Task)
+ # Ansible > 2.13 looks for check_mode in task
+ task.check_mode = False
+ play_context = MagicMock()
+ # Ansible <= 2.13 looks for check_mode in play_context
+ play_context.check_mode = False
+ connection = patch(
+ "ansible_collections.splunk.es.plugins.module_utils.splunk.Connection"
+ )
+ connection._socket_path = tempfile.NamedTemporaryFile().name
+ fake_loader = {}
+ templar = Templar(loader=fake_loader)
+ self._plugin = ActionModule(
+ task=task,
+ connection=connection,
+ play_context=play_context,
+ loader=fake_loader,
+ templar=templar,
+ shared_loader_obj=None,
+ )
+ self._plugin._task.action = "adaptive_response_notable_events"
+ self._plugin._task.async_val = False
+ self._task_vars = {}
+ self.metadata = {
+ "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent'
+ 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai'
+ 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio'
+ 'n.src" as "src" | where "count">=6',
+ "actions": "notable",
+ }
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_adaptive_response_notable_events_merged_01(
+ self, connection, monkeypatch
+ ):
+ metadata = {
+ "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent'
+ 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai'
+ 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio'
+ 'n.src" as "src" | where "count">=6',
+ "actions": "",
+ }
+ self._plugin.api_response = RESPONSE_PAYLOAD[0]
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = {}, metadata
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD[0]
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD[0]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_adaptive_response_notable_events_merged_02(
+ self, connection, monkeypatch
+ ):
+ self._plugin.api_response = RESPONSE_PAYLOAD[0]
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = (
+ RESPONSE_PAYLOAD[0],
+ self.metadata,
+ )
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD[1]
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD[1]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_adaptive_response_notable_events_merged_idempotent(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD[0]
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD[0]
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD[0]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_adaptive_response_notable_events_replaced_01(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = (
+ RESPONSE_PAYLOAD[0],
+ self.metadata,
+ )
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD[0]
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD[0]
+
+ def delete_by_path(self, path):
+ return {}
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+ monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path)
+
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REQUEST_PAYLOAD[1]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_adaptive_response_notable_events_replaced_02(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = (
+ RESPONSE_PAYLOAD[0],
+ self.metadata,
+ )
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD[0]
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD[0]
+
+ def delete_by_path(self, path):
+ return {}
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+ monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path)
+
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REQUEST_PAYLOAD[1]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_adaptive_response_notable_events_replaced_idempotent(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD[0]
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD[0]
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REQUEST_PAYLOAD[0]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_adaptive_response_notable_events_deleted(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = (
+ RESPONSE_PAYLOAD[0],
+ self.metadata,
+ )
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD[0]
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [
+ {
+ "correlation_search_name": "Ansible Test",
+ }
+ ],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_adaptive_response_notable_events_deleted_idempotent(
+ self, connection
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = {}, {}
+
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [
+ {
+ "correlation_search_name": "Ansible Test",
+ }
+ ],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_adaptive_response_notable_events_gathered(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = (
+ RESPONSE_PAYLOAD[0],
+ self.metadata,
+ )
+
+ self._plugin._task.args = {
+ "state": "gathered",
+ "config": [
+ {
+ "correlation_search_name": "Ansible Test",
+ }
+ ],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
diff --git a/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_correlation_searches.py b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_correlation_searches.py
new file mode 100644
index 000000000..fca268c98
--- /dev/null
+++ b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_correlation_searches.py
@@ -0,0 +1,373 @@
+# Copyright (c) 2022 Red Hat
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+from ansible.module_utils.six import PY2
+
+builtin_import = "builtins.__import__"
+if PY2:
+ builtin_import = "__builtin__.__import__"
+
+import tempfile
+from ansible.playbook.task import Task
+from ansible.template import Templar
+from ansible_collections.splunk.es.plugins.action.splunk_correlation_searches import (
+ ActionModule,
+)
+from ansible_collections.splunk.es.plugins.module_utils.splunk import (
+ SplunkRequest,
+)
+from ansible_collections.ansible.utils.tests.unit.compat.mock import (
+ MagicMock,
+ patch,
+)
+
+RESPONSE_PAYLOAD = {
+ "entry": [
+ {
+ "acl": {"app": "DA-ESS-EndpointProtection"},
+ "content": {
+ "action.correlationsearch.annotations": '{"cis20": ["test1"], "mitre_attack": ["test2"], "kill_chain_phases": ["test3"], '
+ '"nist": ["test4"], "test_framework": ["test5"]}',
+ "action.correlationsearch.enabled": "1",
+ "action.correlationsearch.label": "Ansible Test",
+ "alert.digest_mode": True,
+ "alert.suppress": False,
+ "alert.suppress.fields": "test_field1",
+ "alert.suppress.period": "5s",
+ "alert_comparator": "greater than",
+ "alert_threshold": "10",
+ "alert_type": "number of events",
+ "cron_schedule": "*/5 * * * *",
+ "description": "test description",
+ "disabled": False,
+ "dispatch.earliest_time": "-24h",
+ "dispatch.latest_time": "now",
+ "dispatch.rt_backfill": True,
+ "is_scheduled": True,
+ "realtime_schedule": True,
+ "request.ui_dispatch_app": "SplunkEnterpriseSecuritySuite",
+ "schedule_priority": "default",
+ "schedule_window": "0",
+ "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent'
+ 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai'
+ 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio'
+ 'n.src" as "src" | where "count">=6',
+ },
+ "name": "Ansible Test",
+ }
+ ]
+}
+
+REQUEST_PAYLOAD = [
+ {
+ "name": "Ansible Test",
+ "disabled": False,
+ "description": "test description",
+ "app": "DA-ESS-EndpointProtection",
+ "annotations": {
+ "cis20": ["test1"],
+ "mitre_attack": ["test2"],
+ "kill_chain_phases": ["test3"],
+ "nist": ["test4"],
+ "custom": [
+ {
+ "framework": "test_framework",
+ "custom_annotations": ["test5"],
+ }
+ ],
+ },
+ "ui_dispatch_context": "SplunkEnterpriseSecuritySuite",
+ "time_earliest": "-24h",
+ "time_latest": "now",
+ "cron_schedule": "*/5 * * * *",
+ "scheduling": "realtime",
+ "schedule_window": "0",
+ "schedule_priority": "default",
+ "trigger_alert": "once",
+ "trigger_alert_when": "number of events",
+ "trigger_alert_when_condition": "greater than",
+ "trigger_alert_when_value": "10",
+ "throttle_window_duration": "5s",
+ "throttle_fields_to_group_by": ["test_field1"],
+ "suppress_alerts": False,
+ "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent'
+ 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai'
+ 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio'
+ 'n.src" as "src" | where "count">=6',
+ },
+ {
+ "name": "Ansible Test",
+ "disabled": False,
+ "description": "test description",
+ "app": "SplunkEnterpriseSecuritySuite",
+ "annotations": {
+ "cis20": ["test1", "test2"],
+ "mitre_attack": ["test3", "test4"],
+ "kill_chain_phases": ["test5", "test6"],
+ "nist": ["test7", "test8"],
+ "custom": [
+ {
+ "framework": "test_framework2",
+ "custom_annotations": ["test9", "test10"],
+ }
+ ],
+ },
+ "ui_dispatch_context": "SplunkEnterpriseSecuritySuite",
+ "time_earliest": "-24h",
+ "time_latest": "now",
+ "cron_schedule": "*/5 * * * *",
+ "scheduling": "continuous",
+ "schedule_window": "auto",
+ "schedule_priority": "default",
+ "trigger_alert": "once",
+ "trigger_alert_when": "number of events",
+ "trigger_alert_when_condition": "greater than",
+ "trigger_alert_when_value": "10",
+ "throttle_window_duration": "5s",
+ "throttle_fields_to_group_by": ["test_field1", "test_field2"],
+ "suppress_alerts": True,
+ "search": '| tstats summariesonly=true values("Authentication.tag") as "tag",dc("Authentication.user") as "user_count",dc("Authent'
+ 'ication.dest") as "dest_count",count from datamodel="Authentication"."Authentication" where nodename="Authentication.Fai'
+ 'led_Authentication" by "Authentication.app","Authentication.src" | rename "Authentication.app" as "app","Authenticatio'
+ 'n.src" as "src" | where "count">=6',
+ },
+]
+
+
+class TestSplunkEsCorrelationSearches:
+ def setup(self):
+ task = MagicMock(Task)
+ # Ansible > 2.13 looks for check_mode in task
+ task.check_mode = False
+ play_context = MagicMock()
+ # Ansible <= 2.13 looks for check_mode in play_context
+ play_context.check_mode = False
+ connection = patch(
+ "ansible_collections.splunk.es.plugins.module_utils.splunk.Connection"
+ )
+ connection._socket_path = tempfile.NamedTemporaryFile().name
+ fake_loader = {}
+ templar = Templar(loader=fake_loader)
+ self._plugin = ActionModule(
+ task=task,
+ connection=connection,
+ play_context=play_context,
+ loader=fake_loader,
+ templar=templar,
+ shared_loader_obj=None,
+ )
+ self._plugin._task.action = "correlation_searches"
+ self._plugin._task.async_val = False
+ self._task_vars = {}
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_correlation_searches_merged(self, connection, monkeypatch):
+ self._plugin.api_response = RESPONSE_PAYLOAD
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = {}
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD[0]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_correlation_searches_merged_idempotent(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD[0]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_correlation_searches_replaced_01(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = RESPONSE_PAYLOAD
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD
+
+ def delete_by_path(self, path):
+ return {}
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+ monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path)
+
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REQUEST_PAYLOAD[1]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_correlation_searches_replaced_02(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = RESPONSE_PAYLOAD
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD
+
+ def delete_by_path(self, path):
+ return {}
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+ monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path)
+
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REQUEST_PAYLOAD[1]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_correlation_searches_replaced_idempotent(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def create_update(self, rest_path, data=None):
+ return RESPONSE_PAYLOAD
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD
+
+ def delete_by_path(self, path):
+ return {}
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+ monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path)
+
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REQUEST_PAYLOAD[0]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_correlation_searches_deleted(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD
+
+ def delete_by_path(self, path):
+ return {}
+
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+ monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path)
+
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [{"name": "Ansible Test"}],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_correlation_searches_deleted_idempotent(self, connection):
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = {}
+
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [{"name": "Ansible Test"}],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_correlation_searches_gathered(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD
+
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ self._plugin._task.args = {
+ "state": "gathered",
+ "config": [{"name": "Ansible Test"}],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
diff --git a/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_monitors.py b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_monitors.py
new file mode 100644
index 000000000..068fe638d
--- /dev/null
+++ b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_monitors.py
@@ -0,0 +1,357 @@
+# Copyright (c) 2022 Red Hat
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+from ansible.module_utils.six import PY2
+
+builtin_import = "builtins.__import__"
+if PY2:
+ builtin_import = "__builtin__.__import__"
+
+import tempfile
+from ansible.playbook.task import Task
+from ansible.template import Templar
+from ansible_collections.splunk.es.plugins.action.splunk_data_inputs_monitor import (
+ ActionModule,
+)
+from ansible_collections.splunk.es.plugins.module_utils.splunk import (
+ SplunkRequest,
+)
+from ansible_collections.ansible.utils.tests.unit.compat.mock import (
+ MagicMock,
+ patch,
+)
+
+RESPONSE_PAYLOAD = {
+ "entry": [
+ {
+ "content": {
+ "_rcvbuf": 1572864,
+ "blacklist": "//var/log/[a-z]/gm",
+ "check-index": None,
+ "crcSalt": "<SOURCE>",
+ "disabled": False,
+ "eai:acl": None,
+ "filecount": 74,
+ "filestatecount": 82,
+ "followTail": False,
+ "host": "$decideOnStartup",
+ "host_regex": "/(test_host)/gm",
+ "host_resolved": "ip-172-31-52-131.us-west-2.compute.internal",
+ "host_segment": 3,
+ "ignoreOlderThan": "5d",
+ "index": "default",
+ "recursive": True,
+ "source": "test",
+ "sourcetype": "test_source_type",
+ "time_before_close": 4,
+ "whitelist": "//var/log/[0-9]/gm",
+ },
+ "name": "/var/log",
+ }
+ ]
+}
+
+REQUEST_PAYLOAD = [
+ {
+ "blacklist": "//var/log/[a-z]/gm",
+ "crc_salt": "<SOURCE>",
+ "disabled": False,
+ "follow_tail": False,
+ "host": "$decideOnStartup",
+ "host_regex": "/(test_host)/gm",
+ "host_segment": 3,
+ "index": "default",
+ "name": "/var/log",
+ "recursive": True,
+ "sourcetype": "test_source_type",
+ "whitelist": "//var/log/[0-9]/gm",
+ },
+ {
+ "blacklist": "//var/log/[a-z0-9]/gm",
+ "crc_salt": "<SOURCE>",
+ "disabled": False,
+ "follow_tail": False,
+ "host": "$decideOnStartup",
+ "index": "default",
+ "name": "/var/log",
+ "recursive": True,
+ },
+]
+
+
+class TestSplunkEsDataInputsMonitorRules:
+ def setup(self):
+ task = MagicMock(Task)
+ # Ansible > 2.13 looks for check_mode in task
+ task.check_mode = False
+ play_context = MagicMock()
+ # Ansible <= 2.13 looks for check_mode in play_context
+ play_context.check_mode = False
+ connection = patch(
+ "ansible_collections.splunk.es.plugins.module_utils.splunk.Connection"
+ )
+ connection._socket_path = tempfile.NamedTemporaryFile().name
+ fake_loader = {}
+ templar = Templar(loader=fake_loader)
+ self._plugin = ActionModule(
+ task=task,
+ connection=connection,
+ play_context=play_context,
+ loader=fake_loader,
+ templar=templar,
+ shared_loader_obj=None,
+ )
+ self._plugin._task.action = "data_inputs_monitor"
+ self._plugin._task.async_val = False
+ self._task_vars = {}
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_monitor_merged(self, connection, monkeypatch):
+ self._plugin.api_response = RESPONSE_PAYLOAD
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = {}
+
+ def create_update(
+ self, rest_path, data=None, mock=None, mock_data=None
+ ):
+ return RESPONSE_PAYLOAD
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD[0]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_monitor_merged_idempotent(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def create_update(
+ self, rest_path, data=None, mock=None, mock_data=None
+ ):
+ return RESPONSE_PAYLOAD
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [
+ {
+ "blacklist": "//var/log/[a-z]/gm",
+ "crc_salt": "<SOURCE>",
+ "disabled": False,
+ "follow_tail": False,
+ "host": "$decideOnStartup",
+ "host_regex": "/(test_host)/gm",
+ "host_segment": 3,
+ "index": "default",
+ "name": "/var/log",
+ "recursive": True,
+ "sourcetype": "test_source_type",
+ "whitelist": "//var/log/[0-9]/gm",
+ }
+ ],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_monitor_replaced(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = RESPONSE_PAYLOAD
+
+ def create_update(
+ self, rest_path, data=None, mock=None, mock_data=None
+ ):
+ return RESPONSE_PAYLOAD
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [
+ {
+ "blacklist": "//var/log/[a-z0-9]/gm",
+ "crc_salt": "<SOURCE>",
+ "disabled": False,
+ "follow_tail": False,
+ "host": "$decideOnStartup",
+ "index": "default",
+ "name": "/var/log",
+ "recursive": True,
+ }
+ ],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_monitor_replaced_idempotent(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def create_update(
+ self, rest_path, data=None, mock=None, mock_data=None
+ ):
+ return RESPONSE_PAYLOAD
+
+ def get_by_path(self, path):
+ return {
+ "entry": [
+ {
+ "content": {
+ "_rcvbuf": 1572864,
+ "blacklist": "//var/log/[a-z]/gm",
+ "check-index": None,
+ "crcSalt": "<SOURCE>",
+ "disabled": False,
+ "eai:acl": None,
+ "filecount": 74,
+ "filestatecount": 82,
+ "followTail": False,
+ "host": "$decideOnStartup",
+ "host_regex": "/(test_host)/gm",
+ "host_resolved": "ip-172-31-52-131.us-west-2.compute.internal",
+ "host_segment": 3,
+ "ignoreOlderThan": "5d",
+ "index": "default",
+ "recursive": True,
+ "source": "test",
+ "sourcetype": "test_source_type",
+ "time_before_close": 4,
+ "whitelist": "//var/log/[0-9]/gm",
+ },
+ "name": "/var/log",
+ }
+ ]
+ }
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [
+ {
+ "blacklist": "//var/log/[a-z]/gm",
+ "crc_salt": "<SOURCE>",
+ "disabled": False,
+ "follow_tail": False,
+ "host": "$decideOnStartup",
+ "host_regex": "/(test_host)/gm",
+ "host_segment": 3,
+ "index": "default",
+ "name": "/var/log",
+ "recursive": True,
+ "sourcetype": "test_source_type",
+ "whitelist": "//var/log/[0-9]/gm",
+ }
+ ],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_monitor_deleted(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def create_update(
+ self, rest_path, data=None, mock=None, mock_data=None
+ ):
+ return RESPONSE_PAYLOAD
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [{"name": "/var/log"}],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_monitor_deleted_idempotent(self, connection):
+ self._plugin.search_for_resource_name = MagicMock()
+ self._plugin.search_for_resource_name.return_value = {}
+
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [{"name": "/var/log"}],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_monitor_gathered(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def get_by_path(self, path):
+ return RESPONSE_PAYLOAD
+
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ self._plugin._task.args = {
+ "state": "gathered",
+ "config": [{"name": "/var/log"}],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
diff --git a/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_network.py b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_network.py
new file mode 100644
index 000000000..dbadf9052
--- /dev/null
+++ b/ansible_collections/splunk/es/tests/unit/plugins/action/test_es_data_inputs_network.py
@@ -0,0 +1,711 @@
+# Copyright (c) 2022 Red Hat
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+from ansible.module_utils.six import PY2
+
+builtin_import = "builtins.__import__"
+if PY2:
+ builtin_import = "__builtin__.__import__"
+
+import tempfile
+from ansible.playbook.task import Task
+from ansible.template import Templar
+from ansible_collections.splunk.es.plugins.action.splunk_data_inputs_network import (
+ ActionModule,
+)
+from ansible_collections.splunk.es.plugins.module_utils.splunk import (
+ SplunkRequest,
+)
+from ansible_collections.ansible.utils.tests.unit.compat.mock import (
+ MagicMock,
+ patch,
+)
+
+RESPONSE_PAYLOAD = {
+ "tcp_cooked": {
+ "entry": [
+ {
+ "name": "default:8100",
+ "content": {
+ "connection_host": "ip",
+ "disabled": False,
+ "host": "$decideOnStartup",
+ "restrictToHost": "default",
+ },
+ }
+ ],
+ },
+ "tcp_raw": {
+ "entry": [
+ {
+ "name": "default:8101",
+ "content": {
+ "connection_host": "ip",
+ "disabled": True,
+ "host": "$decideOnStartup",
+ "index": "default",
+ "queue": "parsingQueue",
+ "rawTcpDoneTimeout": 9,
+ "restrictToHost": "default",
+ "source": "test_source",
+ "sourcetype": "test_source_type",
+ },
+ }
+ ],
+ },
+ "udp": {
+ "entry": [
+ {
+ "name": "default:7890",
+ "content": {
+ "connection_host": "ip",
+ "disabled": True,
+ "host": "$decideOnStartup",
+ "index": "default",
+ "no_appending_timestamp": True,
+ "no_priority_stripping": True,
+ "queue": "parsingQueue",
+ "restrictToHost": "default",
+ "source": "test_source",
+ "sourcetype": "test_source_type",
+ },
+ }
+ ],
+ },
+ "splunktcptoken": {
+ "entry": [
+ {
+ "name": "splunktcptoken://test_token",
+ "content": {
+ "token": "01234567-0123-0123-0123-012345678901",
+ },
+ }
+ ],
+ },
+ "ssl": {
+ "entry": [
+ {
+ "name": "test_host",
+ "content": {},
+ }
+ ],
+ },
+}
+
+REQUEST_PAYLOAD = {
+ "tcp_cooked": {
+ "protocol": "tcp",
+ "datatype": "cooked",
+ "name": 8100,
+ "connection_host": "ip",
+ "disabled": False,
+ "host": "$decideOnStartup",
+ "restrict_to_host": "default",
+ },
+ "tcp_raw": {
+ "protocol": "tcp",
+ "datatype": "raw",
+ "name": 8101,
+ "connection_host": "ip",
+ "disabled": True,
+ "host": "$decideOnStartup",
+ "index": "default",
+ "queue": "parsingQueue",
+ "raw_tcp_done_timeout": 9,
+ "restrict_to_host": "default",
+ "source": "test_source",
+ "sourcetype": "test_source_type",
+ },
+ "udp": {
+ "protocol": "udp",
+ "name": 7890,
+ "connection_host": "ip",
+ "disabled": True,
+ "host": "$decideOnStartup",
+ "index": "default",
+ "no_appending_timestamp": True,
+ "no_priority_stripping": True,
+ "queue": "parsingQueue",
+ "restrict_to_host": "default",
+ "source": "test_source",
+ "sourcetype": "test_source_type",
+ },
+ "splunktcptoken": {
+ "protocol": "tcp",
+ "datatype": "splunktcptoken",
+ "name": "test_token",
+ "token": "01234567-0123-0123-0123-012345678901",
+ },
+ "ssl": {
+ "protocol": "tcp",
+ "datatype": "ssl",
+ "name": "test_host",
+ },
+}
+
+REPLACED_RESPONSE_PAYLOAD = {
+ "tcp_cooked": {
+ "entry": [
+ {
+ "name": "default:8100",
+ "content": {
+ "connection_host": "ip",
+ "disabled": True,
+ "host": "$decideOnStartup",
+ "restrictToHost": "default",
+ },
+ }
+ ],
+ },
+ "tcp_raw": {
+ "entry": [
+ {
+ "name": "default:8101",
+ "content": {
+ "connection_host": "ip",
+ "disabled": True,
+ "host": "$decideOnStartup",
+ "index": "default",
+ "queue": "parsingQueue",
+ "rawTcpDoneTimeout": 10,
+ "restrictToHost": "default",
+ "source": "test_source",
+ "sourcetype": "test_source_type",
+ },
+ }
+ ],
+ },
+ "udp": {
+ "entry": [
+ {
+ "name": "default:7890",
+ "content": {
+ "connection_host": "ip",
+ "disabled": True,
+ "host": "$decideOnStartup",
+ "index": "default",
+ "no_appending_timestamp": False,
+ "no_priority_stripping": False,
+ "queue": "parsingQueue",
+ "restrictToHost": "default",
+ "source": "test_source",
+ "sourcetype": "test_source_type",
+ },
+ }
+ ],
+ },
+ "splunktcptoken": {
+ "entry": [
+ {
+ "name": "splunktcptoken://test_token",
+ "content": {
+ "token": "01234567-0123-0123-0123-012345678900",
+ },
+ }
+ ],
+ },
+}
+
+REPLACED_REQUEST_PAYLOAD = {
+ "tcp_cooked": {
+ "protocol": "tcp",
+ "datatype": "cooked",
+ "name": "default:8100",
+ "connection_host": "ip",
+ "disabled": True,
+ "host": "$decideOnStartup",
+ "restrict_to_host": "default",
+ },
+ "tcp_raw": {
+ "protocol": "tcp",
+ "datatype": "raw",
+ "name": "default:8101",
+ "connection_host": "ip",
+ "disabled": True,
+ "host": "$decideOnStartup",
+ "index": "default",
+ "queue": "parsingQueue",
+ "raw_tcp_done_timeout": 10,
+ "restrict_to_host": "default",
+ "source": "test_source",
+ "sourcetype": "test_source_type",
+ },
+ "udp": {
+ "protocol": "udp",
+ "name": "default:7890",
+ "connection_host": "ip",
+ "disabled": True,
+ "host": "$decideOnStartup",
+ "index": "default",
+ "no_appending_timestamp": False,
+ "no_priority_stripping": False,
+ "queue": "parsingQueue",
+ "restrict_to_host": "default",
+ "source": "test_source",
+ "sourcetype": "test_source_type",
+ },
+ "splunktcptoken": {
+ "protocol": "tcp",
+ "datatype": "splunktcptoken",
+ "name": "splunktcptoken://test_token",
+ "token": "01234567-0123-0123-0123-012345678900",
+ },
+}
+
+
+class TestSplunkEsDataInputsNetworksRules:
+ def setup(self):
+ task = MagicMock(Task)
+ # Ansible > 2.13 looks for check_mode in task
+ task.check_mode = False
+ play_context = MagicMock()
+ # Ansible <= 2.13 looks for check_mode in play_context
+ play_context.check_mode = False
+ connection = patch(
+ "ansible_collections.splunk.es.plugins.module_utils.splunk.Connection"
+ )
+ connection._socket_path = tempfile.NamedTemporaryFile().name
+ fake_loader = {}
+ templar = Templar(loader=fake_loader)
+ self._plugin = ActionModule(
+ task=task,
+ connection=connection,
+ play_context=play_context,
+ loader=fake_loader,
+ templar=templar,
+ shared_loader_obj=None,
+ )
+ self._plugin._task.action = "data_inputs_network"
+ self._plugin._task.async_val = False
+ self._task_vars = {}
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_network_merged(self, connection, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ # patch update operation
+ update_response = RESPONSE_PAYLOAD["tcp_cooked"]
+
+ def get_by_path(self, path):
+ return {}
+
+ def create_update(
+ self, rest_path, data=None, mock=None, mock_data=None
+ ):
+ return update_response
+
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+
+ # tcp_cooked
+ update_response = RESPONSE_PAYLOAD["tcp_cooked"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["tcp_cooked"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ # tcp_raw
+ update_response = RESPONSE_PAYLOAD["tcp_raw"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["tcp_raw"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ # udp
+ update_response = RESPONSE_PAYLOAD["udp"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["udp"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ # splunktcptoken
+ update_response = RESPONSE_PAYLOAD["splunktcptoken"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["splunktcptoken"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ # ssl
+ update_response = RESPONSE_PAYLOAD["ssl"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["ssl"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_network_merged_idempotent(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ # patch get operation
+ get_response = RESPONSE_PAYLOAD["tcp_cooked"]
+
+ def get_by_path(self, path):
+ return get_response
+
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ # tcp_cooked
+ get_response = RESPONSE_PAYLOAD["tcp_cooked"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["tcp_cooked"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # tcp_raw
+ get_response = RESPONSE_PAYLOAD["tcp_raw"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["tcp_raw"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # udp
+ get_response = RESPONSE_PAYLOAD["udp"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["udp"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # splunktcptoken
+ get_response = RESPONSE_PAYLOAD["splunktcptoken"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["splunktcptoken"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # ssl
+ get_response = RESPONSE_PAYLOAD["ssl"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["ssl"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_network_replaced(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ # patch get operation
+ get_response = RESPONSE_PAYLOAD["tcp_cooked"]
+ # patch update operation
+ update_response = REPLACED_RESPONSE_PAYLOAD["tcp_cooked"]
+
+ get_response = RESPONSE_PAYLOAD["tcp_cooked"]
+
+ def delete_by_path(
+ self, rest_path, data=None, mock=None, mock_data=None
+ ):
+ return {}
+
+ def create_update(
+ self, rest_path, data=None, mock=None, mock_data=None
+ ):
+ return update_response
+
+ def get_by_path(self, path):
+ return get_response
+
+ monkeypatch.setattr(SplunkRequest, "create_update", create_update)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+ monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path)
+
+ # tcp_cooked
+ get_response = RESPONSE_PAYLOAD["tcp_cooked"]
+ update_response = REPLACED_RESPONSE_PAYLOAD["tcp_cooked"]
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REPLACED_REQUEST_PAYLOAD["tcp_cooked"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ # tcp_raw
+ get_response = RESPONSE_PAYLOAD["tcp_raw"]
+ update_response = REPLACED_RESPONSE_PAYLOAD["tcp_raw"]
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REPLACED_REQUEST_PAYLOAD["tcp_raw"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ # udp
+ get_response = RESPONSE_PAYLOAD["udp"]
+ update_response = REPLACED_RESPONSE_PAYLOAD["udp"]
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REPLACED_REQUEST_PAYLOAD["udp"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ # splunktcptoken
+ get_response = RESPONSE_PAYLOAD["splunktcptoken"]
+ update_response = REPLACED_RESPONSE_PAYLOAD["splunktcptoken"]
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REPLACED_REQUEST_PAYLOAD["splunktcptoken"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_network_replaced_idempotent(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ # patch get operation
+ get_response = RESPONSE_PAYLOAD["tcp_cooked"]
+
+ def get_by_path(self, path):
+ return get_response
+
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ # tcp_cooked
+ get_response = REPLACED_RESPONSE_PAYLOAD["tcp_cooked"]
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REPLACED_REQUEST_PAYLOAD["tcp_cooked"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # tcp_raw
+ get_response = REPLACED_RESPONSE_PAYLOAD["tcp_raw"]
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REPLACED_REQUEST_PAYLOAD["tcp_raw"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # udp
+ get_response = REPLACED_RESPONSE_PAYLOAD["udp"]
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REPLACED_REQUEST_PAYLOAD["udp"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # splunktcptoken
+ get_response = REPLACED_RESPONSE_PAYLOAD["splunktcptoken"]
+ self._plugin._task.args = {
+ "state": "replaced",
+ "config": [REPLACED_REQUEST_PAYLOAD["splunktcptoken"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_network_deleted(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def delete_by_path(
+ self, rest_path, data=None, mock=None, mock_data=None
+ ):
+ return {}
+
+ get_response = RESPONSE_PAYLOAD["tcp_cooked"]
+
+ def get_by_path(self, path):
+ return get_response
+
+ monkeypatch.setattr(SplunkRequest, "delete_by_path", delete_by_path)
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ # tcp_cooked
+ get_response = RESPONSE_PAYLOAD["tcp_cooked"]
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [REQUEST_PAYLOAD["tcp_cooked"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ # tcp_raw
+ get_response = RESPONSE_PAYLOAD["tcp_raw"]
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [REQUEST_PAYLOAD["tcp_raw"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ # udp
+ get_response = RESPONSE_PAYLOAD["udp"]
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [REQUEST_PAYLOAD["udp"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ # splunktcptoken
+ get_response = RESPONSE_PAYLOAD["splunktcptoken"]
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [REQUEST_PAYLOAD["splunktcptoken"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is True
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_network_deleted_idempotent(
+ self, conn, monkeypatch
+ ):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ def get_by_path(self, path):
+ return {}
+
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ # tcp_cooked
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [REQUEST_PAYLOAD["tcp_cooked"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # tcp_raw
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [REQUEST_PAYLOAD["tcp_raw"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # udp
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [REQUEST_PAYLOAD["udp"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # splunktcptoken
+ self._plugin._task.args = {
+ "state": "deleted",
+ "config": [REQUEST_PAYLOAD["splunktcptoken"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ @patch("ansible.module_utils.connection.Connection.__rpc__")
+ def test_es_data_inputs_network_gathered(self, conn, monkeypatch):
+ self._plugin._connection.socket_path = (
+ tempfile.NamedTemporaryFile().name
+ )
+ self._plugin._connection._shell = MagicMock()
+
+ # patch get operation
+ get_response = RESPONSE_PAYLOAD["tcp_cooked"]
+
+ def get_by_path(self, path):
+ return get_response
+
+ monkeypatch.setattr(SplunkRequest, "get_by_path", get_by_path)
+
+ # tcp_cooked
+ get_response = RESPONSE_PAYLOAD["tcp_cooked"]
+ self._plugin._task.args = {
+ "state": "gathered",
+ "config": [REQUEST_PAYLOAD["tcp_cooked"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # tcp_raw
+ get_response = RESPONSE_PAYLOAD["tcp_raw"]
+ self._plugin._task.args = {
+ "state": "gathered",
+ "config": [REQUEST_PAYLOAD["tcp_raw"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # udp
+ get_response = RESPONSE_PAYLOAD["udp"]
+ self._plugin._task.args = {
+ "state": "gathered",
+ "config": [REQUEST_PAYLOAD["udp"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # splunktcptoken
+ get_response = RESPONSE_PAYLOAD["splunktcptoken"]
+ self._plugin._task.args = {
+ "state": "gathered",
+ "config": [REQUEST_PAYLOAD["splunktcptoken"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
+
+ # ssl
+ get_response = RESPONSE_PAYLOAD["ssl"]
+ self._plugin._task.args = {
+ "state": "merged",
+ "config": [REQUEST_PAYLOAD["ssl"]],
+ }
+ result = self._plugin.run(task_vars=self._task_vars)
+ assert result["changed"] is False
diff --git a/ansible_collections/splunk/es/tests/unit/plugins/modules/__init__.py b/ansible_collections/splunk/es/tests/unit/plugins/modules/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/splunk/es/tests/unit/plugins/modules/__init__.py
diff --git a/ansible_collections/splunk/es/tests/unit/plugins/modules/conftest.py b/ansible_collections/splunk/es/tests/unit/plugins/modules/conftest.py
new file mode 100644
index 000000000..e19a1e04c
--- /dev/null
+++ b/ansible_collections/splunk/es/tests/unit/plugins/modules/conftest.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+import json
+
+import pytest
+
+from ansible.module_utils.six import string_types
+from ansible.module_utils._text import to_bytes
+from ansible.module_utils.common._collections_compat import MutableMapping
+
+
+@pytest.fixture
+def patch_ansible_module(request, mocker):
+ if isinstance(request.param, string_types):
+ args = request.param
+ elif isinstance(request.param, MutableMapping):
+ if "ANSIBLE_MODULE_ARGS" not in request.param:
+ request.param = {"ANSIBLE_MODULE_ARGS": request.param}
+ if "_ansible_remote_tmp" not in request.param["ANSIBLE_MODULE_ARGS"]:
+ request.param["ANSIBLE_MODULE_ARGS"][
+ "_ansible_remote_tmp"
+ ] = "/tmp"
+ if (
+ "_ansible_keep_remote_files"
+ not in request.param["ANSIBLE_MODULE_ARGS"]
+ ):
+ request.param["ANSIBLE_MODULE_ARGS"][
+ "_ansible_keep_remote_files"
+ ] = False
+ args = json.dumps(request.param)
+ else:
+ raise Exception(
+ "Malformed data to the patch_ansible_module pytest fixture"
+ )
+
+ mocker.patch("ansible.module_utils.basic._ANSIBLE_ARGS", to_bytes(args))
diff --git a/ansible_collections/splunk/es/tests/unit/plugins/modules/utils.py b/ansible_collections/splunk/es/tests/unit/plugins/modules/utils.py
new file mode 100644
index 000000000..d55afc0b3
--- /dev/null
+++ b/ansible_collections/splunk/es/tests/unit/plugins/modules/utils.py
@@ -0,0 +1,51 @@
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+import json
+
+from ansible_collections.trendmicro.deepsec.tests.unit.compat import unittest
+from ansible_collections.trendmicro.deepsec.tests.unit.compat.mock import patch
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+
+
+def set_module_args(args):
+ if "_ansible_remote_tmp" not in args:
+ args["_ansible_remote_tmp"] = "/tmp"
+ if "_ansible_keep_remote_files" not in args:
+ args["_ansible_keep_remote_files"] = False
+
+ args = json.dumps({"ANSIBLE_MODULE_ARGS": args})
+ basic._ANSIBLE_ARGS = to_bytes(args)
+
+
+class AnsibleExitJson(Exception):
+ pass
+
+
+class AnsibleFailJson(Exception):
+ pass
+
+
+def exit_json(*args, **kwargs):
+ if "changed" not in kwargs:
+ kwargs["changed"] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs):
+ kwargs["failed"] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class ModuleTestCase(unittest.TestCase):
+ def setUp(self):
+ self.mock_module = patch.multiple(
+ basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json
+ )
+ self.mock_module.start()
+ self.mock_sleep = patch("time.sleep")
+ self.mock_sleep.start()
+ set_module_args({})
+ self.addCleanup(self.mock_module.stop)
+ self.addCleanup(self.mock_sleep.stop)