diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:04:41 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:04:41 +0000 |
commit | 975f66f2eebe9dadba04f275774d4ab83f74cf25 (patch) | |
tree | 89bd26a93aaae6a25749145b7e4bca4a1e75b2be /ansible_collections/community/fortios/plugins/httpapi | |
parent | Initial commit. (diff) | |
download | ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.tar.xz ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.zip |
Adding upstream version 7.7.0+dfsg.upstream/7.7.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/community/fortios/plugins/httpapi')
-rw-r--r-- | ansible_collections/community/fortios/plugins/httpapi/fortianalyzer.py | 453 | ||||
-rw-r--r-- | ansible_collections/community/fortios/plugins/httpapi/fortimanager.py | 459 |
2 files changed, 912 insertions, 0 deletions
diff --git a/ansible_collections/community/fortios/plugins/httpapi/fortianalyzer.py b/ansible_collections/community/fortios/plugins/httpapi/fortianalyzer.py new file mode 100644 index 000000000..0e59ea48e --- /dev/null +++ b/ansible_collections/community/fortios/plugins/httpapi/fortianalyzer.py @@ -0,0 +1,453 @@ +# Copyright (c) 2018 Fortinet and/or its affiliates. +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' +--- +author: + - Luke Weighall (@lweighall) + - Andrew Welsh (@Ghilli3) + - Jim Huber (@p4r4n0y1ng) +name: fortianalyzer +short_description: HttpApi Plugin for Fortinet FortiAnalyzer Appliance or VM. +description: + - This HttpApi plugin provides methods to connect to Fortinet FortiAnalyzer Appliance or VM via JSON RPC API. + +''' + +import json +from ansible.plugins.httpapi import HttpApiBase +from ansible.module_utils.basic import to_text +from ansible_collections.community.fortios.plugins.module_utils.fortianalyzer.common import BASE_HEADERS +from ansible_collections.community.fortios.plugins.module_utils.fortianalyzer.common import FAZBaseException +from ansible_collections.community.fortios.plugins.module_utils.fortianalyzer.common import FAZCommon +from ansible_collections.community.fortios.plugins.module_utils.fortianalyzer.common import FAZMethods + + +class HttpApi(HttpApiBase): + def __init__(self, connection): + super(HttpApi, self).__init__(connection) + self._req_id = 0 + self._sid = None + self._url = "/jsonrpc" + self._host = None + self._tools = FAZCommon + self._debug = False + self._connected_faz = None + self._last_response_msg = None + self._last_response_code = None + self._last_data_payload = None + self._last_url = None + self._last_response_raw = None + self._locked_adom_list = list() + self._locked_adoms_by_user = list() + self._uses_workspace = False + self._uses_adoms = False + self._adom_list = list() + self._logged_in_user = None + + def set_become(self, become_context): + """ + ELEVATION IS NOT REQUIRED ON FORTINET DEVICES - SKIPPED + :param become_context: Unused input. + :return: None + """ + return None + + def update_auth(self, response, response_data): + """ + TOKENS ARE NOT USED SO NO NEED TO UPDATE AUTH + :param response: Unused input. + :param response_data Unused_input. + :return: None + """ + return None + + def login(self, username, password): + """ + This function will log the plugin into FortiAnalyzer, and return the results. + :param username: Username of FortiAnalyzer Admin + :param password: Password of FortiAnalyzer Admin + + :return: Dictionary of status if it logged in or not. + """ + + self._logged_in_user = username + self.send_request(FAZMethods.EXEC, self._tools.format_request(FAZMethods.EXEC, "sys/login/user", + passwd=password, user=username,)) + + if "FortiAnalyzer object connected to FortiAnalyzer" in self.__str__(): + # If Login worked then inspect the FortiAnalyzer for Workspace Mode, and it's system information. + self.inspect_faz() + return + else: + raise FAZBaseException(msg="Unknown error while logging in...connection was lost during login operation..." + " Exiting") + + def inspect_faz(self): + # CHECK FOR WORKSPACE MODE TO SEE IF WE HAVE TO ENABLE ADOM LOCKS + status = self.get_system_status() + if status[0] == -11: + # THE CONNECTION GOT LOST SOMEHOW, REMOVE THE SID AND REPORT BAD LOGIN + self.logout() + raise FAZBaseException(msg="Error -11 -- the Session ID was likely malformed somehow. Contact authors." + " Exiting") + elif status[0] == 0: + try: + self.check_mode() + if self._uses_adoms: + self.get_adom_list() + if self._uses_workspace: + self.get_locked_adom_list() + self._connected_faz = status[1] + self._host = self._connected_faz["Hostname"] + except Exception: + pass + return + + def logout(self): + """ + This function will logout of the FortiAnalyzer. + """ + if self.sid is not None: + # IF WE WERE USING WORKSPACES, THEN CLEAN UP OUR LOCKS IF THEY STILL EXIST + if self.uses_workspace: + self.get_lock_info() + self.run_unlock() + ret_code, response = self.send_request(FAZMethods.EXEC, + self._tools.format_request(FAZMethods.EXEC, "sys/logout")) + self.sid = None + return ret_code, response + + def send_request(self, method, params): + """ + Responsible for actual sending of data to the connection httpapi base plugin. Does some formatting as well. + :param params: A formatted dictionary that was returned by self.common_datagram_params() + before being called here. + :param method: The preferred API Request method (GET, ADD, POST, etc....) + :type method: basestring + + :return: Dictionary of status if it logged in or not. + """ + + try: + if self.sid is None and params[0]["url"] != "sys/login/user": + try: + self.connection._connect() + except Exception as err: + raise FAZBaseException( + msg="An problem happened with the httpapi plugin self-init connection process. " + "Error: " + to_text(err)) + except IndexError: + raise FAZBaseException("An attempt was made at communicating with a FAZ with " + "no valid session and an incorrectly formatted request.") + except Exception: + raise FAZBaseException("An attempt was made at communicating with a FAZ with " + "no valid session and an unexpected error was discovered.") + + self._update_request_id() + json_request = { + "method": method, + "params": params, + "session": self.sid, + "id": self.req_id, + "verbose": 1 + } + data = json.dumps(json_request, ensure_ascii=False).replace('\\\\', '\\') + try: + # Sending URL and Data in Unicode, per Ansible Specifications for Connection Plugins + response, response_data = self.connection.send(path=to_text(self._url), data=to_text(data), + headers=BASE_HEADERS) + # Get Unicode Response - Must convert from StringIO to unicode first so we can do a replace function below + result = json.loads(to_text(response_data.getvalue())) + self._update_self_from_response(result, self._url, data) + return self._handle_response(result) + except Exception as err: + raise FAZBaseException(err) + + def _handle_response(self, response): + self._set_sid(response) + if isinstance(response["result"], list): + result = response["result"][0] + else: + result = response["result"] + if "data" in result: + return result["status"]["code"], result["data"] + else: + return result["status"]["code"], result + + def _update_self_from_response(self, response, url, data): + self._last_response_raw = response + if isinstance(response["result"], list): + result = response["result"][0] + else: + result = response["result"] + if "status" in result: + self._last_response_code = result["status"]["code"] + self._last_response_msg = result["status"]["message"] + self._last_url = url + self._last_data_payload = data + + def _set_sid(self, response): + if self.sid is None and "session" in response: + self.sid = response["session"] + + def return_connected_faz(self): + """ + Returns the data stored under self._connected_faz + + :return: dict + """ + try: + if self._connected_faz: + return self._connected_faz + except Exception: + raise FAZBaseException("Couldn't Retrieve Connected FAZ Stats") + + def get_system_status(self): + """ + Returns the system status page from the FortiAnalyzer, for logging and other uses. + return: status + """ + status = self.send_request(FAZMethods.GET, self._tools.format_request(FAZMethods.GET, "sys/status")) + return status + + @property + def debug(self): + return self._debug + + @debug.setter + def debug(self, val): + self._debug = val + + @property + def req_id(self): + return self._req_id + + @req_id.setter + def req_id(self, val): + self._req_id = val + + def _update_request_id(self, reqid=0): + self.req_id = reqid if reqid != 0 else self.req_id + 1 + + @property + def sid(self): + return self._sid + + @sid.setter + def sid(self, val): + self._sid = val + + def __str__(self): + if self.sid is not None and self.connection._url is not None: + return "FortiAnalyzer object connected to FortiAnalyzer: " + to_text(self.connection._url) + return "FortiAnalyzer object with no valid connection to a FortiAnalyzer appliance." + + ################################## + # BEGIN DATABASE LOCK CONTEXT CODE + ################################## + + @property + def uses_workspace(self): + return self._uses_workspace + + @uses_workspace.setter + def uses_workspace(self, val): + self._uses_workspace = val + + @property + def uses_adoms(self): + return self._uses_adoms + + @uses_adoms.setter + def uses_adoms(self, val): + self._uses_adoms = val + + def add_adom_to_lock_list(self, adom): + if adom not in self._locked_adom_list: + self._locked_adom_list.append(adom) + + def remove_adom_from_lock_list(self, adom): + if adom in self._locked_adom_list: + self._locked_adom_list.remove(adom) + + def check_mode(self): + """ + Checks FortiAnalyzer for the use of Workspace mode + """ + url = "/cli/global/system/global" + code, resp_obj = self.send_request(FAZMethods.GET, + self._tools.format_request(FAZMethods.GET, + url, + fields=["workspace-mode", "adom-status"])) + try: + if resp_obj["workspace-mode"] == "workflow": + self.uses_workspace = True + elif resp_obj["workspace-mode"] == "disabled": + self.uses_workspace = False + except KeyError: + self.uses_workspace = False + except Exception: + raise FAZBaseException(msg="Couldn't determine workspace-mode in the plugin") + try: + if resp_obj["adom-status"] in [1, "enable"]: + self.uses_adoms = True + else: + self.uses_adoms = False + except KeyError: + self.uses_adoms = False + except Exception: + raise FAZBaseException(msg="Couldn't determine adom-status in the plugin") + + def run_unlock(self): + """ + Checks for ADOM status, if locked, it will unlock + """ + for adom_locked in self._locked_adoms_by_user: + adom = adom_locked["adom"] + self.unlock_adom(adom) + + def lock_adom(self, adom=None, *args, **kwargs): + """ + Locks an ADOM for changes + """ + if adom: + if adom.lower() == "global": + url = "/dvmdb/global/workspace/lock/" + else: + url = "/dvmdb/adom/{adom}/workspace/lock/".format(adom=adom) + else: + url = "/dvmdb/adom/root/workspace/lock" + code, respobj = self.send_request(FAZMethods.EXEC, self._tools.format_request(FAZMethods.EXEC, url)) + if code == 0 and respobj["status"]["message"].lower() == "ok": + self.add_adom_to_lock_list(adom) + return code, respobj + + def unlock_adom(self, adom=None, *args, **kwargs): + """ + Unlocks an ADOM after changes + """ + if adom: + if adom.lower() == "global": + url = "/dvmdb/global/workspace/unlock/" + else: + url = "/dvmdb/adom/{adom}/workspace/unlock/".format(adom=adom) + else: + url = "/dvmdb/adom/root/workspace/unlock" + code, respobj = self.send_request(FAZMethods.EXEC, self._tools.format_request(FAZMethods.EXEC, url)) + if code == 0 and respobj["status"]["message"].lower() == "ok": + self.remove_adom_from_lock_list(adom) + return code, respobj + + def commit_changes(self, adom=None, aux=False, *args, **kwargs): + """ + Commits changes to an ADOM + """ + if adom: + if aux: + url = "/pm/config/adom/{adom}/workspace/commit".format(adom=adom) + else: + if adom.lower() == "global": + url = "/dvmdb/global/workspace/commit/" + else: + url = "/dvmdb/adom/{adom}/workspace/commit".format(adom=adom) + else: + url = "/dvmdb/adom/root/workspace/commit" + return self.send_request(FAZMethods.EXEC, self._tools.format_request(FAZMethods.EXEC, url)) + + def get_lock_info(self, adom=None): + """ + Gets ADOM lock info so it can be displayed with the error messages. Or if determined to be locked by ansible + for some reason, then unlock it. + """ + if not adom or adom == "root": + url = "/dvmdb/adom/root/workspace/lockinfo" + else: + if adom.lower() == "global": + url = "/dvmdb/global/workspace/lockinfo/" + else: + url = "/dvmdb/adom/{adom}/workspace/lockinfo/".format(adom=adom) + datagram = {} + data = self._tools.format_request(FAZMethods.GET, url, **datagram) + resp_obj = self.send_request(FAZMethods.GET, data) + code = resp_obj[0] + if code != 0: + self._module.fail_json(msg=("An error occurred trying to get the ADOM Lock Info. Error: " + to_text(resp_obj))) + elif code == 0: + try: + if resp_obj[1]["status"]["message"] == "OK": + self._lock_info = None + except Exception: + self._lock_info = resp_obj[1] + return resp_obj + + def get_adom_list(self): + """ + Gets the list of ADOMs for the FortiAnalyzer + """ + if self.uses_adoms: + url = "/dvmdb/adom" + datagram = {} + data = self._tools.format_request(FAZMethods.GET, url, **datagram) + resp_obj = self.send_request(FAZMethods.GET, data) + code = resp_obj[0] + if code != 0: + self._module.fail_json(msg=("An error occurred trying to get the ADOM Info. Error: " + to_text(resp_obj))) + elif code == 0: + num_of_adoms = len(resp_obj[1]) + append_list = ['root', ] + for adom in resp_obj[1]: + if adom["tab_status"] != "": + append_list.append(to_text(adom["name"])) + self._adom_list = append_list + return resp_obj + + def get_locked_adom_list(self): + """ + Gets the list of locked adoms + """ + try: + locked_list = list() + locked_by_user_list = list() + for adom in self._adom_list: + adom_lock_info = self.get_lock_info(adom=adom) + try: + if adom_lock_info[1]["status"]["message"] == "OK": + continue + except Exception: + pass + try: + if adom_lock_info[1][0]["lock_user"]: + locked_list.append(to_text(adom)) + if adom_lock_info[1][0]["lock_user"] == self._logged_in_user: + locked_by_user_list.append({"adom": to_text(adom), "user": to_text(adom_lock_info[1][0]["lock_user"])}) + except Exception as err: + raise FAZBaseException(err) + self._locked_adom_list = locked_list + self._locked_adoms_by_user = locked_by_user_list + + except Exception as err: + raise FAZBaseException(msg=("An error occurred while trying to get the locked adom list. Error: " + + to_text(err))) + + ################################# + # END DATABASE LOCK CONTEXT CODE + ################################# diff --git a/ansible_collections/community/fortios/plugins/httpapi/fortimanager.py b/ansible_collections/community/fortios/plugins/httpapi/fortimanager.py new file mode 100644 index 000000000..bbdaacb8d --- /dev/null +++ b/ansible_collections/community/fortios/plugins/httpapi/fortimanager.py @@ -0,0 +1,459 @@ +# Copyright (c) 2018 Fortinet and/or its affiliates. +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# + +from __future__ import (absolute_import, division, print_function) + +__metaclass__ = type + +DOCUMENTATION = ''' +--- +author: + - Luke Weighall (@lweighall) + - Andrew Welsh (@Ghilli3) + - Jim Huber (@p4r4n0y1ng) +name: fortimanager +short_description: HttpApi Plugin for Fortinet FortiManager Appliance or VM. +description: + - This HttpApi plugin provides methods to connect to Fortinet FortiManager Appliance or VM via JSON RPC API. +''' + +import json +from ansible.errors import AnsibleError +from ansible.plugins.httpapi import HttpApiBase +from ansible.module_utils.basic import to_text + +try: + from ansible_collections.fortinet.fortimanager.plugins.module_utils.common import BASE_HEADERS + from ansible_collections.fortinet.fortimanager.plugins.module_utils.common import FMGBaseException + from ansible_collections.fortinet.fortimanager.plugins.module_utils.common import FMGRCommon + from ansible_collections.fortinet.fortimanager.plugins.module_utils.common import FMGRMethods + HAS_FORTIMANAGER_COLLECTION = True +except ImportError: + HAS_FORTIMANAGER_COLLECTION = False + + +class HttpApi(HttpApiBase): + def __init__(self, connection): + super(HttpApi, self).__init__(connection) + self._req_id = 0 + self._sid = None + self._url = "/jsonrpc" + self._host = None + self._tools = FMGRCommon + self._debug = False + self._connected_fmgr = None + self._last_response_msg = None + self._last_response_code = None + self._last_data_payload = None + self._last_url = None + self._last_response_raw = None + self._locked_adom_list = list() + self._locked_adoms_by_user = list() + self._uses_workspace = False + self._uses_adoms = False + self._adom_list = list() + self._logged_in_user = None + if not HAS_FORTIMANAGER_COLLECTION: + raise AnsibleError("The community.fortios.fortimanager httpapi plugin requires the fortios.fortimanager collection.") + + def set_become(self, become_context): + """ + ELEVATION IS NOT REQUIRED ON FORTINET DEVICES - SKIPPED. + :param become_context: Unused input. + :return: None + """ + return None + + def update_auth(self, response, response_data): + """ + TOKENS ARE NOT USED SO NO NEED TO UPDATE AUTH. + :param response: Unused input. + :param response_data Unused_input. + :return: None + """ + return None + + def login(self, username, password): + + """ + This function will log the plugin into FortiManager, and return the results. + :param username: Username of FortiManager Admin + :param password: Password of FortiManager Admin + + :return: Dictionary of status if it logged in or not. + """ + self._logged_in_user = username + self.send_request(FMGRMethods.EXEC, self._tools.format_request(FMGRMethods.EXEC, "sys/login/user", + passwd=password, user=username, )) + + if "FortiManager object connected to FortiManager" in self.__str__(): + # If Login worked, then inspect the FortiManager for Workspace Mode, and it's system information. + self.inspect_fmgr() + return + else: + raise FMGBaseException(msg="Unknown error while logging in...connection was lost during login operation...." + " Exiting") + + def inspect_fmgr(self): + # CHECK FOR WORKSPACE MODE TO SEE IF WE HAVE TO ENABLE ADOM LOCKS + status = self.get_system_status() + if status[0] == -11: + # THE CONNECTION GOT LOST SOMEHOW, REMOVE THE SID AND REPORT BAD LOGIN + self.logout() + raise FMGBaseException(msg="Error -11 -- the Session ID was likely malformed somehow. Contact authors." + " Exiting") + elif status[0] == 0: + try: + self.check_mode() + if self._uses_adoms: + self.get_adom_list() + if self._uses_workspace: + self.get_locked_adom_list() + self._connected_fmgr = status[1] + self._host = self._connected_fmgr["Hostname"] + except BaseException: + pass + return + + def logout(self): + """ + This function will logout of the FortiManager. + """ + if self.sid is not None: + # IF WE WERE USING WORKSPACES, THEN CLEAN UP OUR LOCKS IF THEY STILL EXIST + if self.uses_workspace: + self.get_lock_info() + self.run_unlock() + ret_code, response = self.send_request(FMGRMethods.EXEC, + self._tools.format_request(FMGRMethods.EXEC, "sys/logout")) + self.sid = None + return ret_code, response + + def send_request(self, method, params): + """ + Responsible for actual sending of data to the connection httpapi base plugin. Does some formatting too. + :param params: A formatted dictionary that was returned by self.common_datagram_params() + before being called here. + :param method: The preferred API Request method (GET, ADD, POST, etc....) + :type method: basestring + + :return: Dictionary of status, if it logged in or not. + """ + try: + if self.sid is None and params[0]["url"] != "sys/login/user": + try: + self.connection._connect() + except Exception as err: + raise FMGBaseException( + msg="An problem happened with the httpapi plugin self-init connection process. " + "Error: " + to_text(err)) + except IndexError: + raise FMGBaseException("An attempt was made at communicating with a FMG with " + "no valid session and an incorrectly formatted request.") + except Exception as err: + raise FMGBaseException("An attempt was made at communicating with a FMG with " + "no valid session and an unexpected error was discovered. \n Error: " + to_text(err)) + + self._update_request_id() + json_request = { + "method": method, + "params": params, + "session": self.sid, + "id": self.req_id, + "verbose": 1 + } + data = json.dumps(json_request, ensure_ascii=False).replace('\\\\', '\\') + try: + # Sending URL and Data in Unicode, per Ansible Specifications for Connection Plugins + response, response_data = self.connection.send(path=to_text(self._url), data=to_text(data), + headers=BASE_HEADERS) + # Get Unicode Response - Must convert from StringIO to unicode first so we can do a replace function below + result = json.loads(to_text(response_data.getvalue())) + self._update_self_from_response(result, self._url, data) + return self._handle_response(result) + except Exception as err: + raise FMGBaseException(err) + + def _handle_response(self, response): + self._set_sid(response) + if isinstance(response["result"], list): + result = response["result"][0] + else: + result = response["result"] + if "data" in result: + return result["status"]["code"], result["data"] + else: + return result["status"]["code"], result + + def _update_self_from_response(self, response, url, data): + self._last_response_raw = response + if isinstance(response["result"], list): + result = response["result"][0] + else: + result = response["result"] + if "status" in result: + self._last_response_code = result["status"]["code"] + self._last_response_msg = result["status"]["message"] + self._last_url = url + self._last_data_payload = data + + def _set_sid(self, response): + if self.sid is None and "session" in response: + self.sid = response["session"] + + def return_connected_fmgr(self): + """ + Returns the data stored under self._connected_fmgr + + :return: dict + """ + try: + if self._connected_fmgr: + return self._connected_fmgr + except Exception: + raise FMGBaseException("Couldn't Retrieve Connected FMGR Stats") + + def get_system_status(self): + """ + Returns the system status page from the FortiManager, for logging and other uses. + return: status + """ + status = self.send_request(FMGRMethods.GET, self._tools.format_request(FMGRMethods.GET, "sys/status")) + return status + + @property + def debug(self): + return self._debug + + @debug.setter + def debug(self, val): + self._debug = val + + @property + def req_id(self): + return self._req_id + + @req_id.setter + def req_id(self, val): + self._req_id = val + + def _update_request_id(self, reqid=0): + self.req_id = reqid if reqid != 0 else self.req_id + 1 + + @property + def sid(self): + return self._sid + + @sid.setter + def sid(self, val): + self._sid = val + + def __str__(self): + if self.sid is not None and self.connection._url is not None: + return "FortiManager object connected to FortiManager: " + to_text(self.connection._url) + return "FortiManager object with no valid connection to a FortiManager appliance." + + ################################## + # BEGIN DATABASE LOCK CONTEXT CODE + ################################## + + @property + def uses_workspace(self): + return self._uses_workspace + + @uses_workspace.setter + def uses_workspace(self, val): + self._uses_workspace = val + + @property + def uses_adoms(self): + return self._uses_adoms + + @uses_adoms.setter + def uses_adoms(self, val): + self._uses_adoms = val + + def add_adom_to_lock_list(self, adom): + if adom not in self._locked_adom_list: + self._locked_adom_list.append(adom) + + def remove_adom_from_lock_list(self, adom): + if adom in self._locked_adom_list: + self._locked_adom_list.remove(adom) + + def check_mode(self): + """ + Checks FortiManager for the use of Workspace mode + """ + url = "/cli/global/system/global" + code, resp_obj = self.send_request(FMGRMethods.GET, + self._tools.format_request(FMGRMethods.GET, + url, + fields=["workspace-mode", "adom-status"])) + try: + if resp_obj["workspace-mode"] == "workflow": + self.uses_workspace = True + elif resp_obj["workspace-mode"] == "disabled": + self.uses_workspace = False + except KeyError: + raise FMGBaseException(msg="Couldn't determine workspace-mode in the plugin") + try: + if resp_obj["adom-status"] in [1, "enable"]: + self.uses_adoms = True + else: + self.uses_adoms = False + except KeyError: + raise FMGBaseException(msg="Couldn't determine adom-status in the plugin") + + def run_unlock(self): + """ + Checks for ADOM status, if locked, it will unlock + """ + for adom_locked in self._locked_adoms_by_user: + adom = adom_locked["adom"] + self.unlock_adom(adom) + + def lock_adom(self, adom=None, *args, **kwargs): + """ + Locks an ADOM for changes + """ + if adom: + if adom.lower() == "global": + url = "/dvmdb/global/workspace/lock/" + else: + url = "/dvmdb/adom/{adom}/workspace/lock/".format(adom=adom) + else: + url = "/dvmdb/adom/root/workspace/lock" + code, respobj = self.send_request(FMGRMethods.EXEC, self._tools.format_request(FMGRMethods.EXEC, url)) + if code == 0 and respobj["status"]["message"].lower() == "ok": + self.add_adom_to_lock_list(adom) + return code, respobj + + def unlock_adom(self, adom=None, *args, **kwargs): + """ + Unlocks an ADOM after changes + """ + if adom: + if adom.lower() == "global": + url = "/dvmdb/global/workspace/unlock/" + else: + url = "/dvmdb/adom/{adom}/workspace/unlock/".format(adom=adom) + else: + url = "/dvmdb/adom/root/workspace/unlock" + code, respobj = self.send_request(FMGRMethods.EXEC, self._tools.format_request(FMGRMethods.EXEC, url)) + if code == 0 and respobj["status"]["message"].lower() == "ok": + self.remove_adom_from_lock_list(adom) + return code, respobj + + def commit_changes(self, adom=None, aux=False, *args, **kwargs): + """ + Commits changes to an ADOM + """ + if adom: + if aux: + url = "/pm/config/adom/{adom}/workspace/commit".format(adom=adom) + else: + if adom.lower() == "global": + url = "/dvmdb/global/workspace/commit/" + else: + url = "/dvmdb/adom/{adom}/workspace/commit".format(adom=adom) + else: + url = "/dvmdb/adom/root/workspace/commit" + return self.send_request(FMGRMethods.EXEC, self._tools.format_request(FMGRMethods.EXEC, url)) + + def get_lock_info(self, adom=None): + """ + Gets ADOM lock info so it can be displayed with the error messages. Or if determined to be locked by ansible + for some reason, then unlock it. + """ + if not adom or adom == "root": + url = "/dvmdb/adom/root/workspace/lockinfo" + else: + if adom.lower() == "global": + url = "/dvmdb/global/workspace/lockinfo/" + else: + url = "/dvmdb/adom/{adom}/workspace/lockinfo/".format(adom=adom) + datagram = {} + data = self._tools.format_request(FMGRMethods.GET, url, **datagram) + resp_obj = self.send_request(FMGRMethods.GET, data) + code = resp_obj[0] + if code != 0: + self._module.fail_json(msg=("An error occurred trying to get the ADOM Lock Info. " + "Error: " + to_text(resp_obj))) + elif code == 0: + try: + if resp_obj[1]["status"]["message"] == "OK": + self._lock_info = None + except Exception: + self._lock_info = resp_obj[1] + return resp_obj + + def get_adom_list(self): + """ + Gets the list of ADOMs for the FortiManager + """ + if self.uses_adoms: + url = "/dvmdb/adom" + datagram = {} + data = self._tools.format_request(FMGRMethods.GET, url, **datagram) + resp_obj = self.send_request(FMGRMethods.GET, data) + code = resp_obj[0] + if code != 0: + self._module.fail_json(msg=("An error occurred trying to get the ADOM Info. " + "Error: " + to_text(resp_obj))) + elif code == 0: + num_of_adoms = len(resp_obj[1]) + append_list = ['root', ] + for adom in resp_obj[1]: + if adom["tab_status"] != "": + append_list.append(to_text(adom["name"])) + self._adom_list = append_list + return resp_obj + + def get_locked_adom_list(self): + """ + Gets the list of locked adoms + """ + try: + locked_list = list() + locked_by_user_list = list() + for adom in self._adom_list: + adom_lock_info = self.get_lock_info(adom=adom) + try: + if adom_lock_info[1]["status"]["message"] == "OK": + continue + except IndexError as err: + pass + try: + if adom_lock_info[1][0]["lock_user"]: + locked_list.append(to_text(adom)) + if adom_lock_info[1][0]["lock_user"] == self._logged_in_user: + locked_by_user_list.append({"adom": to_text(adom), + "user": to_text(adom_lock_info[1][0]["lock_user"])}) + except Exception as err: + raise FMGBaseException(err) + self._locked_adom_list = locked_list + self._locked_adoms_by_user = locked_by_user_list + + except Exception as err: + raise FMGBaseException(msg=("An error occurred while trying to get the locked adom list. Error: " + + to_text(err))) + + ################################ + # END DATABASE LOCK CONTEXT CODE + ################################ |