summaryrefslogtreecommitdiffstats
path: root/ansible_collections/check_point/mgmt/plugins/httpapi
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:04:41 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:04:41 +0000
commit975f66f2eebe9dadba04f275774d4ab83f74cf25 (patch)
tree89bd26a93aaae6a25749145b7e4bca4a1e75b2be /ansible_collections/check_point/mgmt/plugins/httpapi
parentInitial commit. (diff)
downloadansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.tar.xz
ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.zip
Adding upstream version 7.7.0+dfsg.upstream/7.7.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/check_point/mgmt/plugins/httpapi')
-rw-r--r--ansible_collections/check_point/mgmt/plugins/httpapi/checkpoint.py114
1 files changed, 114 insertions, 0 deletions
diff --git a/ansible_collections/check_point/mgmt/plugins/httpapi/checkpoint.py b/ansible_collections/check_point/mgmt/plugins/httpapi/checkpoint.py
new file mode 100644
index 000000000..ade89cb00
--- /dev/null
+++ b/ansible_collections/check_point/mgmt/plugins/httpapi/checkpoint.py
@@ -0,0 +1,114 @@
+# (c) 2018 Red Hat Inc.
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+
+__metaclass__ = type
+
+DOCUMENTATION = """
+---
+author: Ansible Networking Team (@rcarrillocruz)
+name: checkpoint
+short_description: HttpApi Plugin for Checkpoint devices
+description:
+ - This HttpApi plugin provides methods to connect to Checkpoint
+ devices over a HTTP(S)-based api.
+version_added: "2.8.0"
+options:
+ domain:
+ type: str
+ description:
+ - Specifies the domain of the Check Point device
+ vars:
+ - name: ansible_checkpoint_domain
+ api_key:
+ type: str
+ description:
+ - Login with api-key instead of user & password
+ vars:
+ - name: ansible_api_key
+ cloud_mgmt_id:
+ type: str
+ description:
+ - The Cloud Management ID
+ vars:
+ - name: ansible_cloud_mgmt_id
+"""
+
+import json
+
+from ansible.module_utils.basic import to_text
+from ansible.errors import AnsibleConnectionFailure
+from ansible.module_utils.six.moves.urllib.error import HTTPError
+from ansible.plugins.httpapi import HttpApiBase
+from ansible.module_utils.connection import ConnectionError
+
+BASE_HEADERS = {
+ 'Content-Type': 'application/json',
+ 'User-Agent': 'Ansible',
+}
+
+
+class HttpApi(HttpApiBase):
+ def login(self, username, password):
+ payload = {}
+ cp_domain = self.get_option('domain')
+ cp_api_key = self.get_option('api_key')
+ if cp_domain:
+ payload['domain'] = cp_domain
+ if username and password and not cp_api_key:
+ payload['user'] = username
+ payload['password'] = password
+ elif cp_api_key and not username and not password:
+ payload['api-key'] = cp_api_key
+ else:
+ raise AnsibleConnectionFailure('[Username and password] or api_key are required for login')
+ url = '/web_api/login'
+ response, response_data = self.send_request(url, payload)
+
+ try:
+ self.connection._auth = {'X-chkp-sid': response_data['sid']}
+ except KeyError:
+ raise ConnectionError(
+ 'Server returned response without token info during connection authentication: %s' % response)
+ # Case of read-only
+ if 'uid' in response_data.keys():
+ self.connection._session_uid = response_data['uid']
+
+ def logout(self):
+ url = '/web_api/logout'
+
+ response, dummy = self.send_request(url, None)
+
+ def get_session_uid(self):
+ return self.connection._session_uid
+
+ def send_request(self, path, body_params):
+ data = json.dumps(body_params) if body_params else '{}'
+ cp_cloud_mgmt_id = self.get_option('cloud_mgmt_id')
+ if cp_cloud_mgmt_id:
+ path = '/' + cp_cloud_mgmt_id + path
+ try:
+ self._display_request()
+ response, response_data = self.connection.send(path, data, method='POST', headers=BASE_HEADERS)
+ value = self._get_response_value(response_data)
+
+ return response.getcode(), self._response_to_json(value)
+ except AnsibleConnectionFailure as e:
+ return 404, e.message
+ except HTTPError as e:
+ error = json.loads(e.read())
+ return e.code, error
+
+ def _display_request(self):
+ self.connection.queue_message('vvvv', 'Web Services: %s %s' % ('POST', self.connection._url))
+
+ def _get_response_value(self, response_data):
+ return to_text(response_data.getvalue())
+
+ def _response_to_json(self, response_text):
+ try:
+ return json.loads(response_text) if response_text else {}
+ # JSONDecodeError only available on Python 3.5+
+ except ValueError:
+ raise ConnectionError('Invalid JSON response: %s' % response_text)