Diffstat (limited to 'src/ceph-node-proxy/ceph_node_proxy')
-rw-r--r-- | src/ceph-node-proxy/ceph_node_proxy/__init__.py | 2
-rw-r--r-- | src/ceph-node-proxy/ceph_node_proxy/api.py | 285
-rw-r--r-- | src/ceph-node-proxy/ceph_node_proxy/baseclient.py | 20
-rw-r--r-- | src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py | 283
-rw-r--r-- | src/ceph-node-proxy/ceph_node_proxy/basesystem.py | 96
-rw-r--r-- | src/ceph-node-proxy/ceph_node_proxy/main.py | 199
-rw-r--r-- | src/ceph-node-proxy/ceph_node_proxy/redfish_client.py | 123
-rw-r--r-- | src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py | 166
-rw-r--r-- | src/ceph-node-proxy/ceph_node_proxy/reporter.py | 69
-rw-r--r-- | src/ceph-node-proxy/ceph_node_proxy/util.py | 196
10 files changed, 1439 insertions, 0 deletions
diff --git a/src/ceph-node-proxy/ceph_node_proxy/__init__.py b/src/ceph-node-proxy/ceph_node_proxy/__init__.py new file mode 100644 index 000000000..20403aa92 --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/__init__.py @@ -0,0 +1,2 @@ +__version__ = '1.0.0' +__release__ = 'squid' diff --git a/src/ceph-node-proxy/ceph_node_proxy/api.py b/src/ceph-node-proxy/ceph_node_proxy/api.py new file mode 100644 index 000000000..25ae03e51 --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/api.py @@ -0,0 +1,285 @@ +import cherrypy # type: ignore +from urllib.error import HTTPError +from cherrypy._cpserver import Server # type: ignore +from threading import Thread, Event +from typing import Dict, Any, List +from ceph_node_proxy.util import Config, get_logger, write_tmp_file +from ceph_node_proxy.basesystem import BaseSystem +from ceph_node_proxy.reporter import Reporter +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from ceph_node_proxy.main import NodeProxyManager + + +@cherrypy.tools.auth_basic(on=True) +@cherrypy.tools.allow(methods=['PUT']) +@cherrypy.tools.json_out() +class Admin(): + def __init__(self, api: 'API') -> None: + self.api = api + + @cherrypy.expose + def start(self) -> Dict[str, str]: + self.api.backend.start() + self.api.reporter.run() + return {'ok': 'node-proxy daemon started'} + + @cherrypy.expose + def reload(self) -> Dict[str, str]: + self.api.config.reload() + return {'ok': 'node-proxy config reloaded'} + + def _stop(self) -> None: + self.api.backend.shutdown() + self.api.reporter.shutdown() + + @cherrypy.expose + def stop(self) -> Dict[str, str]: + self._stop() + return {'ok': 'node-proxy daemon stopped'} + + @cherrypy.expose + def shutdown(self) -> Dict[str, str]: + self._stop() + cherrypy.engine.exit() + return {'ok': 'Server shutdown.'} + + @cherrypy.expose + def flush(self) -> Dict[str, str]: + self.api.backend.flush() + return {'ok': 'node-proxy data flushed'} + + +class API(Server): + def __init__(self, + backend: 'BaseSystem', + reporter: 'Reporter', + config: 'Config', + addr: str = '0.0.0.0', + port: int = 0) -> None: + super().__init__() + self.log = get_logger(__name__) + self.backend = backend + self.reporter = reporter + self.config = config + self.socket_port = self.config.__dict__['api']['port'] if not port else port + self.socket_host = addr + self.subscribe() + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def memory(self) -> Dict[str, Any]: + return {'memory': self.backend.get_memory()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def network(self) -> Dict[str, Any]: + return {'network': self.backend.get_network()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def processors(self) -> Dict[str, Any]: + return {'processors': self.backend.get_processors()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def storage(self) -> Dict[str, Any]: + return {'storage': self.backend.get_storage()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def power(self) -> Dict[str, Any]: + return {'power': self.backend.get_power()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def fans(self) -> Dict[str, Any]: + return {'fans': self.backend.get_fans()} + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def firmwares(self) -> Dict[str, 
Any]: + return {'firmwares': self.backend.get_firmwares()} + + def _cp_dispatch(self, vpath: List[str]) -> 'API': + if vpath[0] == 'led' and len(vpath) > 1: # /led/{type}/{id} + _type = vpath[1] + cherrypy.request.params['type'] = _type + vpath.pop(1) # /led/{id} or # /led + if _type == 'drive' and len(vpath) > 1: # /led/{id} + _id = vpath[1] + vpath.pop(1) # /led + cherrypy.request.params['id'] = _id + vpath[0] = '_led' + # /<endpoint> + return self + + @cherrypy.expose + @cherrypy.tools.allow(methods=['POST']) + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.auth_basic(on=True) + def shutdown(self, **kw: Any) -> int: + data: Dict[str, bool] = cherrypy.request.json + + if 'force' not in data.keys(): + msg = "The key 'force' wasn't passed." + self.log.debug(msg) + raise cherrypy.HTTPError(400, msg) + try: + result: int = self.backend.shutdown_host(force=data['force']) + except HTTPError as e: + raise cherrypy.HTTPError(e.code, e.reason) + return result + + @cherrypy.expose + @cherrypy.tools.allow(methods=['POST']) + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.auth_basic(on=True) + def powercycle(self, **kw: Any) -> int: + try: + result: int = self.backend.powercycle() + except HTTPError as e: + raise cherrypy.HTTPError(e.code, e.reason) + return result + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET', 'PATCH']) + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.auth_basic(on=True) + def _led(self, **kw: Any) -> Dict[str, Any]: + method: str = cherrypy.request.method + led_type: Optional[str] = kw.get('type') + id_drive: Optional[str] = kw.get('id') + result: Dict[str, Any] = dict() + + if not led_type: + msg = "the led type must be provided (either 'chassis' or 'drive')." + self.log.debug(msg) + raise cherrypy.HTTPError(400, msg) + + if led_type == 'drive': + id_drive_required = not id_drive + if id_drive_required or id_drive not in self.backend.get_storage(): + msg = 'A valid device ID must be provided.' + self.log.debug(msg) + raise cherrypy.HTTPError(400, msg) + + try: + if method == 'PATCH': + data: Dict[str, Any] = cherrypy.request.json + + if 'state' not in data or data['state'] not in ['on', 'off']: + msg = "Invalid data. 'state' must be provided and have a valid value (on|off)." 
+ self.log.error(msg) + raise cherrypy.HTTPError(400, msg) + + func: Any = (self.backend.device_led_on if led_type == 'drive' and data['state'] == 'on' else + self.backend.device_led_off if led_type == 'drive' and data['state'] == 'off' else + self.backend.chassis_led_on if led_type != 'drive' and data['state'] == 'on' else + self.backend.chassis_led_off if led_type != 'drive' and data['state'] == 'off' else None) + + else: + func = self.backend.get_device_led if led_type == 'drive' else self.backend.get_chassis_led + + result = func(id_drive) if led_type == 'drive' else func() + + except HTTPError as e: + raise cherrypy.HTTPError(e.code, e.reason) + return result + + @cherrypy.expose + @cherrypy.tools.allow(methods=['GET']) + @cherrypy.tools.json_out() + def get_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]: + return self.backend.get_led() + + @cherrypy.expose + @cherrypy.tools.allow(methods=['PATCH']) + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.auth_basic(on=True) + def set_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]: + data = cherrypy.request.json + rc = self.backend.set_led(data) + + if rc != 200: + cherrypy.response.status = rc + result = {'state': 'error: please, verify the data you sent.'} + else: + result = {'state': data['state'].lower()} + return result + + def stop(self) -> None: + self.unsubscribe() + super().stop() + + +class NodeProxyApi(Thread): + def __init__(self, node_proxy_mgr: 'NodeProxyManager') -> None: + super().__init__() + self.log = get_logger(__name__) + self.cp_shutdown_event = Event() + self.node_proxy_mgr = node_proxy_mgr + self.username = self.node_proxy_mgr.username + self.password = self.node_proxy_mgr.password + self.ssl_crt = self.node_proxy_mgr.api_ssl_crt + self.ssl_key = self.node_proxy_mgr.api_ssl_key + self.system = self.node_proxy_mgr.system + self.reporter_agent = self.node_proxy_mgr.reporter_agent + self.config = self.node_proxy_mgr.config + self.api = API(self.system, self.reporter_agent, self.config) + + def check_auth(self, realm: str, username: str, password: str) -> bool: + return self.username == username and \ + self.password == password + + def shutdown(self) -> None: + self.log.info('Stopping node-proxy API...') + self.cp_shutdown_event.set() + + def run(self) -> None: + self.log.info('node-proxy API configuration...') + cherrypy.config.update({ + 'environment': 'production', + 'engine.autoreload.on': False, + 'log.screen': True, + }) + config = {'/': { + 'request.methods_with_bodies': ('POST', 'PUT', 'PATCH'), + 'tools.trailing_slash.on': False, + 'tools.auth_basic.realm': 'localhost', + 'tools.auth_basic.checkpassword': self.check_auth + }} + cherrypy.tree.mount(self.api, '/', config=config) + # cherrypy.tree.mount(admin, '/admin', config=config) + + ssl_crt = write_tmp_file(self.ssl_crt, + prefix_name='listener-crt-') + ssl_key = write_tmp_file(self.ssl_key, + prefix_name='listener-key-') + + self.api.ssl_certificate = ssl_crt.name + self.api.ssl_private_key = ssl_key.name + + cherrypy.server.unsubscribe() + try: + cherrypy.engine.start() + self.log.info('node-proxy API started.') + self.cp_shutdown_event.wait() + self.cp_shutdown_event.clear() + cherrypy.engine.exit() + cherrypy.server.httpserver = None + self.log.info('node-proxy API shutdown.') + except Exception as e: + self.log.error(f'node-proxy API error: {e}') diff --git a/src/ceph-node-proxy/ceph_node_proxy/baseclient.py b/src/ceph-node-proxy/ceph_node_proxy/baseclient.py new file mode 100644 index 000000000..6b4656148 --- /dev/null +++ 
b/src/ceph-node-proxy/ceph_node_proxy/baseclient.py @@ -0,0 +1,20 @@ +from typing import Dict, Any + + +class BaseClient: + def __init__(self, + host: str, + username: str, + password: str) -> None: + self.host = host + self.username = username + self.password = password + + def login(self) -> None: + raise NotImplementedError() + + def logout(self) -> Dict[str, Any]: + raise NotImplementedError() + + def get_path(self, path: str) -> Dict: + raise NotImplementedError() diff --git a/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py new file mode 100644 index 000000000..ea4e65cc6 --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py @@ -0,0 +1,283 @@ +import concurrent.futures +import json +from ceph_node_proxy.basesystem import BaseSystem +from ceph_node_proxy.redfish_client import RedFishClient +from time import sleep +from ceph_node_proxy.util import get_logger +from typing import Dict, Any, List, Callable, Union +from urllib.error import HTTPError, URLError + + +class BaseRedfishSystem(BaseSystem): + def __init__(self, **kw: Any) -> None: + super().__init__(**kw) + self.common_endpoints: List[str] = kw.get('common_endpoints', ['/Systems/System.Embedded.1', + '/UpdateService']) + self.chassis_endpoint: str = kw.get('chassis_endpoint', '/Chassis/System.Embedded.1') + self.log = get_logger(__name__) + self.host: str = kw['host'] + self.port: str = kw['port'] + self.username: str = kw['username'] + self.password: str = kw['password'] + # move the following line (class attribute?) + self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password) + self.log.info(f'redfish system initialization, host: {self.host}, user: {self.username}') + self.data_ready: bool = False + self.previous_data: Dict = {} + self.data: Dict[str, Dict[str, Any]] = {} + self._system: Dict[str, Dict[str, Any]] = {} + self._sys: Dict[str, Any] = {} + self.job_service_endpoint: str = '' + self.create_reboot_job_endpoint: str = '' + self.setup_job_queue_endpoint: str = '' + self.component_list: List[str] = kw.get('component_list', ['memory', + 'power', + 'fans', + 'network', + 'processors', + 'storage', + 'firmwares']) + self.update_funcs: List[Callable] = [] + for component in self.component_list: + self.log.debug(f'adding: {component} to hw component gathered list.') + func = f'_update_{component}' + if hasattr(self, func): + f = getattr(self, func) + self.update_funcs.append(f) + + def main(self) -> None: + self.stop = False + self.client.login() + while not self.stop: + self.log.debug('waiting for a lock in the update loop.') + with self.lock: + if not self.pending_shutdown: + self.log.debug('lock acquired in the update loop.') + try: + self._update_system() + self._update_sn() + + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.map(lambda f: f(), self.update_funcs) + + self.data_ready = True + except RuntimeError as e: + self.stop = True + self.log.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}') + self.client.logout() + raise + sleep(5) + self.log.debug('lock released in the update loop.') + self.log.debug('exiting update loop.') + raise SystemExit(0) + + def flush(self) -> None: + self.log.debug('Acquiring lock to flush data.') + self.lock.acquire() + self.log.debug('Lock acquired, flushing data.') + self._system = {} + self.previous_data = {} + self.log.info('Data flushed.') + self.data_ready = False + 
self.log.debug('Data marked as not ready.') + self.lock.release() + self.log.debug('Released the lock after flushing data.') + + # @retry(retries=10, delay=2) + def _get_path(self, path: str) -> Dict: + result: Dict[str, Any] = {} + try: + if not self.pending_shutdown: + self.log.debug(f'Getting path: {path}') + result = self.client.get_path(path) + else: + self.log.debug(f'Pending shutdown, aborting query to {path}') + except RuntimeError: + raise + if result is None: + self.log.error(f'The client reported an error when getting path: {path}') + raise RuntimeError(f'Could not get path: {path}') + return result + + def get_members(self, data: Dict[str, Any], path: str) -> List: + _path = data[path]['@odata.id'] + _data = self._get_path(_path) + return [self._get_path(member['@odata.id']) for member in _data['Members']] + + def get_system(self) -> Dict[str, Any]: + result = { + 'host': self.get_host(), + 'sn': self.get_sn(), + 'status': { + 'storage': self.get_storage(), + 'processors': self.get_processors(), + 'network': self.get_network(), + 'memory': self.get_memory(), + 'power': self.get_power(), + 'fans': self.get_fans() + }, + 'firmwares': self.get_firmwares(), + 'chassis': {'redfish_endpoint': f'/redfish/v1{self.chassis_endpoint}'} # TODO(guits): not ideal + } + return result + + def _update_system(self) -> None: + for endpoint in self.common_endpoints: + result = self.client.get_path(endpoint) + _endpoint = endpoint.strip('/').split('/')[0] + self._system[_endpoint] = result + + def _update_sn(self) -> None: + raise NotImplementedError() + + def _update_memory(self) -> None: + raise NotImplementedError() + + def _update_power(self) -> None: + raise NotImplementedError() + + def _update_fans(self) -> None: + raise NotImplementedError() + + def _update_network(self) -> None: + raise NotImplementedError() + + def _update_processors(self) -> None: + raise NotImplementedError() + + def _update_storage(self) -> None: + raise NotImplementedError() + + def _update_firmwares(self) -> None: + raise NotImplementedError() + + def device_led_on(self, device: str) -> int: + data: Dict[str, bool] = {'LocationIndicatorActive': True} + try: + result = self.set_device_led(device, data) + except (HTTPError, KeyError): + return 0 + return result + + def device_led_off(self, device: str) -> int: + data: Dict[str, bool] = {'LocationIndicatorActive': False} + try: + result = self.set_device_led(device, data) + except (HTTPError, KeyError): + return 0 + return result + + def chassis_led_on(self) -> int: + data: Dict[str, str] = {'IndicatorLED': 'Blinking'} + result = self.set_chassis_led(data) + return result + + def chassis_led_off(self) -> int: + data: Dict[str, str] = {'IndicatorLED': 'Lit'} + result = self.set_chassis_led(data) + return result + + def get_device_led(self, device: str) -> Dict[str, Any]: + endpoint = self._sys['storage'][device]['redfish_endpoint'] + try: + result = self.client.query(method='GET', + endpoint=endpoint, + timeout=10) + except HTTPError as e: + self.log.error(f"Couldn't get the ident device LED status for device '{device}': {e}") + raise + response_json = json.loads(result[1]) + _result: Dict[str, Any] = {'http_code': result[2]} + if result[2] == 200: + _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive'] + else: + _result['LocationIndicatorActive'] = None + return _result + + def set_device_led(self, device: str, data: Dict[str, bool]) -> int: + try: + _, response, status = self.client.query( + data=json.dumps(data), + method='PATCH', + 
endpoint=self._sys['storage'][device]['redfish_endpoint'] + ) + except (HTTPError, KeyError) as e: + self.log.error(f"Couldn't set the ident device LED for device '{device}': {e}") + raise + return status + + def get_chassis_led(self) -> Dict[str, Any]: + endpoint = f'/redfish/v1/{self.chassis_endpoint}' + try: + result = self.client.query(method='GET', + endpoint=endpoint, + timeout=10) + except HTTPError as e: + self.log.error(f"Couldn't get the ident chassis LED status: {e}") + raise + response_json = json.loads(result[1]) + _result: Dict[str, Any] = {'http_code': result[2]} + if result[2] == 200: + _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive'] + else: + _result['LocationIndicatorActive'] = None + return _result + + def set_chassis_led(self, data: Dict[str, str]) -> int: + # '{"IndicatorLED": "Lit"}' -> LocationIndicatorActive = false + # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true + try: + _, response, status = self.client.query( + data=json.dumps(data), + method='PATCH', + endpoint=f'/redfish/v1{self.chassis_endpoint}' + ) + except HTTPError as e: + self.log.error(f"Couldn't set the ident chassis LED: {e}") + raise + return status + + def shutdown_host(self, force: bool = False) -> int: + reboot_type: str = 'GracefulRebootWithForcedShutdown' if force else 'GracefulRebootWithoutForcedShutdown' + + try: + job_id: str = self.create_reboot_job(reboot_type) + status = self.schedule_reboot_job(job_id) + except (HTTPError, KeyError) as e: + self.log.error(f"Couldn't create the reboot job: {e}") + raise + return status + + def powercycle(self) -> int: + try: + job_id: str = self.create_reboot_job('PowerCycle') + status = self.schedule_reboot_job(job_id) + except (HTTPError, URLError) as e: + self.log.error(f"Couldn't perform power cycle: {e}") + raise + return status + + def create_reboot_job(self, reboot_type: str) -> str: + data: Dict[str, str] = dict(RebootJobType=reboot_type) + try: + headers, response, status = self.client.query( + data=json.dumps(data), + endpoint=self.create_reboot_job_endpoint + ) + job_id: str = headers['Location'].split('/')[-1] + except (HTTPError, URLError) as e: + self.log.error(f"Couldn't create the reboot job: {e}") + raise + return job_id + + def schedule_reboot_job(self, job_id: str) -> int: + data: Dict[str, Union[List[str], str]] = dict(JobArray=[job_id], StartTimeInterval='TIME_NOW') + try: + headers, response, status = self.client.query( + data=json.dumps(data), + endpoint=self.setup_job_queue_endpoint + ) + except (HTTPError, KeyError) as e: + self.log.error(f"Couldn't schedule the reboot job: {e}") + raise + return status diff --git a/src/ceph-node-proxy/ceph_node_proxy/basesystem.py b/src/ceph-node-proxy/ceph_node_proxy/basesystem.py new file mode 100644 index 000000000..65eca55af --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/basesystem.py @@ -0,0 +1,96 @@ +import socket +from threading import Lock +from ceph_node_proxy.util import Config, get_logger, BaseThread +from typing import Dict, Any +from ceph_node_proxy.baseclient import BaseClient + + +class BaseSystem(BaseThread): + def __init__(self, **kw: Any) -> None: + super().__init__() + self.lock: Lock = Lock() + self._system: Dict = {} + self.config: Config = kw.get('config', {}) + self.client: BaseClient + self.log = get_logger(__name__) + + def main(self) -> None: + raise NotImplementedError() + + def get_system(self) -> Dict[str, Any]: + raise NotImplementedError() + + def get_status(self) -> Dict[str, Dict[str, Dict]]: + raise 
NotImplementedError() + + def get_metadata(self) -> Dict[str, Dict[str, Dict]]: + raise NotImplementedError() + + def get_processors(self) -> Dict[str, Dict[str, Dict]]: + raise NotImplementedError() + + def get_memory(self) -> Dict[str, Dict[str, Dict]]: + raise NotImplementedError() + + def get_fans(self) -> Dict[str, Dict[str, Dict]]: + raise NotImplementedError() + + def get_power(self) -> Dict[str, Dict[str, Dict]]: + raise NotImplementedError() + + def get_network(self) -> Dict[str, Dict[str, Dict]]: + raise NotImplementedError() + + def get_storage(self) -> Dict[str, Dict[str, Dict]]: + raise NotImplementedError() + + def get_firmwares(self) -> Dict[str, Dict[str, Dict]]: + raise NotImplementedError() + + def get_sn(self) -> str: + raise NotImplementedError() + + def get_led(self) -> Dict[str, Any]: + raise NotImplementedError() + + def set_led(self, data: Dict[str, str]) -> int: + raise NotImplementedError() + + def get_chassis_led(self) -> Dict[str, Any]: + raise NotImplementedError() + + def set_chassis_led(self, data: Dict[str, str]) -> int: + raise NotImplementedError() + + def device_led_on(self, device: str) -> int: + raise NotImplementedError() + + def device_led_off(self, device: str) -> int: + raise NotImplementedError() + + def get_device_led(self, device: str) -> Dict[str, Any]: + raise NotImplementedError() + + def set_device_led(self, device: str, data: Dict[str, bool]) -> int: + raise NotImplementedError() + + def chassis_led_on(self) -> int: + raise NotImplementedError() + + def chassis_led_off(self) -> int: + raise NotImplementedError() + + def get_host(self) -> str: + return socket.gethostname() + + def stop_update_loop(self) -> None: + raise NotImplementedError() + + def flush(self) -> None: + raise NotImplementedError() + + def shutdown_host(self, force: bool = False) -> int: + raise NotImplementedError() + + def powercycle(self) -> int: + raise NotImplementedError() diff --git a/src/ceph-node-proxy/ceph_node_proxy/main.py b/src/ceph-node-proxy/ceph_node_proxy/main.py new file mode 100644 index 000000000..9a449ecf8 --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/main.py @@ -0,0 +1,199 @@ +from ceph_node_proxy.redfishdellsystem import RedfishDellSystem +from ceph_node_proxy.api import NodeProxyApi +from ceph_node_proxy.reporter import Reporter +from ceph_node_proxy.util import Config, get_logger, http_req, write_tmp_file, CONFIG +from urllib.error import HTTPError +from typing import Dict, Any, Optional + +import argparse +import os +import ssl +import json +import time +import signal + + +class NodeProxyManager: + def __init__(self, **kw: Any) -> None: + self.exc: Optional[Exception] = None + self.log = get_logger(__name__) + self.mgr_host: str = kw['mgr_host'] + self.cephx_name: str = kw['cephx_name'] + self.cephx_secret: str = kw['cephx_secret'] + self.ca_path: str = kw['ca_path'] + self.api_ssl_crt: str = kw['api_ssl_crt'] + self.api_ssl_key: str = kw['api_ssl_key'] + self.mgr_agent_port: str = str(kw['mgr_agent_port']) + self.stop: bool = False + self.ssl_ctx = ssl.create_default_context() + self.ssl_ctx.check_hostname = True + self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED + self.ssl_ctx.load_verify_locations(self.ca_path) + self.reporter_scheme: str = kw.get('reporter_scheme', 'https') + self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data') + self.cephx = {'cephx': {'name': self.cephx_name, + 'secret': self.cephx_secret}} + self.config = Config('/etc/ceph/node-proxy.yml', config=CONFIG) + self.username: str = '' + 
self.password: str = '' + + def run(self) -> None: + self.init() + self.loop() + + def init(self) -> None: + self.init_system() + self.init_reporter() + self.init_api() + + def fetch_oob_details(self) -> Dict[str, str]: + try: + headers, result, status = http_req(hostname=self.mgr_host, + port=self.mgr_agent_port, + data=json.dumps(self.cephx), + endpoint='/node-proxy/oob', + ssl_ctx=self.ssl_ctx) + except HTTPError as e: + msg = f'No out of band tool details could be loaded: {e.code}, {e.reason}' + self.log.debug(msg) + raise + + result_json = json.loads(result) + oob_details: Dict[str, str] = { + 'host': result_json['result']['addr'], + 'username': result_json['result']['username'], + 'password': result_json['result']['password'], + 'port': result_json['result'].get('port', '443') + } + return oob_details + + def init_system(self) -> None: + try: + oob_details = self.fetch_oob_details() + self.username = oob_details['username'] + self.password = oob_details['password'] + except HTTPError: + self.log.warning('No oob details could be loaded, exiting...') + raise SystemExit(1) + try: + self.system = RedfishDellSystem(host=oob_details['host'], + port=oob_details['port'], + username=oob_details['username'], + password=oob_details['password'], + config=self.config) + self.system.start() + except RuntimeError: + self.log.error("Can't initialize the redfish system.") + raise + + def init_reporter(self) -> None: + try: + self.reporter_agent = Reporter(self.system, + self.cephx, + reporter_scheme=self.reporter_scheme, + reporter_hostname=self.mgr_host, + reporter_port=self.mgr_agent_port, + reporter_endpoint=self.reporter_endpoint) + self.reporter_agent.start() + except RuntimeError: + self.log.error("Can't initialize the reporter.") + raise + + def init_api(self) -> None: + try: + self.log.info('Starting node-proxy API...') + self.api = NodeProxyApi(self) + self.api.start() + except Exception as e: + self.log.error(f"Can't start node-proxy API: {e}") + raise + + def loop(self) -> None: + while not self.stop: + for thread in [self.system, self.reporter_agent]: + try: + status = thread.check_status() + label = 'Ok' if status else 'Critical' + self.log.debug(f'{thread} status: {label}') + except Exception as e: + self.log.error(f'{thread} not running: {e.__class__.__name__}: {e}') + thread.shutdown() + self.init_system() + self.init_reporter() + self.log.debug('All threads are alive, next check in 20sec.') + time.sleep(20) + + def shutdown(self) -> None: + self.stop = True + # if `self.system.shutdown()` is called before self.start(), it will fail. 
+ if hasattr(self, 'api'): + self.api.shutdown() + if hasattr(self, 'reporter_agent'): + self.reporter_agent.shutdown() + if hasattr(self, 'system'): + self.system.shutdown() + + +def handler(signum: Any, frame: Any, t_mgr: 'NodeProxyManager') -> None: + t_mgr.system.pending_shutdown = True + t_mgr.log.info('SIGTERM caught, shutting down threads...') + t_mgr.shutdown() + t_mgr.log.info('Logging out from RedFish API') + t_mgr.system.client.logout() + raise SystemExit(0) + + +def main() -> None: + parser = argparse.ArgumentParser( + description='Ceph Node-Proxy for HW Monitoring', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument( + '--config', + help='path of config file in json format', + required=True + ) + parser.add_argument( + '--debug', + help='increase logging verbosity (debug level)', + action='store_true', + ) + + args = parser.parse_args() + if args.debug: + CONFIG['logging']['level'] = 10 + + if not os.path.exists(args.config): + raise Exception(f'No config file found at provided config path: {args.config}') + + with open(args.config, 'r') as f: + try: + config_json = f.read() + config = json.loads(config_json) + except Exception as e: + raise Exception(f'Failed to load json config: {str(e)}') + + target_ip = config['target_ip'] + target_port = config['target_port'] + keyring = config['keyring'] + root_cert = config['root_cert.pem'] + listener_cert = config['listener.crt'] + listener_key = config['listener.key'] + name = config['name'] + + ca_file = write_tmp_file(root_cert, + prefix_name='cephadm-endpoint-root-cert') + + node_proxy_mgr = NodeProxyManager(mgr_host=target_ip, + cephx_name=name, + cephx_secret=keyring, + mgr_agent_port=target_port, + ca_path=ca_file.name, + api_ssl_crt=listener_cert, + api_ssl_key=listener_key) + signal.signal(signal.SIGTERM, + lambda signum, frame: handler(signum, frame, node_proxy_mgr)) + node_proxy_mgr.run() + + +if __name__ == '__main__': + main() diff --git a/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py b/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py new file mode 100644 index 000000000..64a4e44df --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py @@ -0,0 +1,123 @@ +import json +from urllib.error import HTTPError, URLError +from ceph_node_proxy.baseclient import BaseClient +from ceph_node_proxy.util import get_logger, http_req +from typing import Dict, Any, Tuple, Optional +from http.client import HTTPMessage + + +class RedFishClient(BaseClient): + PREFIX = '/redfish/v1/' + + def __init__(self, + host: str = '', + port: str = '443', + username: str = '', + password: str = ''): + super().__init__(host, username, password) + self.log = get_logger(__name__) + self.log.info(f'Initializing redfish client {__name__}') + self.host: str = host + self.port: str = port + self.url: str = f'https://{self.host}:{self.port}' + self.token: str = '' + self.location: str = '' + + def login(self) -> None: + if not self.is_logged_in(): + self.log.info('Logging in to ' + f"{self.url} as '{self.username}'") + oob_credentials = json.dumps({'UserName': self.username, + 'Password': self.password}) + headers = {'Content-Type': 'application/json'} + location_endpoint: str = '' + + try: + _headers, _data, _status_code = self.query(data=oob_credentials, + headers=headers, + endpoint='/redfish/v1/SessionService/Sessions/') + if _status_code != 201: + self.log.error(f"Can't log in to {self.url} as '{self.username}': {_status_code}") + raise RuntimeError + except URLError as e: + msg = f"Can't log in to 
{self.url} as '{self.username}': {e}" + self.log.error(msg) + raise RuntimeError + self.token = _headers['X-Auth-Token'] + if _headers['Location'].startswith('http'): + # We assume the value has the following format: + # scheme://address:port/redfish/v1/SessionService/Session + location_endpoint = f"/{_headers['Location'].split('/', 3)[-1:][0]}" + else: + location_endpoint = _headers['Location'] + self.location = location_endpoint + self.log.info(f'Logged in to {self.url}, Received header "Location": {self.location}') + + def is_logged_in(self) -> bool: + self.log.debug(f'Checking token validity for {self.url}') + if not self.location or not self.token: + self.log.debug(f'No token found for {self.url}.') + return False + headers = {'X-Auth-Token': self.token} + try: + _headers, _data, _status_code = self.query(headers=headers, + endpoint=self.location) + except URLError as e: + self.log.error("Can't check token " + f'validity for {self.url}: {e}') + raise + return _status_code == 200 + + def logout(self) -> Dict[str, Any]: + result: Dict[str, Any] = {} + try: + if self.is_logged_in(): + _, _data, _status_code = self.query(method='DELETE', + headers={'X-Auth-Token': self.token}, + endpoint=self.location) + result = json.loads(_data) + except URLError: + self.log.error(f"Can't log out from {self.url}") + + self.location = '' + self.token = '' + + return result + + def get_path(self, path: str) -> Dict[str, Any]: + if self.PREFIX not in path: + path = f'{self.PREFIX}{path}' + try: + _, result, _status_code = self.query(endpoint=path) + result_json = json.loads(result) + return result_json + except URLError as e: + self.log.error(f"Can't get path {path}:\n{e}") + raise RuntimeError + + def query(self, + data: Optional[str] = None, + headers: Dict[str, str] = {}, + method: Optional[str] = None, + endpoint: str = '', + timeout: int = 10) -> Tuple[HTTPMessage, str, int]: + _headers = headers.copy() if headers else {} + if self.token: + _headers['X-Auth-Token'] = self.token + if not _headers.get('Content-Type') and method in ['POST', 'PUT', 'PATCH']: + _headers['Content-Type'] = 'application/json' + try: + (response_headers, + response_str, + response_status) = http_req(hostname=self.host, + port=self.port, + endpoint=endpoint, + headers=_headers, + method=method, + data=data, + timeout=timeout) + + return response_headers, response_str, response_status + except (HTTPError, URLError) as e: + self.log.debug(f'{e}') + raise diff --git a/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py b/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py new file mode 100644 index 000000000..f0d24c667 --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py @@ -0,0 +1,166 @@ +from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem +from ceph_node_proxy.util import get_logger, normalize_dict, to_snake_case +from typing import Dict, Any, List + + +class RedfishDellSystem(BaseRedfishSystem): + def __init__(self, **kw: Any) -> None: + super().__init__(**kw) + self.log = get_logger(__name__) + self.job_service_endpoint: str = '/redfish/v1/Managers/iDRAC.Embedded.1/Oem/Dell/DellJobService' + self.create_reboot_job_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.CreateRebootJob' + self.setup_job_queue_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.SetupJobQueue' + + def build_common_data(self, + data: Dict[str, Any], + fields: List, + path: str) -> Dict[str, Dict[str, Dict]]: + result: Dict[str, Dict[str, Dict]] = dict() + for member_info 
in self.get_members(data, path): + member_id = member_info['Id'] + result[member_id] = dict() + for field in fields: + try: + result[member_id][to_snake_case(field)] = member_info[field] + except KeyError: + self.log.warning(f'Could not find field: {field} in member_info: {member_info}') + + return normalize_dict(result) + + def build_chassis_data(self, + fields: Dict[str, List[str]], + path: str) -> Dict[str, Dict[str, Dict]]: + result: Dict[str, Dict[str, Dict]] = dict() + data = self._get_path(f'{self.chassis_endpoint}/{path}') + + for elt, _fields in fields.items(): + for member_elt in data[elt]: + _id = member_elt['MemberId'] + result[_id] = dict() + for field in _fields: + try: + result[_id][to_snake_case(field)] = member_elt[field] + except KeyError: + self.log.warning(f'Could not find field: {field} in data: {data[elt]}') + return normalize_dict(result) + + def get_sn(self) -> str: + return self._sys['SKU'] + + def get_status(self) -> Dict[str, Dict[str, Dict]]: + return self._sys['status'] + + def get_memory(self) -> Dict[str, Dict[str, Dict]]: + return self._sys['memory'] + + def get_processors(self) -> Dict[str, Dict[str, Dict]]: + return self._sys['processors'] + + def get_network(self) -> Dict[str, Dict[str, Dict]]: + return self._sys['network'] + + def get_storage(self) -> Dict[str, Dict[str, Dict]]: + return self._sys['storage'] + + def get_firmwares(self) -> Dict[str, Dict[str, Dict]]: + return self._sys['firmwares'] + + def get_power(self) -> Dict[str, Dict[str, Dict]]: + return self._sys['power'] + + def get_fans(self) -> Dict[str, Dict[str, Dict]]: + return self._sys['fans'] + + def _update_network(self) -> None: + fields = ['Description', 'Name', 'SpeedMbps', 'Status'] + self.log.debug('Updating network') + self._sys['network'] = self.build_common_data(data=self._system['Systems'], + fields=fields, + path='EthernetInterfaces') + + def _update_processors(self) -> None: + fields = ['Description', + 'TotalCores', + 'TotalThreads', + 'ProcessorType', + 'Model', + 'Status', + 'Manufacturer'] + self.log.debug('Updating processors') + self._sys['processors'] = self.build_common_data(data=self._system['Systems'], + fields=fields, + path='Processors') + + def _update_storage(self) -> None: + fields = ['Description', + 'CapacityBytes', + 'Model', 'Protocol', + 'LocationIndicatorActive', + 'SerialNumber', 'Status', + 'PhysicalLocation'] + entities = self.get_members(data=self._system['Systems'], + path='Storage') + self.log.debug('Updating storage') + result: Dict[str, Dict[str, Dict]] = dict() + for entity in entities: + for drive in entity['Drives']: + drive_path = drive['@odata.id'] + drive_info = self._get_path(drive_path) + drive_id = drive_info['Id'] + result[drive_id] = dict() + result[drive_id]['redfish_endpoint'] = drive['@odata.id'] + for field in fields: + result[drive_id][to_snake_case(field)] = drive_info[field] + result[drive_id]['entity'] = entity['Id'] + self._sys['storage'] = normalize_dict(result) + + def _update_sn(self) -> None: + self.log.debug('Updating serial number') + self._sys['SKU'] = self._system['Systems']['SKU'] + + def _update_memory(self) -> None: + fields = ['Description', + 'MemoryDeviceType', + 'CapacityMiB', + 'Status'] + self.log.debug('Updating memory') + self._sys['memory'] = self.build_common_data(data=self._system['Systems'], + fields=fields, + path='Memory') + + def _update_power(self) -> None: + fields = { + 'PowerSupplies': [ + 'Name', + 'Model', + 'Manufacturer', + 'Status' + ] + } + self.log.debug('Updating powersupplies') + 
self._sys['power'] = self.build_chassis_data(fields, 'Power') + + def _update_fans(self) -> None: + fields = { + 'Fans': [ + 'Name', + 'PhysicalContext', + 'Status' + ], + } + self.log.debug('Updating fans') + self._sys['fans'] = self.build_chassis_data(fields, 'Thermal') + + def _update_firmwares(self) -> None: + fields = [ + 'Name', + 'Description', + 'ReleaseDate', + 'Version', + 'Updateable', + 'Status', + ] + self.log.debug('Updating firmwares') + self._sys['firmwares'] = self.build_common_data(data=self._system['UpdateService'], + fields=fields, + path='FirmwareInventory') diff --git a/src/ceph-node-proxy/ceph_node_proxy/reporter.py b/src/ceph-node-proxy/ceph_node_proxy/reporter.py new file mode 100644 index 000000000..20d43b59d --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/reporter.py @@ -0,0 +1,69 @@ +import time +import json +from ceph_node_proxy.util import get_logger, http_req, BaseThread +from urllib.error import HTTPError, URLError +from typing import Dict, Any + + +class Reporter(BaseThread): + def __init__(self, + system: Any, + cephx: Dict[str, Any], + reporter_scheme: str = 'https', + reporter_hostname: str = '', + reporter_port: str = '443', + reporter_endpoint: str = '/node-proxy/data') -> None: + super().__init__() + self.system = system + self.data: Dict[str, Any] = {} + self.stop: bool = False + self.cephx = cephx + self.data['cephx'] = self.cephx['cephx'] + self.reporter_scheme: str = reporter_scheme + self.reporter_hostname: str = reporter_hostname + self.reporter_port: str = reporter_port + self.reporter_endpoint: str = reporter_endpoint + self.log = get_logger(__name__) + self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:' + f'{reporter_port}{reporter_endpoint}') + self.log.info(f'Reporter url set to {self.reporter_url}') + + def main(self) -> None: + while not self.stop: + # Any logic to avoid sending the all the system + # information every loop can go here. 
In a real + # scenario probably we should just send the sub-parts + # that have changed to minimize the traffic in + # dense clusters + self.log.debug('waiting for a lock in reporter loop.') + with self.system.lock: + if not self.system.pending_shutdown: + self.log.debug('lock acquired in reporter loop.') + if self.system.data_ready: + self.log.debug('data ready to be sent to the mgr.') + if not self.system.get_system() == self.system.previous_data: + self.log.info('data has changed since last iteration.') + self.data['patch'] = self.system.get_system() + try: + # TODO: add a timeout parameter to the reporter in the config file + self.log.info(f'sending data to {self.reporter_url}') + http_req(hostname=self.reporter_hostname, + port=self.reporter_port, + method='POST', + headers={'Content-Type': 'application/json'}, + endpoint=self.reporter_endpoint, + scheme=self.reporter_scheme, + data=json.dumps(self.data)) + except (HTTPError, URLError) as e: + self.log.error(f"The reporter couldn't send data to the mgr: {e}") + raise + # Need to add a new parameter 'max_retries' to the reporter if it can't + # send the data for more than x times, maybe the daemon should stop altogether + else: + self.system.previous_data = self.system.get_system() + else: + self.log.debug('no diff, not sending data to the mgr.') + self.log.debug('lock released in reporter loop.') + time.sleep(5) + self.log.debug('exiting reporter loop.') + raise SystemExit(0) diff --git a/src/ceph-node-proxy/ceph_node_proxy/util.py b/src/ceph-node-proxy/ceph_node_proxy/util.py new file mode 100644 index 000000000..677161c63 --- /dev/null +++ b/src/ceph-node-proxy/ceph_node_proxy/util.py @@ -0,0 +1,196 @@ +import logging +import yaml +import os +import time +import re +import ssl +import traceback +import threading +from tempfile import NamedTemporaryFile, _TemporaryFileWrapper +from urllib.error import HTTPError, URLError +from urllib.request import urlopen, Request +from typing import Dict, Callable, Any, Optional, MutableMapping, Tuple, Union + + +CONFIG: Dict[str, Any] = { + 'reporter': { + 'check_interval': 5, + 'push_data_max_retries': 30, + 'endpoint': 'https://%(mgr_host):%(mgr_port)/node-proxy/data', + }, + 'system': { + 'refresh_interval': 5 + }, + 'api': { + 'port': 9456, + }, + 'logging': { + 'level': logging.INFO, + } +} + + +def get_logger(name: str, level: Union[int, str] = logging.NOTSET) -> logging.Logger: + if level == logging.NOTSET: + log_level = CONFIG['logging']['level'] + logger = logging.getLogger(name) + logger.setLevel(log_level) + handler = logging.StreamHandler() + handler.setLevel(log_level) + fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(fmt) + logger.handlers.clear() + logger.addHandler(handler) + logger.propagate = False + + return logger + + +logger = get_logger(__name__) + + +class Config: + def __init__(self, + config_file: str = '/etc/ceph/node-proxy.yaml', + config: Dict[str, Any] = {}) -> None: + self.config_file = config_file + self.config = config + + self.load_config() + + def load_config(self) -> None: + if os.path.exists(self.config_file): + with open(self.config_file, 'r') as f: + self.config = yaml.safe_load(f) + else: + self.config = self.config + + for k, v in self.config.items(): + if k not in self.config.keys(): + self.config[k] = v + + for k, v in self.config.items(): + setattr(self, k, v) + + def reload(self, config_file: str = '') -> None: + if config_file != '': + self.config_file = config_file + self.load_config() + + +class 
BaseThread(threading.Thread): + def __init__(self) -> None: + super().__init__() + self.exc: Optional[Exception] = None + self.stop: bool = False + self.daemon = True + self.name = self.__class__.__name__ + self.log: logging.Logger = get_logger(__name__) + self.pending_shutdown: bool = False + + def run(self) -> None: + logger.info(f'Starting {self.name}') + try: + self.main() + except Exception as e: + self.exc = e + return + + def shutdown(self) -> None: + self.stop = True + self.pending_shutdown = True + + def check_status(self) -> bool: + logger.debug(f'Checking status of {self.name}') + if self.exc: + traceback.print_tb(self.exc.__traceback__) + logger.error(f'Caught exception: {self.exc.__class__.__name__}') + raise self.exc + if not self.is_alive(): + logger.info(f'{self.name} not alive') + self.start() + return True + + def main(self) -> None: + raise NotImplementedError() + + +def to_snake_case(name: str) -> str: + name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) + return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower() + + +def normalize_dict(test_dict: Dict) -> Dict: + res = dict() + for key in test_dict.keys(): + if isinstance(test_dict[key], dict): + res[key.lower()] = normalize_dict(test_dict[key]) + else: + if test_dict[key] is None: + test_dict[key] = 'unknown' + res[key.lower()] = test_dict[key] + return res + + +def retry(exceptions: Any = Exception, retries: int = 20, delay: int = 1) -> Callable: + def decorator(f: Callable) -> Callable: + def _retry(*args: str, **kwargs: Any) -> Callable: + _tries = retries + while _tries > 1: + try: + logger.debug('{} {} attempt(s) left.'.format(f, _tries - 1)) + return f(*args, **kwargs) + except exceptions: + time.sleep(delay) + _tries -= 1 + logger.warn('{} has failed after {} tries'.format(f, retries)) + return f(*args, **kwargs) + return _retry + return decorator + + +def http_req(hostname: str = '', + port: str = '443', + method: Optional[str] = None, + headers: MutableMapping[str, str] = {}, + data: Optional[str] = None, + endpoint: str = '/', + scheme: str = 'https', + ssl_verify: bool = False, + timeout: Optional[int] = None, + ssl_ctx: Optional[Any] = None) -> Tuple[Any, Any, Any]: + + if not ssl_ctx: + ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + if not ssl_verify: + ssl_ctx.check_hostname = False + ssl_ctx.verify_mode = ssl.CERT_NONE + else: + ssl_ctx.verify_mode = ssl.CERT_REQUIRED + + url: str = f'{scheme}://{hostname}:{port}{endpoint}' + _data = bytes(data, 'ascii') if data else None + _headers = headers + if data and not method: + method = 'POST' + if not _headers.get('Content-Type') and method in ['POST', 'PATCH']: + _headers['Content-Type'] = 'application/json' + try: + req = Request(url, _data, _headers, method=method) + with urlopen(req, context=ssl_ctx, timeout=timeout) as response: + response_str = response.read() + response_headers = response.headers + response_code = response.code + return response_headers, response_str.decode(), response_code + except (HTTPError, URLError) as e: + print(f'{e}') + # handle error here if needed + raise + + +def write_tmp_file(data: str, prefix_name: str = 'node-proxy-') -> _TemporaryFileWrapper: + f = NamedTemporaryFile(prefix=prefix_name) + os.fchmod(f.fileno(), 0o600) + f.write(data.encode('utf-8')) + f.flush() + return f |
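
For orientation, below is a minimal client sketch against the REST endpoints defined in api.py above (served by NodeProxyApi over TLS, with HTTP basic auth enabled on the action handlers). The host, API port (9456 is only the default from CONFIG['api']['port']), credentials, and drive ID are placeholder assumptions for illustration, as is the disabled certificate verification; none of them are part of the patch.

import base64
import json
import ssl
from typing import Any, Dict, Optional
from urllib.request import Request, urlopen

# Placeholder values (assumptions, not part of the patch): the node-proxy
# host, the API port (9456 is the default in CONFIG['api']['port']) and the
# BMC credentials that NodeProxyManager.fetch_oob_details() would return.
HOST = 'node1.example.com'
PORT = 9456
USERNAME = 'root'
PASSWORD = 'secret'

# The API is served over TLS with the cephadm listener certificate;
# verification is disabled here only to keep the sketch self-contained.
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE


def req(endpoint: str, method: str = 'GET', data: Optional[Dict[str, Any]] = None) -> Any:
    url = f'https://{HOST}:{PORT}{endpoint}'
    body = json.dumps(data).encode() if data is not None else None
    token = base64.b64encode(f'{USERNAME}:{PASSWORD}'.encode()).decode()
    headers = {'Content-Type': 'application/json',
               # The POST/PATCH handlers (shutdown, powercycle, _led) enable
               # basic auth; the extra header is harmless on the GET endpoints.
               'Authorization': f'Basic {token}'}
    request = Request(url, data=body, headers=headers, method=method)
    with urlopen(request, context=ctx, timeout=10) as response:
        return json.loads(response.read())


# Read-only endpoints exposed by API: /memory, /network, /processors,
# /storage, /power, /fans, /firmwares.
print(req('/memory'))

# Toggle a drive ident LED through the /led/{type}/{id} dispatcher; the drive
# ID is hypothetical and must match a key returned by /storage.
print(req('/led/drive/Disk.Bay.0:Enclosure.Internal.0-1', method='PATCH', data={'state': 'on'}))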