1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
import json
from urllib.error import HTTPError, URLError
from ceph_node_proxy.baseclient import BaseClient
from ceph_node_proxy.util import get_logger, http_req
from typing import Dict, Any, Tuple, Optional
from http.client import HTTPMessage
class RedFishClient(BaseClient):
    """Token-authenticated HTTP client for a Redfish service.

    The client opens a session by POSTing credentials to the Redfish
    ``SessionService``, remembers the returned ``X-Auth-Token`` and the
    session ``Location``, and attaches the token to every later request
    issued through :meth:`query`.
    """

    # Root of the Redfish resource tree; prepended to relative paths.
    PREFIX = '/redfish/v1/'

    def __init__(self,
                 host: str = '',
                 port: str = '443',
                 username: str = '',
                 password: str = ''):
        """Store the connection parameters; no network I/O happens here.

        Args:
            host: Hostname or address of the Redfish service.
            port: TCP port of the service, as a string.
            username: Account name, forwarded to ``BaseClient``.
            password: Account password, forwarded to ``BaseClient``.
        """
        # NOTE(review): self.username / self.password used below are presumably
        # set by BaseClient.__init__ -- confirm against the base class.
        super().__init__(host, username, password)
        self.log = get_logger(__name__)
        self.log.info(f'Initializing redfish client {__name__}')
        self.host: str = host
        self.port: str = port
        self.url: str = f'https://{self.host}:{self.port}'
        # Session state: both stay empty until login() succeeds.
        self.token: str = ''
        self.location: str = ''

    def login(self) -> None:
        """Open a Redfish session unless a valid one already exists.

        On success ``self.token`` holds the ``X-Auth-Token`` value and
        ``self.location`` holds the session resource path (without scheme
        and host).

        Raises:
            RuntimeError: if the request fails, the status code is not 201,
                or the response lacks the ``X-Auth-Token`` / ``Location``
                headers.
            URLError: propagated from :meth:`is_logged_in` when the validity
                check of an existing session fails at the HTTP level.
        """
        if self.is_logged_in():
            return

        self.log.info('Logging in to '
                      f"{self.url} as '{self.username}'")
        oob_credentials = json.dumps({'UserName': self.username,
                                      'Password': self.password})
        headers = {'Content-Type': 'application/json'}

        # Only the network call lives in the try block so that the
        # RuntimeError raised below can never be confused with a URLError.
        try:
            _headers, _data, _status_code = self.query(data=oob_credentials,
                                                       headers=headers,
                                                       endpoint='/redfish/v1/SessionService/Sessions/')
        except URLError as e:
            msg = f"Can't log in to {self.url} as '{self.username}': {e}"
            self.log.error(msg)
            raise RuntimeError(msg) from e

        if _status_code != 201:
            msg = f"Can't log in to {self.url} as '{self.username}': {_status_code}"
            self.log.error(msg)
            raise RuntimeError(msg)

        token = _headers.get('X-Auth-Token')
        location = _headers.get('Location')
        if not token or not location:
            # A session cannot be used (or closed) without both values, so
            # treat their absence as a failed login instead of failing later
            # with an AttributeError or an unauthenticated request.
            msg = (f"Can't log in to {self.url} as '{self.username}': "
                   "response is missing the 'X-Auth-Token' or 'Location' header")
            self.log.error(msg)
            raise RuntimeError(msg)

        self.token = token
        if location.startswith('http'):
            # We assume the value has the following format:
            # scheme://address:port/redfish/v1/SessionService/Session
            # Splitting on the first three '/' leaves the path as last item.
            location_endpoint = f"/{location.split('/', 3)[-1]}"
        else:
            location_endpoint = location
        self.location = location_endpoint
        self.log.info(f'Logged in to {self.url}, Received header "Location": {self.location}')

    def is_logged_in(self) -> bool:
        """Tell whether the stored session is still accepted by the service.

        Returns:
            ``False`` when no token/location is stored; otherwise whether a
            GET on the session location answers with status 200.

        Raises:
            URLError: re-raised after logging when the request itself fails.
        """
        # NOTE(review): if http_req raises HTTPError for 4xx answers, an
        # expired session surfaces here as an exception rather than False --
        # confirm against http_req.
        self.log.debug(f'Checking token validity for {self.url}')
        if not self.location or not self.token:
            self.log.debug(f'No token found for {self.url}.')
            return False
        headers = {'X-Auth-Token': self.token}
        try:
            _, _, _status_code = self.query(headers=headers,
                                            endpoint=self.location)
        except URLError as e:
            self.log.error("Can't check token "
                           f'validity for {self.url}: {e}')
            raise
        return _status_code == 200

    def logout(self) -> Dict[str, Any]:
        """Close the current session (if any) and forget its credentials.

        The stored token and location are cleared whether or not the DELETE
        request succeeds.

        Returns:
            The decoded JSON body of the DELETE response, or an empty dict
            when there was no session, the request failed, or the response
            had no body.
        """
        result: Dict[str, Any] = {}
        try:
            if self.is_logged_in():
                _, _data, _status_code = self.query(method='DELETE',
                                                    headers={'X-Auth-Token': self.token},
                                                    endpoint=self.location)
                # A successful DELETE may carry no body at all; json.loads('')
                # would raise and skip the state reset below.
                if _data and _data.strip():
                    result = json.loads(_data)
        except URLError:
            self.log.error(f"Can't log out from {self.url}")

        self.location = ''
        self.token = ''

        return result

    def get_path(self, path: str) -> Dict[str, Any]:
        """Fetch a Redfish resource and return its decoded JSON body.

        Args:
            path: Resource path. ``PREFIX`` is prepended when the path does
                not already contain it.

        Raises:
            RuntimeError: if the HTTP request fails.
        """
        if self.PREFIX not in path:
            # PREFIX already ends with '/', so drop any leading slash to
            # avoid building '/redfish/v1//...'.
            path = f"{self.PREFIX}{path.lstrip('/')}"
        try:
            _, result, _status_code = self.query(endpoint=path)
            result_json = json.loads(result)
            return result_json
        except URLError as e:
            msg = f"Can't get path {path}:\n{e}"
            self.log.error(msg)
            raise RuntimeError(msg) from e

    def query(self,
              data: Optional[str] = None,
              headers: Optional[Dict[str, str]] = None,
              method: Optional[str] = None,
              endpoint: str = '',
              timeout: int = 10) -> Tuple[HTTPMessage, str, int]:
        """Send one HTTP request to the service through ``http_req``.

        The caller's ``headers`` mapping is copied, never modified. When a
        session token is stored it overrides any ``X-Auth-Token`` entry, and
        a JSON ``Content-Type`` is added for POST/PUT/PATCH when none was
        given.

        Args:
            data: Request body, or ``None``.
            headers: Extra request headers, or ``None`` for no extras.
            method: HTTP method, forwarded as-is to ``http_req``.
            endpoint: Path part of the URL to request.
            timeout: Forwarded to ``http_req`` (presumably seconds -- confirm
                against ``http_req``).

        Returns:
            The ``(headers, body, status)`` triple returned by ``http_req``.

        Raises:
            HTTPError, URLError: re-raised after being logged at debug level.
        """
        _headers = headers.copy() if headers else {}
        if self.token:
            _headers['X-Auth-Token'] = self.token
        if not _headers.get('Content-Type') and method in ['POST', 'PUT', 'PATCH']:
            _headers['Content-Type'] = 'application/json'
        try:
            (response_headers,
             response_str,
             response_status) = http_req(hostname=self.host,
                                         port=self.port,
                                         endpoint=endpoint,
                                         headers=_headers,
                                         method=method,
                                         data=data,
                                         timeout=timeout)

            return response_headers, response_str, response_status
        except (HTTPError, URLError) as e:
            self.log.debug(f'{e}')
            raise
|