summaryrefslogtreecommitdiffstats
path: root/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py
blob: 64a4e44dfe3db5fa3c88913fb7fb6a8ae39ac620 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import json
from urllib.error import HTTPError, URLError
from ceph_node_proxy.baseclient import BaseClient
from ceph_node_proxy.util import get_logger, http_req
from typing import Dict, Any, Tuple, Optional
from http.client import HTTPMessage


class RedFishClient(BaseClient):
    PREFIX = '/redfish/v1/'

    def __init__(self,
                 host: str = '',
                 port: str = '443',
                 username: str = '',
                 password: str = ''):
        super().__init__(host, username, password)
        self.log = get_logger(__name__)
        self.log.info(f'Initializing redfish client {__name__}')
        self.host: str = host
        self.port: str = port
        self.url: str = f'https://{self.host}:{self.port}'
        self.token: str = ''
        self.location: str = ''

    def login(self) -> None:
        if not self.is_logged_in():
            self.log.info('Logging in to '
                          f"{self.url} as '{self.username}'")
            oob_credentials = json.dumps({'UserName': self.username,
                                          'Password': self.password})
            headers = {'Content-Type': 'application/json'}
            location_endpoint: str = ''

            try:
                _headers, _data, _status_code = self.query(data=oob_credentials,
                                                           headers=headers,
                                                           endpoint='/redfish/v1/SessionService/Sessions/')
                if _status_code != 201:
                    self.log.error(f"Can't log in to {self.url} as '{self.username}': {_status_code}")
                    raise RuntimeError
            except URLError as e:
                msg = f"Can't log in to {self.url} as '{self.username}': {e}"
                self.log.error(msg)
                raise RuntimeError
            self.token = _headers['X-Auth-Token']
            if _headers['Location'].startswith('http'):
                # We assume the value has the following format:
                # scheme://address:port/redfish/v1/SessionService/Session
                location_endpoint = f"/{_headers['Location'].split('/', 3)[-1:][0]}"
            else:
                location_endpoint = _headers['Location']
            self.location = location_endpoint
            self.log.info(f'Logged in to {self.url}, Received header "Location": {self.location}')

    def is_logged_in(self) -> bool:
        self.log.debug(f'Checking token validity for {self.url}')
        if not self.location or not self.token:
            self.log.debug(f'No token found for {self.url}.')
            return False
        headers = {'X-Auth-Token': self.token}
        try:
            _headers, _data, _status_code = self.query(headers=headers,
                                                       endpoint=self.location)
        except URLError as e:
            self.log.error("Can't check token "
                           f'validity for {self.url}: {e}')
            raise
        return _status_code == 200

    def logout(self) -> Dict[str, Any]:
        result: Dict[str, Any] = {}
        try:
            if self.is_logged_in():
                _, _data, _status_code = self.query(method='DELETE',
                                                    headers={'X-Auth-Token': self.token},
                                                    endpoint=self.location)
                result = json.loads(_data)
        except URLError:
            self.log.error(f"Can't log out from {self.url}")

        self.location = ''
        self.token = ''

        return result

    def get_path(self, path: str) -> Dict[str, Any]:
        if self.PREFIX not in path:
            path = f'{self.PREFIX}{path}'
        try:
            _, result, _status_code = self.query(endpoint=path)
            result_json = json.loads(result)
            return result_json
        except URLError as e:
            self.log.error(f"Can't get path {path}:\n{e}")
            raise RuntimeError

    def query(self,
              data: Optional[str] = None,
              headers: Dict[str, str] = {},
              method: Optional[str] = None,
              endpoint: str = '',
              timeout: int = 10) -> Tuple[HTTPMessage, str, int]:
        _headers = headers.copy() if headers else {}
        if self.token:
            _headers['X-Auth-Token'] = self.token
        if not _headers.get('Content-Type') and method in ['POST', 'PUT', 'PATCH']:
            _headers['Content-Type'] = 'application/json'
        try:
            (response_headers,
             response_str,
             response_status) = http_req(hostname=self.host,
                                         port=self.port,
                                         endpoint=endpoint,
                                         headers=_headers,
                                         method=method,
                                         data=data,
                                         timeout=timeout)

            return response_headers, response_str, response_status
        except (HTTPError, URLError) as e:
            self.log.debug(f'{e}')
            raise