1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# -*- coding: utf-8 -*-
# Copyright: (c) 2019, XLAB Steampunk <steampunk@xlab.si>
#
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
try:
from ansible.module_utils.compat import version
except ImportError:
from distutils import version
from . import errors, http
class Client:
BAD_VERSION = version.StrictVersion("9999.99.99")
def __init__(self, address, username, password, api_key, verify, ca_path):
self.address = address.rstrip("/")
self.username = username
self.password = password
self.api_key = api_key
self.verify = verify
self.ca_path = ca_path
self._auth_header = None # Login when/if required
self._version = None # Set version only if the consumer needs it
@property
def auth_header(self):
if not self._auth_header:
self._auth_header = self._login()
return self._auth_header
@property
def version(self):
if self._version is None:
resp = self.get("/version")
if resp.status != 200:
raise errors.SensuError(
"Version API returned status {0}".format(resp.status),
)
if resp.json is None:
raise errors.SensuError(
"Version API did not return a valid JSON",
)
if "sensu_backend" not in resp.json:
raise errors.SensuError(
"Version API did not return backend version",
)
try:
self._version = version.StrictVersion(
resp.json["sensu_backend"].split("#")[0]
)
except ValueError:
# Backend has no version compiled in - we are probably running
# againts self-compiled version from git.
self._version = self.BAD_VERSION
return self._version
def _login(self):
if self.api_key:
return self._api_key_login()
return self._username_password_login()
def _api_key_login(self):
# We cannot validate the API key because there is no API endpoint that
# we could hit for verification purposes. This means that the error
# reporting will be a mess but there is not much we can do here.
return dict(Authorization="Key {0}".format(self.api_key))
def _username_password_login(self):
resp = http.request(
"GET", "{0}/auth".format(self.address), force_basic_auth=True,
url_username=self.username, url_password=self.password,
validate_certs=self.verify, ca_path=self.ca_path,
)
if resp.status != 200:
raise errors.SensuError(
"Authentication call returned status {0}".format(resp.status),
)
if resp.json is None:
raise errors.SensuError(
"Authentication call did not return a valid JSON",
)
if "access_token" not in resp.json:
raise errors.SensuError(
"Authentication call did not return access token",
)
return dict(
Authorization="Bearer {0}".format(resp.json["access_token"]),
)
def request(self, method, path, payload=None):
url = self.address + path
headers = self.auth_header
response = http.request(
method, url, payload=payload, headers=headers,
validate_certs=self.verify, ca_path=self.ca_path,
)
if response.status in (401, 403):
raise errors.SensuError(
"Authentication problem. Verify your credentials."
)
return response
def get(self, path):
return self.request("GET", path)
def put(self, path, payload):
return self.request("PUT", path, payload)
def delete(self, path):
return self.request("DELETE", path)
def validate_auth_data(self, username, password):
resp = http.request(
"GET", "{0}/auth/test".format(self.address),
force_basic_auth=True, url_username=username,
url_password=password, validate_certs=self.verify,
ca_path=self.ca_path,
)
if resp.status not in (200, 401):
raise errors.SensuError(
"Authentication test returned status {0}".format(resp.status),
)
return resp.status == 200
|