summaryrefslogtreecommitdiffstats
path: root/ansible_collections/sensu/sensu_go/plugins/module_utils/client.py
blob: cd53077d395c1c01fab70f088c33ee29d7731cf4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# -*- coding: utf-8 -*-
# Copyright: (c) 2019, XLAB Steampunk <steampunk@xlab.si>
#
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type

try:
    from ansible.module_utils.compat import version
except ImportError:
    from distutils import version

from . import errors, http


class Client:
    """Small HTTP client for a Sensu Go backend API.

    Nothing is sent over the wire at construction time: the authorization
    header is obtained on first use and the backend version is fetched only
    when a consumer reads the ``version`` property.
    """

    # Placeholder assigned when the backend reports a version string that
    # cannot be parsed as a strict version.
    BAD_VERSION = version.StrictVersion("9999.99.99")

    def __init__(self, address, username, password, api_key, verify, ca_path):
        """Remember the connection settings.

        :param address: Base URL of the backend; trailing slashes are
            stripped so that paths can be appended directly.
        :param username: User name for the basic-auth login call.
        :param password: Password for the basic-auth login call.
        :param api_key: API key; when truthy it is used instead of the
            username/password pair.
        :param verify: Forwarded to the HTTP layer as ``validate_certs``.
        :param ca_path: Forwarded to the HTTP layer as ``ca_path``.
        """
        self.address = address.rstrip("/")
        self.username = username
        self.password = password
        self.api_key = api_key
        self.verify = verify
        self.ca_path = ca_path

        # Both values are filled in lazily by the properties below.
        self._auth_header = None
        self._version = None

    @property
    def auth_header(self):
        """Authorization headers, created by logging in on first access."""
        header = self._auth_header
        if not header:
            header = self._auth_header = self._login()
        return header

    @property
    def version(self):
        """Backend version, fetched from ``/version`` on first access.

        :raises errors.SensuError: if the version endpoint does not answer
            with status 200, does not return JSON, or the JSON lacks the
            ``sensu_backend`` key.
        """
        if self._version is not None:
            return self._version

        resp = self.get("/version")
        if resp.status != 200:
            raise errors.SensuError(
                "Version API returned status {0}".format(resp.status),
            )

        body = resp.json
        if body is None:
            raise errors.SensuError(
                "Version API did not return a valid JSON",
            )
        if "sensu_backend" not in body:
            raise errors.SensuError(
                "Version API did not return backend version",
            )

        try:
            # Only the part in front of the first "#" is parsed.
            self._version = version.StrictVersion(
                body["sensu_backend"].split("#")[0]
            )
        except ValueError:
            # Backend has no version compiled in - we are probably running
            # against a self-compiled version from git.
            self._version = self.BAD_VERSION

        return self._version

    def _login(self):
        """Build the authorization headers, preferring the API key."""
        return (
            self._api_key_login() if self.api_key
            else self._username_password_login()
        )

    def _api_key_login(self):
        """Return the authorization headers for API key authentication."""
        # There is no API endpoint that we could hit to verify the API key,
        # so the key is used as-is. Error reporting for a bad key will be a
        # mess, but there is not much we can do about that here.
        return {"Authorization": "Key {0}".format(self.api_key)}

    def _username_password_login(self):
        """Exchange the username and password for a bearer token header.

        :raises errors.SensuError: if the authentication call does not
            answer with status 200, does not return JSON, or the JSON lacks
            the ``access_token`` key.
        """
        auth_url = "{0}/auth".format(self.address)
        resp = http.request(
            "GET", auth_url,
            force_basic_auth=True,
            url_username=self.username,
            url_password=self.password,
            validate_certs=self.verify,
            ca_path=self.ca_path,
        )

        if resp.status != 200:
            raise errors.SensuError(
                "Authentication call returned status {0}".format(resp.status),
            )

        body = resp.json
        if body is None:
            raise errors.SensuError(
                "Authentication call did not return a valid JSON",
            )
        if "access_token" not in body:
            raise errors.SensuError(
                "Authentication call did not return access token",
            )

        return {"Authorization": "Bearer {0}".format(body["access_token"])}

    def request(self, method, path, payload=None):
        """Send an authenticated request to ``path`` on the backend.

        :param method: HTTP method name.
        :param path: Path that is appended to the stored base address.
        :param payload: Optional payload handed to the HTTP layer.
        :returns: The response object produced by the HTTP layer.
        :raises errors.SensuError: if the backend answers with 401 or 403.
        """
        target = self.address + path
        auth = self.auth_header  # May trigger the login call.

        resp = http.request(
            method, target,
            payload=payload,
            headers=auth,
            validate_certs=self.verify,
            ca_path=self.ca_path,
        )

        if resp.status in (401, 403):
            raise errors.SensuError(
                "Authentication problem. Verify your credentials."
            )
        return resp

    def get(self, path):
        """Issue a GET request for ``path``."""
        return self.request("GET", path)

    def put(self, path, payload):
        """Issue a PUT request for ``path`` carrying ``payload``."""
        return self.request("PUT", path, payload)

    def delete(self, path):
        """Issue a DELETE request for ``path``."""
        return self.request("DELETE", path)

    def validate_auth_data(self, username, password):
        """Check a username/password pair against ``/auth/test``.

        :returns: ``True`` when the backend answers 200, ``False`` when it
            answers 401.
        :raises errors.SensuError: for any other response status.
        """
        resp = http.request(
            "GET", "{0}/auth/test".format(self.address),
            force_basic_auth=True,
            url_username=username,
            url_password=password,
            validate_certs=self.verify,
            ca_path=self.ca_path,
        )

        status = resp.status
        if status == 200:
            return True
        if status == 401:
            return False
        raise errors.SensuError(
            "Authentication test returned status {0}".format(status),
        )