summaryrefslogtreecommitdiffstats
path: root/ansible_collections/ibm/qradar/plugins/module_utils
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:04:41 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:04:41 +0000
commit975f66f2eebe9dadba04f275774d4ab83f74cf25 (patch)
tree89bd26a93aaae6a25749145b7e4bca4a1e75b2be /ansible_collections/ibm/qradar/plugins/module_utils
parentInitial commit. (diff)
downloadansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.tar.xz
ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.zip
Adding upstream version 7.7.0+dfsg.upstream/7.7.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/ibm/qradar/plugins/module_utils')
-rw-r--r--ansible_collections/ibm/qradar/plugins/module_utils/qradar.py254
1 files changed, 254 insertions, 0 deletions
diff --git a/ansible_collections/ibm/qradar/plugins/module_utils/qradar.py b/ansible_collections/ibm/qradar/plugins/module_utils/qradar.py
new file mode 100644
index 000000000..e1569863f
--- /dev/null
+++ b/ansible_collections/ibm/qradar/plugins/module_utils/qradar.py
@@ -0,0 +1,254 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# (c) 2019, Adam Miller (admiller@redhat.com)
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+from ansible.module_utils.urls import CertificateError
+from ansible.module_utils.six.moves.urllib.parse import quote_plus
+from ansible.module_utils.connection import ConnectionError
+from ansible.module_utils.connection import Connection
+from ansible.module_utils._text import to_text
+from ansible.module_utils.six import iteritems
+from copy import copy
+import json
+
+
+BASE_HEADERS = {"Content-Type": "application/json", "Version": "9.1"}
+
+
+def find_dict_in_list(some_list, key, value):
+ text_type = False
+ try:
+ to_text(value)
+ text_type = True
+ except TypeError:
+ pass
+ for some_dict in some_list:
+ if key in some_dict:
+ if text_type:
+ if to_text(some_dict[key]).strip() == to_text(value).strip():
+ return some_dict, some_list.index(some_dict)
+ else:
+ if some_dict[key] == value:
+ return some_dict, some_list.index(some_dict)
+ return None
+
+
+def set_offense_values(module, qradar_request):
+ if module.params["closing_reason"]:
+ found_closing_reason = qradar_request.get_by_path(
+ "api/siem/offense_closing_reasons?filter={0}".format(
+ quote_plus(
+ 'text="{0}"'.format(module.params["closing_reason"])
+ )
+ )
+ )
+ if found_closing_reason:
+ module.params["closing_reason_id"] = found_closing_reason[0]["id"]
+ else:
+ module.fail_json(
+ "Unable to find closing_reason text: {0}".format(
+ module.params["closing_reason"]
+ )
+ )
+
+ if module.params["status"]:
+ module.params["status"] = module.params["status"].upper()
+
+
+def remove_unsupported_keys_from_payload_dict(payload, supported_key_list):
+ """The fn to remove the unsupported keys from payload
+ :param payload: Payload from response
+ :param supported_key_list: Module supported params
+ :rtype: A dict
+ :returns: payload with only supported and module expected params
+ """
+ temp_payload = []
+ for each in payload:
+ temp_dict = copy(each)
+ if isinstance(each, dict):
+ for every_key in each.keys():
+ if every_key not in supported_key_list:
+ temp_dict.pop(every_key)
+ temp_payload.append(temp_dict)
+ if temp_payload:
+ return temp_payload
+ return payload
+
+
+def list_to_dict(input_dict):
+ """The fn to convert list of dict to dict
+ :param input_dict: input list having list of dict
+ :rtype: A dict
+ :returns: dict with converted all list to dict type
+ """
+ if isinstance(input_dict, dict):
+ for k, v in iteritems(input_dict):
+ if isinstance(v, dict):
+ list_to_dict(v)
+ elif isinstance(v, list):
+ temp_dict = {}
+ for each in v:
+ if isinstance(each, dict):
+ if each.get("id") or each.get("id") == 0:
+ each.pop("id")
+ each_key_values = "_".join(
+ [str(x) for x in each.values()]
+ )
+ temp_dict.update({each_key_values: each})
+ input_dict[k] = temp_dict
+
+
+class QRadarRequest(object):
+ def __init__(
+ self,
+ module=None,
+ connection=None,
+ headers=None,
+ not_rest_data_keys=None,
+ task_vars=None,
+ ):
+ self.module = module
+ if module:
+ # This will be removed, once all of the available modules
+ # are moved to use action plugin design, as otherwise test
+ # would start to complain without the implementation.
+ self.connection = Connection(self.module._socket_path)
+ elif connection:
+ self.connection = connection
+ try:
+ self.connection.load_platform_plugins("ibm.qradar.qradar")
+ self.connection.set_options(var_options=task_vars)
+ except ConnectionError:
+ raise
+ # This allows us to exclude specific argspec keys from being included by
+ # the rest data that don't follow the qradar_* naming convention
+ if not_rest_data_keys:
+ self.not_rest_data_keys = not_rest_data_keys
+ else:
+ self.not_rest_data_keys = []
+ self.not_rest_data_keys.append("validate_certs")
+ self.headers = headers if headers else BASE_HEADERS
+
+ def _httpapi_error_handle(self, method, uri, payload=None):
+ # FIXME - make use of handle_httperror(self, exception) where applicable
+ # https://docs.ansible.com/ansible/latest/network/dev_guide/developing_plugins_network.html#developing-plugins-httpapi
+ code = 99999
+ response = {}
+ try:
+ code, response = self.connection.send_request(
+ method, uri, payload=payload, headers=self.headers
+ )
+ except ConnectionError as e:
+ self.module.fail_json(
+ msg="connection error occurred: {0}".format(e)
+ )
+ except CertificateError as e:
+ self.module.fail_json(
+ msg="certificate error occurred: {0}".format(e)
+ )
+ except ValueError as e:
+ try:
+ self.module.fail_json(
+ msg="certificate not found: {0}".format(e)
+ )
+ except AttributeError:
+ pass
+
+ if code == 404:
+ if (
+ to_text("Object not found") in to_text(response)
+ or to_text("Could not find object") in to_text(response)
+ or to_text("No offense was found") in to_text(response)
+ ):
+ return {}
+ if to_text("The rule does not exist.") in to_text(
+ response["description"]
+ ):
+ return code, {}
+
+ if code == 409:
+ if "code" in response:
+ if response["code"] in [1002, 1004]:
+ # https://www.ibm.com/support/knowledgecenter/SS42VS_7.3.1/com.ibm.qradar.doc/9.2--staged_config-deploy_status-POST.html
+ # Documentation says we should get 1002, but I'm getting 1004 from QRadar
+ return response
+ else:
+ self.module.fail_json(
+ msg="qradar httpapi returned error {0} with message {1}".format(
+ code, response
+ )
+ )
+ elif not (code >= 200 and code < 300):
+ try:
+ self.module.fail_json(
+ msg="qradar httpapi returned error {0} with message {1}".format(
+ code, response
+ )
+ )
+ except AttributeError:
+ pass
+
+ return code, response
+
+ def get(self, url, **kwargs):
+ return self._httpapi_error_handle("GET", url, **kwargs)
+
+ def put(self, url, **kwargs):
+ return self._httpapi_error_handle("PUT", url, **kwargs)
+
+ def post(self, url, **kwargs):
+ return self._httpapi_error_handle("POST", url, **kwargs)
+
+ def patch(self, url, **kwargs):
+ return self._httpapi_error_handle("PATCH", url, **kwargs)
+
+ def delete(self, url, **kwargs):
+ return self._httpapi_error_handle("DELETE", url, **kwargs)
+
+ def get_data(self):
+ """
+ Get the valid fields that should be passed to the REST API as urlencoded
+ data so long as the argument specification to the module follows the
+ convention:
+ - the key to the argspec item does not start with qradar_
+ - the key does not exist in the not_data_keys list
+ """
+ try:
+ qradar_data = {}
+ for param in self.module.params:
+ if (self.module.params[param]) is not None and (
+ param not in self.not_rest_data_keys
+ ):
+ qradar_data[param] = self.module.params[param]
+ return qradar_data
+
+ except TypeError as e:
+ self.module.fail_json(
+ msg="invalid data type provided: {0}".format(e)
+ )
+
+ def post_by_path(self, rest_path, data=None):
+ """
+ POST with data to path
+ """
+ if data is None:
+ data = json.dumps(self.get_data())
+ elif data is False:
+ # Because for some reason some QRadar REST API endpoint use the
+ # query string to modify state
+ return self.post("/{0}".format(rest_path))
+ return self.post("/{0}".format(rest_path), payload=data)
+
+ def create_update(self, rest_path, data=None):
+ """
+ Create or Update a file/directory monitor data input in qradar
+ """
+ if data is None:
+ data = json.dumps(self.get_data())
+ # return self.post("/{0}".format(rest_path), payload=data)
+ return self.patch("/{0}".format(rest_path), payload=data) # PATCH