summaryrefslogtreecommitdiffstats
path: root/src/ceph-node-proxy/ceph_node_proxy
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-23 16:45:17 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-23 16:45:44 +0000
commit17d6a993fc17d533460c5f40f3908c708e057c18 (patch)
tree1a3bd93e0ecd74fa02f93a528fe2f87e5314c4b5 /src/ceph-node-proxy/ceph_node_proxy
parentReleasing progress-linux version 18.2.2-0progress7.99u1. (diff)
downloadceph-17d6a993fc17d533460c5f40f3908c708e057c18.tar.xz
ceph-17d6a993fc17d533460c5f40f3908c708e057c18.zip
Merging upstream version 18.2.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/ceph-node-proxy/ceph_node_proxy')
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/__init__.py2
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/api.py285
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/baseclient.py20
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py283
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/basesystem.py96
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/main.py199
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/redfish_client.py123
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py166
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/reporter.py69
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/util.py196
10 files changed, 1439 insertions, 0 deletions
diff --git a/src/ceph-node-proxy/ceph_node_proxy/__init__.py b/src/ceph-node-proxy/ceph_node_proxy/__init__.py
new file mode 100644
index 000000000..20403aa92
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/__init__.py
@@ -0,0 +1,2 @@
+__version__ = '1.0.0'
+__release__ = 'squid'
diff --git a/src/ceph-node-proxy/ceph_node_proxy/api.py b/src/ceph-node-proxy/ceph_node_proxy/api.py
new file mode 100644
index 000000000..25ae03e51
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/api.py
@@ -0,0 +1,285 @@
+import cherrypy # type: ignore
+from urllib.error import HTTPError
+from cherrypy._cpserver import Server # type: ignore
+from threading import Thread, Event
+from typing import Dict, Any, List
+from ceph_node_proxy.util import Config, get_logger, write_tmp_file
+from ceph_node_proxy.basesystem import BaseSystem
+from ceph_node_proxy.reporter import Reporter
+from typing import TYPE_CHECKING, Optional
+
+if TYPE_CHECKING:
+ from ceph_node_proxy.main import NodeProxyManager
+
+
@cherrypy.tools.auth_basic(on=True)
@cherrypy.tools.allow(methods=['PUT'])
@cherrypy.tools.json_out()
class Admin():
    """Administrative endpoints (start/reload/stop/shutdown/flush) for the daemon.

    Every endpoint requires basic auth and answers JSON (class decorators).
    """

    def __init__(self, api: 'API') -> None:
        # Keep a handle on the API object so we can reach its backend,
        # reporter and config.
        self.api = api

    @cherrypy.expose
    def start(self) -> Dict[str, str]:
        """Start the backend poller thread and run the reporter."""
        api = self.api
        api.backend.start()
        api.reporter.run()
        return {'ok': 'node-proxy daemon started'}

    @cherrypy.expose
    def reload(self) -> Dict[str, str]:
        """Re-read the node-proxy configuration."""
        self.api.config.reload()
        return {'ok': 'node-proxy config reloaded'}

    def _stop(self) -> None:
        # Common teardown shared by stop() and shutdown().
        self.api.backend.shutdown()
        self.api.reporter.shutdown()

    @cherrypy.expose
    def stop(self) -> Dict[str, str]:
        """Stop the backend and reporter threads; the server keeps running."""
        self._stop()
        return {'ok': 'node-proxy daemon stopped'}

    @cherrypy.expose
    def shutdown(self) -> Dict[str, str]:
        """Stop the worker threads and exit the cherrypy engine."""
        self._stop()
        cherrypy.engine.exit()
        return {'ok': 'Server shutdown.'}

    @cherrypy.expose
    def flush(self) -> Dict[str, str]:
        """Drop the data cached by the backend."""
        self.api.backend.flush()
        return {'ok': 'node-proxy data flushed'}
+
+
class API(Server):
    """cherrypy HTTP(S) server exposing the hardware state gathered by the backend.

    GET endpoints (memory, network, processors, storage, power, fans,
    firmwares, led) serve cached data; POST/PATCH endpoints (shutdown,
    powercycle, led state changes) are guarded by basic auth and drive the
    out-of-band controller through the backend.
    """

    def __init__(self,
                 backend: 'BaseSystem',
                 reporter: 'Reporter',
                 config: 'Config',
                 addr: str = '0.0.0.0',
                 port: int = 0) -> None:
        super().__init__()
        self.log = get_logger(__name__)
        self.backend = backend
        self.reporter = reporter
        self.config = config
        # An explicit non-zero port argument wins over the configured one.
        # NOTE(review): reads the parsed config through __dict__ — presumably
        # Config stores sections as attributes; confirm against util.Config.
        self.socket_port = self.config.__dict__['api']['port'] if not port else port
        self.socket_host = addr
        # Register this Server instance on the cherrypy bus.
        self.subscribe()

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def memory(self) -> Dict[str, Any]:
        """GET /memory: memory inventory from the backend."""
        return {'memory': self.backend.get_memory()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def network(self) -> Dict[str, Any]:
        """GET /network: network interface inventory from the backend."""
        return {'network': self.backend.get_network()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def processors(self) -> Dict[str, Any]:
        """GET /processors: CPU inventory from the backend."""
        return {'processors': self.backend.get_processors()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def storage(self) -> Dict[str, Any]:
        """GET /storage: storage device inventory from the backend."""
        return {'storage': self.backend.get_storage()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def power(self) -> Dict[str, Any]:
        """GET /power: power supply status from the backend."""
        return {'power': self.backend.get_power()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def fans(self) -> Dict[str, Any]:
        """GET /fans: fan status from the backend."""
        return {'fans': self.backend.get_fans()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def firmwares(self) -> Dict[str, Any]:
        """GET /firmwares: firmware inventory from the backend."""
        return {'firmwares': self.backend.get_firmwares()}

    def _cp_dispatch(self, vpath: List[str]) -> 'API':
        """Map /led/{type}[/{id}] URLs onto the _led() handler.

        Consumes the path segments and re-injects them as request params
        ('type', and 'id' for drives) before cherrypy resolves the handler.
        """
        if vpath[0] == 'led' and len(vpath) > 1:  # /led/{type}/{id}
            _type = vpath[1]
            cherrypy.request.params['type'] = _type
            vpath.pop(1)  # /led/{id} or # /led
            if _type == 'drive' and len(vpath) > 1:  # /led/{id}
                _id = vpath[1]
                vpath.pop(1)  # /led
                cherrypy.request.params['id'] = _id
            vpath[0] = '_led'
        # /<endpoint>
        return self

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['POST'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.auth_basic(on=True)
    def shutdown(self, **kw: Any) -> int:
        """POST /shutdown: ask the backend to shut the host down.

        The JSON body must contain a boolean 'force' key; returns the
        backend's status code, or HTTP 400/backend error otherwise.
        """
        data: Dict[str, bool] = cherrypy.request.json

        if 'force' not in data.keys():
            msg = "The key 'force' wasn't passed."
            self.log.debug(msg)
            raise cherrypy.HTTPError(400, msg)
        try:
            result: int = self.backend.shutdown_host(force=data['force'])
        except HTTPError as e:
            # Surface the backend's HTTP failure to the API caller.
            raise cherrypy.HTTPError(e.code, e.reason)
        return result

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['POST'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.auth_basic(on=True)
    def powercycle(self, **kw: Any) -> int:
        """POST /powercycle: power-cycle the host through the backend."""
        try:
            result: int = self.backend.powercycle()
        except HTTPError as e:
            raise cherrypy.HTTPError(e.code, e.reason)
        return result

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET', 'PATCH'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.auth_basic(on=True)
    def _led(self, **kw: Any) -> Dict[str, Any]:
        """GET/PATCH /led/{type}[/{id}]: read or set an identification LED.

        'type' is 'chassis' or 'drive'; a drive additionally requires a
        device 'id' present in the backend's storage inventory. Params are
        injected by _cp_dispatch().
        """
        method: str = cherrypy.request.method
        led_type: Optional[str] = kw.get('type')
        id_drive: Optional[str] = kw.get('id')
        result: Dict[str, Any] = dict()

        if not led_type:
            msg = "the led type must be provided (either 'chassis' or 'drive')."
            self.log.debug(msg)
            raise cherrypy.HTTPError(400, msg)

        if led_type == 'drive':
            id_drive_required = not id_drive
            if id_drive_required or id_drive not in self.backend.get_storage():
                msg = 'A valid device ID must be provided.'
                self.log.debug(msg)
                raise cherrypy.HTTPError(400, msg)

        try:
            if method == 'PATCH':
                data: Dict[str, Any] = cherrypy.request.json

                if 'state' not in data or data['state'] not in ['on', 'off']:
                    msg = "Invalid data. 'state' must be provided and have a valid value (on|off)."
                    self.log.error(msg)
                    raise cherrypy.HTTPError(400, msg)

                # Select the backend callback matching (type, state); the
                # trailing None arm is unreachable given the validation above.
                func: Any = (self.backend.device_led_on if led_type == 'drive' and data['state'] == 'on' else
                             self.backend.device_led_off if led_type == 'drive' and data['state'] == 'off' else
                             self.backend.chassis_led_on if led_type != 'drive' and data['state'] == 'on' else
                             self.backend.chassis_led_off if led_type != 'drive' and data['state'] == 'off' else None)

            else:
                func = self.backend.get_device_led if led_type == 'drive' else self.backend.get_chassis_led

            # Drive callbacks take the device id; chassis ones take no args.
            result = func(id_drive) if led_type == 'drive' else func()

        except HTTPError as e:
            raise cherrypy.HTTPError(e.code, e.reason)
        return result

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def get_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]:
        """GET /get_led: LED state as reported by the backend."""
        return self.backend.get_led()

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['PATCH'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.auth_basic(on=True)
    def set_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]:
        """PATCH /set_led: set the LED state; echoes the state on success."""
        data = cherrypy.request.json
        rc = self.backend.set_led(data)

        if rc != 200:
            # Propagate the backend's non-200 code as the HTTP status.
            cherrypy.response.status = rc
            result = {'state': 'error: please, verify the data you sent.'}
        else:
            result = {'state': data['state'].lower()}
        return result

    def stop(self) -> None:
        """Unsubscribe this server from the cherrypy bus and stop it."""
        self.unsubscribe()
        super().stop()
+
+
class NodeProxyApi(Thread):
    """Thread hosting the cherrypy engine that serves the API over TLS."""

    def __init__(self, node_proxy_mgr: 'NodeProxyManager') -> None:
        super().__init__()
        self.log = get_logger(__name__)
        # Used to block run() until shutdown() is requested.
        self.cp_shutdown_event = Event()
        self.node_proxy_mgr = node_proxy_mgr
        # Credentials and TLS material come from the manager (fetched from
        # the ceph-mgr / config at startup).
        self.username = self.node_proxy_mgr.username
        self.password = self.node_proxy_mgr.password
        self.ssl_crt = self.node_proxy_mgr.api_ssl_crt
        self.ssl_key = self.node_proxy_mgr.api_ssl_key
        self.system = self.node_proxy_mgr.system
        self.reporter_agent = self.node_proxy_mgr.reporter_agent
        self.config = self.node_proxy_mgr.config
        self.api = API(self.system, self.reporter_agent, self.config)

    def check_auth(self, realm: str, username: str, password: str) -> bool:
        """basic-auth checkpassword callback: compare against the stored credentials."""
        return self.username == username and \
            self.password == password

    def shutdown(self) -> None:
        """Request the API thread to stop (unblocks run())."""
        self.log.info('Stopping node-proxy API...')
        self.cp_shutdown_event.set()

    def run(self) -> None:
        """Configure cherrypy, mount the API and run until shutdown()."""
        self.log.info('node-proxy API configuration...')
        cherrypy.config.update({
            'environment': 'production',
            'engine.autoreload.on': False,
            'log.screen': True,
        })
        config = {'/': {
            'request.methods_with_bodies': ('POST', 'PUT', 'PATCH'),
            'tools.trailing_slash.on': False,
            'tools.auth_basic.realm': 'localhost',
            'tools.auth_basic.checkpassword': self.check_auth
        }}
        cherrypy.tree.mount(self.api, '/', config=config)
        # cherrypy.tree.mount(admin, '/admin', config=config)

        # The TLS material is held in memory; cherrypy wants file paths.
        ssl_crt = write_tmp_file(self.ssl_crt,
                                 prefix_name='listener-crt-')
        ssl_key = write_tmp_file(self.ssl_key,
                                 prefix_name='listener-key-')

        self.api.ssl_certificate = ssl_crt.name
        self.api.ssl_private_key = ssl_key.name

        # The API object is itself a Server that subscribed in __init__;
        # drop cherrypy's default server so only ours listens.
        cherrypy.server.unsubscribe()
        try:
            cherrypy.engine.start()
            self.log.info('node-proxy API started.')
            # Block here until shutdown() sets the event, then tear down.
            self.cp_shutdown_event.wait()
            self.cp_shutdown_event.clear()
            cherrypy.engine.exit()
            cherrypy.server.httpserver = None
            self.log.info('node-proxy API shutdown.')
        except Exception as e:
            self.log.error(f'node-proxy API error: {e}')
diff --git a/src/ceph-node-proxy/ceph_node_proxy/baseclient.py b/src/ceph-node-proxy/ceph_node_proxy/baseclient.py
new file mode 100644
index 000000000..6b4656148
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/baseclient.py
@@ -0,0 +1,20 @@
+from typing import Dict, Any
+
+
class BaseClient:
    """Abstract client for an out-of-band management endpoint.

    Concrete subclasses (e.g. a RedFish client) must implement the
    login/logout/get_path trio.
    """

    def __init__(self, host: str, username: str, password: str) -> None:
        # Connection coordinates shared by all concrete clients.
        self.host = host
        self.username = username
        self.password = password

    def login(self) -> None:
        """Open a session against the remote endpoint."""
        raise NotImplementedError()

    def logout(self) -> Dict[str, Any]:
        """Terminate the session and return the endpoint's response."""
        raise NotImplementedError()

    def get_path(self, path: str) -> Dict:
        """Fetch *path* from the endpoint and return the decoded payload."""
        raise NotImplementedError()
diff --git a/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py
new file mode 100644
index 000000000..ea4e65cc6
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/baseredfishsystem.py
@@ -0,0 +1,283 @@
+import concurrent.futures
+import json
+from ceph_node_proxy.basesystem import BaseSystem
+from ceph_node_proxy.redfish_client import RedFishClient
+from time import sleep
+from ceph_node_proxy.util import get_logger
+from typing import Dict, Any, List, Callable, Union
+from urllib.error import HTTPError, URLError
+
+
class BaseRedfishSystem(BaseSystem):
    """Generic Redfish implementation of BaseSystem.

    Runs an update loop (main) that logs in to the BMC's Redfish API and
    periodically refreshes the hardware components listed in
    ``component_list``. Vendor-specific subclasses implement the
    ``_update_*`` callbacks and set the reboot-job endpoints.
    """

    def __init__(self, **kw: Any) -> None:
        super().__init__(**kw)
        # Top-level endpoints refreshed on every loop iteration.
        self.common_endpoints: List[str] = kw.get('common_endpoints', ['/Systems/System.Embedded.1',
                                                                       '/UpdateService'])
        # NOTE: starts with '/', so callers must not add another slash
        # after the '/redfish/v1' prefix.
        self.chassis_endpoint: str = kw.get('chassis_endpoint', '/Chassis/System.Embedded.1')
        self.log = get_logger(__name__)
        self.host: str = kw['host']
        self.port: str = kw['port']
        self.username: str = kw['username']
        self.password: str = kw['password']
        # move the following line (class attribute?)
        self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password)
        self.log.info(f'redfish system initialization, host: {self.host}, user: {self.username}')
        self.data_ready: bool = False
        self.previous_data: Dict = {}
        self.data: Dict[str, Dict[str, Any]] = {}
        self._system: Dict[str, Dict[str, Any]] = {}
        self._sys: Dict[str, Any] = {}
        # Subclasses must point these at the vendor's job-service endpoints
        # (see e.g. the Dell implementation).
        self.job_service_endpoint: str = ''
        self.create_reboot_job_endpoint: str = ''
        self.setup_job_queue_endpoint: str = ''
        self.component_list: List[str] = kw.get('component_list', ['memory',
                                                                   'power',
                                                                   'fans',
                                                                   'network',
                                                                   'processors',
                                                                   'storage',
                                                                   'firmwares'])
        # Collect the _update_<component> callbacks actually implemented.
        self.update_funcs: List[Callable] = []
        for component in self.component_list:
            self.log.debug(f'adding: {component} to hw component gathered list.')
            func = f'_update_{component}'
            if hasattr(self, func):
                f = getattr(self, func)
                self.update_funcs.append(f)

    def main(self) -> None:
        """Update loop: refresh all components every 5s until stopped.

        Raises SystemExit(0) when the loop terminates; RuntimeError from
        the client stops the loop after attempting a graceful logout.
        """
        self.stop = False
        self.client.login()
        while not self.stop:
            self.log.debug('waiting for a lock in the update loop.')
            with self.lock:
                if not self.pending_shutdown:
                    self.log.debug('lock acquired in the update loop.')
                    try:
                        self._update_system()
                        self._update_sn()

                        # Refresh every component in parallel.
                        with concurrent.futures.ThreadPoolExecutor() as executor:
                            executor.map(lambda f: f(), self.update_funcs)

                        self.data_ready = True
                    except RuntimeError as e:
                        self.stop = True
                        self.log.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}')
                        self.client.logout()
                        raise
            sleep(5)
            self.log.debug('lock released in the update loop.')
        self.log.debug('exiting update loop.')
        raise SystemExit(0)

    def flush(self) -> None:
        """Drop all cached data; the next loop iteration repopulates it."""
        self.log.debug('Acquiring lock to flush data.')
        # Use the context manager so the lock is released even on error.
        with self.lock:
            self.log.debug('Lock acquired, flushing data.')
            self._system = {}
            self.previous_data = {}
            self.log.info('Data flushed.')
            self.data_ready = False
            self.log.debug('Data marked as not ready.')
        self.log.debug('Released the lock after flushing data.')

    # @retry(retries=10, delay=2)
    def _get_path(self, path: str) -> Dict:
        """GET *path* through the client unless a shutdown is pending.

        Raises:
            RuntimeError: when the client could not fetch the path
                (returned None) or raised itself.
        """
        result: Dict[str, Any] = {}
        if not self.pending_shutdown:
            self.log.debug(f'Getting path: {path}')
            result = self.client.get_path(path)
        else:
            self.log.debug(f'Pending shutdown, aborting query to {path}')
        if result is None:
            self.log.error(f'The client reported an error when getting path: {path}')
            raise RuntimeError(f'Could not get path: {path}')
        return result

    def get_members(self, data: Dict[str, Any], path: str) -> List:
        """Resolve and fetch every member of the Redfish collection at *path*."""
        _path = data[path]['@odata.id']
        _data = self._get_path(_path)
        return [self._get_path(member['@odata.id']) for member in _data['Members']]

    def get_system(self) -> Dict[str, Any]:
        """Return the full hardware inventory snapshot as a dict."""
        result = {
            'host': self.get_host(),
            'sn': self.get_sn(),
            'status': {
                'storage': self.get_storage(),
                'processors': self.get_processors(),
                'network': self.get_network(),
                'memory': self.get_memory(),
                'power': self.get_power(),
                'fans': self.get_fans()
            },
            'firmwares': self.get_firmwares(),
            'chassis': {'redfish_endpoint': f'/redfish/v1{self.chassis_endpoint}'}  # TODO(guits): not ideal
        }
        return result

    def _update_system(self) -> None:
        # Cache each common endpoint keyed by its first path component
        # (e.g. 'Systems', 'UpdateService').
        for endpoint in self.common_endpoints:
            result = self.client.get_path(endpoint)
            _endpoint = endpoint.strip('/').split('/')[0]
            self._system[_endpoint] = result

    # --- vendor-specific update callbacks, implemented by subclasses ---

    def _update_sn(self) -> None:
        raise NotImplementedError()

    def _update_memory(self) -> None:
        raise NotImplementedError()

    def _update_power(self) -> None:
        raise NotImplementedError()

    def _update_fans(self) -> None:
        raise NotImplementedError()

    def _update_network(self) -> None:
        raise NotImplementedError()

    def _update_processors(self) -> None:
        raise NotImplementedError()

    def _update_storage(self) -> None:
        raise NotImplementedError()

    def _update_firmwares(self) -> None:
        raise NotImplementedError()

    def device_led_on(self, device: str) -> int:
        """Turn a drive's identification LED on; returns 0 on failure."""
        data: Dict[str, bool] = {'LocationIndicatorActive': True}
        try:
            result = self.set_device_led(device, data)
        except (HTTPError, KeyError):
            return 0
        return result

    def device_led_off(self, device: str) -> int:
        """Turn a drive's identification LED off; returns 0 on failure."""
        data: Dict[str, bool] = {'LocationIndicatorActive': False}
        try:
            result = self.set_device_led(device, data)
        except (HTTPError, KeyError):
            return 0
        return result

    def chassis_led_on(self) -> int:
        """Make the chassis identification LED blink."""
        data: Dict[str, str] = {'IndicatorLED': 'Blinking'}
        result = self.set_chassis_led(data)
        return result

    def chassis_led_off(self) -> int:
        """Set the chassis identification LED back to 'Lit'."""
        data: Dict[str, str] = {'IndicatorLED': 'Lit'}
        result = self.set_chassis_led(data)
        return result

    def get_device_led(self, device: str) -> Dict[str, Any]:
        """Return the identification LED state of *device*.

        Raises:
            HTTPError: when the BMC rejects the request.
        """
        endpoint = self._sys['storage'][device]['redfish_endpoint']
        try:
            result = self.client.query(method='GET',
                                       endpoint=endpoint,
                                       timeout=10)
        except HTTPError as e:
            self.log.error(f"Couldn't get the ident device LED status for device '{device}': {e}")
            raise
        response_json = json.loads(result[1])
        _result: Dict[str, Any] = {'http_code': result[2]}
        if result[2] == 200:
            _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
        else:
            _result['LocationIndicatorActive'] = None
        return _result

    def set_device_led(self, device: str, data: Dict[str, bool]) -> int:
        """PATCH the identification LED of *device*; returns the HTTP status."""
        try:
            _, response, status = self.client.query(
                data=json.dumps(data),
                method='PATCH',
                endpoint=self._sys['storage'][device]['redfish_endpoint']
            )
        except (HTTPError, KeyError) as e:
            self.log.error(f"Couldn't set the ident device LED for device '{device}': {e}")
            raise
        return status

    def get_chassis_led(self) -> Dict[str, Any]:
        """Return the chassis identification LED state.

        Raises:
            HTTPError: when the BMC rejects the request.
        """
        # chassis_endpoint already starts with '/': do not insert another
        # slash after the prefix (previously produced '/redfish/v1//Chassis/...',
        # inconsistent with set_chassis_led()).
        endpoint = f'/redfish/v1{self.chassis_endpoint}'
        try:
            result = self.client.query(method='GET',
                                       endpoint=endpoint,
                                       timeout=10)
        except HTTPError as e:
            self.log.error(f"Couldn't get the ident chassis LED status: {e}")
            raise
        response_json = json.loads(result[1])
        _result: Dict[str, Any] = {'http_code': result[2]}
        if result[2] == 200:
            _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
        else:
            _result['LocationIndicatorActive'] = None
        return _result

    def set_chassis_led(self, data: Dict[str, str]) -> int:
        """PATCH the chassis identification LED; returns the HTTP status."""
        # '{"IndicatorLED": "Lit"}' -> LocationIndicatorActive = false
        # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true
        try:
            _, response, status = self.client.query(
                data=json.dumps(data),
                method='PATCH',
                endpoint=f'/redfish/v1{self.chassis_endpoint}'
            )
        except HTTPError as e:
            self.log.error(f"Couldn't set the ident chassis LED: {e}")
            raise
        return status

    def shutdown_host(self, force: bool = False) -> int:
        """Schedule a (forced) graceful reboot job; returns the HTTP status."""
        reboot_type: str = 'GracefulRebootWithForcedShutdown' if force else 'GracefulRebootWithoutForcedShutdown'

        try:
            job_id: str = self.create_reboot_job(reboot_type)
            status = self.schedule_reboot_job(job_id)
        except (HTTPError, KeyError) as e:
            self.log.error(f"Couldn't create the reboot job: {e}")
            raise
        return status

    def powercycle(self) -> int:
        """Schedule a power-cycle job; returns the HTTP status."""
        try:
            job_id: str = self.create_reboot_job('PowerCycle')
            status = self.schedule_reboot_job(job_id)
        except (HTTPError, URLError) as e:
            self.log.error(f"Couldn't perform power cycle: {e}")
            raise
        return status

    def create_reboot_job(self, reboot_type: str) -> str:
        """Create a reboot job of *reboot_type*; returns the job id.

        The job id is the last path component of the 'Location' response
        header.
        """
        data: Dict[str, str] = dict(RebootJobType=reboot_type)
        try:
            headers, response, status = self.client.query(
                data=json.dumps(data),
                endpoint=self.create_reboot_job_endpoint
            )
            job_id: str = headers['Location'].split('/')[-1]
        except (HTTPError, URLError) as e:
            self.log.error(f"Couldn't create the reboot job: {e}")
            raise
        return job_id

    def schedule_reboot_job(self, job_id: str) -> int:
        """Queue *job_id* for immediate execution; returns the HTTP status."""
        data: Dict[str, Union[List[str], str]] = dict(JobArray=[job_id], StartTimeInterval='TIME_NOW')
        try:
            headers, response, status = self.client.query(
                data=json.dumps(data),
                endpoint=self.setup_job_queue_endpoint
            )
        except (HTTPError, KeyError) as e:
            self.log.error(f"Couldn't schedule the reboot job: {e}")
            raise
        return status
diff --git a/src/ceph-node-proxy/ceph_node_proxy/basesystem.py b/src/ceph-node-proxy/ceph_node_proxy/basesystem.py
new file mode 100644
index 000000000..65eca55af
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/basesystem.py
@@ -0,0 +1,96 @@
+import socket
+from threading import Lock
+from ceph_node_proxy.util import Config, get_logger, BaseThread
+from typing import Dict, Any
+from ceph_node_proxy.baseclient import BaseClient
+
+
class BaseSystem(BaseThread):
    """Abstract hardware backend run as a thread (BaseThread).

    Concrete implementations provide the get_*/set_* inventory and LED
    interface below; every stub here raises NotImplementedError unless
    overridden.
    """

    def __init__(self, **kw: Any) -> None:
        super().__init__()
        # Protects the cached hardware data shared with other threads.
        self.lock: Lock = Lock()
        self._system: Dict = {}
        self.config: Config = kw.get('config', {})
        # Set by subclasses to their concrete out-of-band client.
        self.client: BaseClient
        self.log = get_logger(__name__)

    def main(self) -> None:
        # Thread body; presumably invoked by BaseThread — confirm in util.BaseThread.
        raise NotImplementedError()

    # --- inventory interface, implemented by subclasses ---

    def get_system(self) -> Dict[str, Any]:
        raise NotImplementedError()

    def get_status(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_metadata(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_processors(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_memory(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_fans(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_power(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_network(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_storage(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_sn(self) -> str:
        raise NotImplementedError()

    # --- identification LED interface, implemented by subclasses ---

    def get_led(self) -> Dict[str, Any]:
        raise NotImplementedError()

    def set_led(self, data: Dict[str, str]) -> int:
        raise NotImplementedError()

    def get_chassis_led(self) -> Dict[str, Any]:
        raise NotImplementedError()

    def set_chassis_led(self, data: Dict[str, str]) -> int:
        raise NotImplementedError()

    def device_led_on(self, device: str) -> int:
        raise NotImplementedError()

    def device_led_off(self, device: str) -> int:
        raise NotImplementedError()

    def get_device_led(self, device: str) -> Dict[str, Any]:
        raise NotImplementedError()

    def set_device_led(self, device: str, data: Dict[str, bool]) -> int:
        raise NotImplementedError()

    def chassis_led_on(self) -> int:
        raise NotImplementedError()

    def chassis_led_off(self) -> int:
        raise NotImplementedError()

    def get_host(self) -> str:
        """Return this node's hostname (the host itself, not the BMC)."""
        return socket.gethostname()

    # --- lifecycle / power interface, implemented by subclasses ---

    def stop_update_loop(self) -> None:
        raise NotImplementedError()

    def flush(self) -> None:
        raise NotImplementedError()

    def shutdown_host(self, force: bool = False) -> int:
        raise NotImplementedError()

    def powercycle(self) -> int:
        raise NotImplementedError()
diff --git a/src/ceph-node-proxy/ceph_node_proxy/main.py b/src/ceph-node-proxy/ceph_node_proxy/main.py
new file mode 100644
index 000000000..9a449ecf8
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/main.py
@@ -0,0 +1,199 @@
+from ceph_node_proxy.redfishdellsystem import RedfishDellSystem
+from ceph_node_proxy.api import NodeProxyApi
+from ceph_node_proxy.reporter import Reporter
+from ceph_node_proxy.util import Config, get_logger, http_req, write_tmp_file, CONFIG
+from urllib.error import HTTPError
+from typing import Dict, Any, Optional
+
+import argparse
+import os
+import ssl
+import json
+import time
+import signal
+
+
class NodeProxyManager:
    """Top-level orchestrator: owns the system poller, the reporter and the API.

    Fetches the out-of-band (BMC) credentials from the ceph-mgr agent
    endpoint, starts the three worker threads and supervises them.
    """

    def __init__(self, **kw: Any) -> None:
        self.exc: Optional[Exception] = None
        self.log = get_logger(__name__)
        self.mgr_host: str = kw['mgr_host']
        self.cephx_name: str = kw['cephx_name']
        self.cephx_secret: str = kw['cephx_secret']
        self.ca_path: str = kw['ca_path']
        self.api_ssl_crt: str = kw['api_ssl_crt']
        self.api_ssl_key: str = kw['api_ssl_key']
        self.mgr_agent_port: str = str(kw['mgr_agent_port'])
        self.stop: bool = False
        # TLS context for every request to the ceph-mgr agent: the mgr's
        # root certificate is required and hostnames are verified.
        self.ssl_ctx = ssl.create_default_context()
        self.ssl_ctx.check_hostname = True
        self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
        self.ssl_ctx.load_verify_locations(self.ca_path)
        self.reporter_scheme: str = kw.get('reporter_scheme', 'https')
        self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data')
        self.cephx = {'cephx': {'name': self.cephx_name,
                                'secret': self.cephx_secret}}
        self.config = Config('/etc/ceph/node-proxy.yml', config=CONFIG)
        # BMC credentials, filled in by init_system() from the oob details.
        self.username: str = ''
        self.password: str = ''

    def run(self) -> None:
        """Initialize all threads then supervise them forever."""
        self.init()
        self.loop()

    def init(self) -> None:
        """Start the poller, the reporter, then the API (in that order)."""
        self.init_system()
        self.init_reporter()
        self.init_api()

    def fetch_oob_details(self) -> Dict[str, str]:
        """Ask the ceph-mgr agent for the BMC address and credentials.

        Raises:
            HTTPError: when the mgr endpoint rejects the request.
        """
        try:
            headers, result, status = http_req(hostname=self.mgr_host,
                                               port=self.mgr_agent_port,
                                               data=json.dumps(self.cephx),
                                               endpoint='/node-proxy/oob',
                                               ssl_ctx=self.ssl_ctx)
        except HTTPError as e:
            msg = f'No out of band tool details could be loaded: {e.code}, {e.reason}'
            self.log.debug(msg)
            raise

        result_json = json.loads(result)
        oob_details: Dict[str, str] = {
            'host': result_json['result']['addr'],
            'username': result_json['result']['username'],
            'password': result_json['result']['password'],
            'port': result_json['result'].get('port', '443')
        }
        return oob_details

    def init_system(self) -> None:
        """Start the Redfish poller thread against the BMC.

        Exits the process (SystemExit(1)) when no oob details are available.
        """
        try:
            oob_details = self.fetch_oob_details()
            self.username = oob_details['username']
            self.password = oob_details['password']
        except HTTPError:
            self.log.warning('No oob details could be loaded, exiting...')
            raise SystemExit(1)
        try:
            self.system = RedfishDellSystem(host=oob_details['host'],
                                            port=oob_details['port'],
                                            username=oob_details['username'],
                                            password=oob_details['password'],
                                            config=self.config)
            self.system.start()
        except RuntimeError:
            self.log.error("Can't initialize the redfish system.")
            raise

    def init_reporter(self) -> None:
        """Start the thread that pushes gathered data to the ceph-mgr."""
        try:
            self.reporter_agent = Reporter(self.system,
                                           self.cephx,
                                           reporter_scheme=self.reporter_scheme,
                                           reporter_hostname=self.mgr_host,
                                           reporter_port=self.mgr_agent_port,
                                           reporter_endpoint=self.reporter_endpoint)
            self.reporter_agent.start()
        except RuntimeError:
            self.log.error("Can't initialize the reporter.")
            raise

    def init_api(self) -> None:
        """Start the HTTPS API thread."""
        try:
            self.log.info('Starting node-proxy API...')
            self.api = NodeProxyApi(self)
            self.api.start()
        except Exception as e:
            self.log.error(f"Can't start node-proxy API: {e}")
            raise

    def loop(self) -> None:
        """Watchdog: every 20s, restart the poller/reporter if one died."""
        while not self.stop:
            for thread in [self.system, self.reporter_agent]:
                try:
                    status = thread.check_status()
                    label = 'Ok' if status else 'Critical'
                    self.log.debug(f'{thread} status: {label}')
                except Exception as e:
                    self.log.error(f'{thread} not running: {e.__class__.__name__}: {e}')
                    # Restart the failed pair from scratch.
                    thread.shutdown()
                    self.init_system()
                    self.init_reporter()
            self.log.debug('All threads are alive, next check in 20sec.')
            time.sleep(20)

    def shutdown(self) -> None:
        """Stop the watchdog and shut down whichever threads were started."""
        self.stop = True
        # if `self.system.shutdown()` is called before self.start(), it will fail.
        if hasattr(self, 'api'):
            self.api.shutdown()
        if hasattr(self, 'reporter_agent'):
            self.reporter_agent.shutdown()
        if hasattr(self, 'system'):
            self.system.shutdown()
+
+
def handler(signum: Any, frame: Any, t_mgr: 'NodeProxyManager') -> None:
    """SIGTERM handler: stop every thread, log out from the BMC and exit.

    *signum* and *frame* are the standard signal-handler arguments and are
    not used; *t_mgr* is bound by the lambda installed in main().
    """
    mgr = t_mgr
    mgr.system.pending_shutdown = True
    mgr.log.info('SIGTERM caught, shutting down threads...')
    mgr.shutdown()
    mgr.log.info('Logging out from RedFish API')
    mgr.system.client.logout()
    raise SystemExit(0)
+
+
def main() -> None:
    """Entry point: parse CLI arguments, load the JSON config, run the manager."""
    parser = argparse.ArgumentParser(
        description='Ceph Node-Proxy for HW Monitoring',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--config',
        help='path of config file in json format',
        required=True
    )
    parser.add_argument(
        '--debug',
        help='increase logging verbosity (debug level)',
        action='store_true',
    )

    args = parser.parse_args()
    if args.debug:
        # 10 == logging.DEBUG
        CONFIG['logging']['level'] = 10

    if not os.path.exists(args.config):
        raise Exception(f'No config file found at provided config path: {args.config}')

    with open(args.config, 'r') as f:
        try:
            config = json.loads(f.read())
        except Exception as e:
            raise Exception(f'Failed to load json config: {str(e)}')

    # Pull out the config keys up front so a malformed file fails early.
    target_ip = config['target_ip']
    target_port = config['target_port']
    keyring = config['keyring']
    root_cert = config['root_cert.pem']
    listener_cert = config['listener.crt']
    listener_key = config['listener.key']
    name = config['name']

    # The mgr root certificate is delivered inline; materialize it on disk
    # so it can be fed to the SSL context.
    ca_file = write_tmp_file(root_cert,
                             prefix_name='cephadm-endpoint-root-cert')

    node_proxy_mgr = NodeProxyManager(mgr_host=target_ip,
                                      cephx_name=name,
                                      cephx_secret=keyring,
                                      mgr_agent_port=target_port,
                                      ca_path=ca_file.name,
                                      api_ssl_crt=listener_cert,
                                      api_ssl_key=listener_key)
    signal.signal(signal.SIGTERM,
                  lambda signum, frame: handler(signum, frame, node_proxy_mgr))
    node_proxy_mgr.run()
diff --git a/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py b/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py
new file mode 100644
index 000000000..64a4e44df
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/redfish_client.py
@@ -0,0 +1,123 @@
+import json
+from urllib.error import HTTPError, URLError
+from ceph_node_proxy.baseclient import BaseClient
+from ceph_node_proxy.util import get_logger, http_req
+from typing import Dict, Any, Tuple, Optional
+from http.client import HTTPMessage
+
+
class RedFishClient(BaseClient):
    """Session-based client for a Redfish (v1) out-of-band management API.

    Maintains the X-Auth-Token session created against the BMC's
    SessionService and injects it into every request.
    """

    PREFIX = '/redfish/v1/'

    def __init__(self,
                 host: str = '',
                 port: str = '443',
                 username: str = '',
                 password: str = ''):
        super().__init__(host, username, password)
        self.log = get_logger(__name__)
        self.log.info(f'Initializing redfish client {__name__}')
        self.host: str = host
        self.port: str = port
        self.url: str = f'https://{self.host}:{self.port}'
        self.token: str = ''     # X-Auth-Token of the current session
        self.location: str = ''  # endpoint of the session resource (used for logout)

    def login(self) -> None:
        """Create a Redfish session unless the current token is still valid.

        Raises:
            RuntimeError: if the session could not be established.
        """
        if self.is_logged_in():
            return
        self.log.info('Logging in to '
                      f"{self.url} as '{self.username}'")
        oob_credentials = json.dumps({'UserName': self.username,
                                      'Password': self.password})
        headers = {'Content-Type': 'application/json'}
        location_endpoint: str = ''

        try:
            _headers, _data, _status_code = self.query(data=oob_credentials,
                                                       headers=headers,
                                                       endpoint='/redfish/v1/SessionService/Sessions/')
            if _status_code != 201:
                # A successful session creation returns 201 Created.
                msg = f"Can't log in to {self.url} as '{self.username}': {_status_code}"
                self.log.error(msg)
                raise RuntimeError(msg)
        except URLError as e:
            msg = f"Can't log in to {self.url} as '{self.username}': {e}"
            self.log.error(msg)
            raise RuntimeError(msg) from e
        self.token = _headers['X-Auth-Token']
        if _headers['Location'].startswith('http'):
            # We assume the value has the following format:
            # scheme://address:port/redfish/v1/SessionService/Session
            location_endpoint = f"/{_headers['Location'].split('/', 3)[-1:][0]}"
        else:
            location_endpoint = _headers['Location']
        self.location = location_endpoint
        self.log.info(f'Logged in to {self.url}, Received header "Location": {self.location}')

    def is_logged_in(self) -> bool:
        """Return True when the stored session token is still accepted by the BMC."""
        self.log.debug(f'Checking token validity for {self.url}')
        if not self.location or not self.token:
            self.log.debug(f'No token found for {self.url}.')
            return False
        headers = {'X-Auth-Token': self.token}
        try:
            _headers, _data, _status_code = self.query(headers=headers,
                                                       endpoint=self.location)
        except URLError as e:
            self.log.error("Can't check token "
                           f'validity for {self.url}: {e}')
            raise
        return _status_code == 200

    def logout(self) -> Dict[str, Any]:
        """Delete the current session (best effort) and clear the local state."""
        result: Dict[str, Any] = {}
        try:
            if self.is_logged_in():
                _, _data, _status_code = self.query(method='DELETE',
                                                    headers={'X-Auth-Token': self.token},
                                                    endpoint=self.location)
                result = json.loads(_data)
        except URLError:
            self.log.error(f"Can't log out from {self.url}")

        # Always reset the session state, even when the DELETE failed.
        self.location = ''
        self.token = ''

        return result

    def get_path(self, path: str) -> Dict[str, Any]:
        """GET *path* (prefixed with /redfish/v1/ if needed) and decode its JSON.

        Raises:
            RuntimeError: when the request fails.
        """
        if self.PREFIX not in path:
            path = f'{self.PREFIX}{path}'
        try:
            _, result, _status_code = self.query(endpoint=path)
            result_json = json.loads(result)
            return result_json
        except URLError as e:
            self.log.error(f"Can't get path {path}:\n{e}")
            raise RuntimeError(f"Can't get path {path}") from e

    def query(self,
              data: Optional[str] = None,
              headers: Optional[Dict[str, str]] = None,
              method: Optional[str] = None,
              endpoint: str = '',
              timeout: int = 10) -> Tuple[HTTPMessage, str, int]:
        """Send a request to the BMC, injecting the session token.

        *headers* defaults to None instead of a shared mutable ``{}``
        (classic mutable-default pitfall); the caller's dict is copied
        before being modified.

        Raises:
            HTTPError, URLError: propagated from the underlying request.
        """
        _headers = headers.copy() if headers else {}
        if self.token:
            _headers['X-Auth-Token'] = self.token
        if not _headers.get('Content-Type') and method in ['POST', 'PUT', 'PATCH']:
            _headers['Content-Type'] = 'application/json'
        try:
            (response_headers,
             response_str,
             response_status) = http_req(hostname=self.host,
                                         port=self.port,
                                         endpoint=endpoint,
                                         headers=_headers,
                                         method=method,
                                         data=data,
                                         timeout=timeout)

            return response_headers, response_str, response_status
        except (HTTPError, URLError) as e:
            self.log.debug(f'{e}')
            raise
diff --git a/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py b/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py
new file mode 100644
index 000000000..f0d24c667
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/redfishdellsystem.py
@@ -0,0 +1,166 @@
+from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
+from ceph_node_proxy.util import get_logger, normalize_dict, to_snake_case
+from typing import Dict, Any, List
+
+
class RedfishDellSystem(BaseRedfishSystem):
    """Dell-specific Redfish system collector.

    Extends BaseRedfishSystem with the iDRAC OEM job-service endpoints and
    implements the ``_update_*`` hooks that populate ``self._sys`` from the
    Redfish payloads cached in ``self._system``.
    """

    def __init__(self, **kw: Any) -> None:
        super().__init__(**kw)
        self.log = get_logger(__name__)
        # iDRAC OEM job-service endpoints (reboot / job-queue management).
        self.job_service_endpoint: str = '/redfish/v1/Managers/iDRAC.Embedded.1/Oem/Dell/DellJobService'
        self.create_reboot_job_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.CreateRebootJob'
        self.setup_job_queue_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.SetupJobQueue'

    def build_common_data(self,
                          data: Dict[str, Any],
                          fields: List[str],
                          path: str) -> Dict[str, Dict[str, Dict]]:
        """Collect *fields* from every member found under *path*.

        :return: a dict keyed by each member's 'Id', run through
                 normalize_dict() (lower-cased keys, None -> 'unknown').
                 Missing fields are logged and skipped, never fatal.
        """
        result: Dict[str, Dict[str, Dict]] = dict()
        for member_info in self.get_members(data, path):
            member_id = member_info['Id']
            result[member_id] = dict()
            for field in fields:
                try:
                    result[member_id][to_snake_case(field)] = member_info[field]
                except KeyError:
                    self.log.warning(f'Could not find field: {field} in member_info: {member_info}')

        return normalize_dict(result)

    def build_chassis_data(self,
                           fields: Dict[str, List[str]],
                           path: str) -> Dict[str, Dict[str, Dict]]:
        """Collect chassis sub-resource data (e.g. Power, Thermal).

        :param fields: mapping of payload element name (e.g. 'Fans') to the
                       list of fields to extract from each of its members.
        :param path: sub-path under the chassis endpoint to fetch.
        :return: normalized dict keyed by each element's 'MemberId'.
        """
        result: Dict[str, Dict[str, Dict]] = dict()
        data = self._get_path(f'{self.chassis_endpoint}/{path}')

        for elt, _fields in fields.items():
            for member_elt in data[elt]:
                _id = member_elt['MemberId']
                result[_id] = dict()
                for field in _fields:
                    try:
                        result[_id][to_snake_case(field)] = member_elt[field]
                    except KeyError:
                        self.log.warning(f'Could not find field: {field} in data: {data[elt]}')
        return normalize_dict(result)

    def get_sn(self) -> str:
        """Return the system serial number (Redfish 'SKU')."""
        return self._sys['SKU']

    def get_status(self) -> Dict[str, Dict[str, Dict]]:
        """Return the cached system status."""
        return self._sys['status']

    def get_memory(self) -> Dict[str, Dict[str, Dict]]:
        """Return the cached memory inventory."""
        return self._sys['memory']

    def get_processors(self) -> Dict[str, Dict[str, Dict]]:
        """Return the cached processor inventory."""
        return self._sys['processors']

    def get_network(self) -> Dict[str, Dict[str, Dict]]:
        """Return the cached network-interface inventory."""
        return self._sys['network']

    def get_storage(self) -> Dict[str, Dict[str, Dict]]:
        """Return the cached per-drive storage inventory."""
        return self._sys['storage']

    def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
        """Return the cached firmware inventory."""
        return self._sys['firmwares']

    def get_power(self) -> Dict[str, Dict[str, Dict]]:
        """Return the cached power-supply inventory."""
        return self._sys['power']

    def get_fans(self) -> Dict[str, Dict[str, Dict]]:
        """Return the cached fan inventory."""
        return self._sys['fans']

    def _update_network(self) -> None:
        """Refresh self._sys['network'] from the EthernetInterfaces members."""
        fields = ['Description', 'Name', 'SpeedMbps', 'Status']
        self.log.debug('Updating network')
        self._sys['network'] = self.build_common_data(data=self._system['Systems'],
                                                      fields=fields,
                                                      path='EthernetInterfaces')

    def _update_processors(self) -> None:
        """Refresh self._sys['processors'] from the Processors members."""
        fields = ['Description',
                  'TotalCores',
                  'TotalThreads',
                  'ProcessorType',
                  'Model',
                  'Status',
                  'Manufacturer']
        self.log.debug('Updating processors')
        self._sys['processors'] = self.build_common_data(data=self._system['Systems'],
                                                         fields=fields,
                                                         path='Processors')

    def _update_storage(self) -> None:
        """Refresh self._sys['storage'] by walking every controller's drives."""
        fields = ['Description',
                  'CapacityBytes',
                  'Model', 'Protocol',
                  'LocationIndicatorActive',
                  'SerialNumber', 'Status',
                  'PhysicalLocation']
        entities = self.get_members(data=self._system['Systems'],
                                    path='Storage')
        self.log.debug('Updating storage')
        result: Dict[str, Dict[str, Dict]] = dict()
        for entity in entities:
            for drive in entity['Drives']:
                drive_path = drive['@odata.id']
                drive_info = self._get_path(drive_path)
                drive_id = drive_info['Id']
                result[drive_id] = dict()
                result[drive_id]['redfish_endpoint'] = drive['@odata.id']
                for field in fields:
                    try:
                        result[drive_id][to_snake_case(field)] = drive_info[field]
                    except KeyError:
                        # Consistent with build_common_data(): a drive that
                        # omits an optional field (e.g. PhysicalLocation)
                        # must not abort the whole storage refresh.
                        self.log.warning(f'Could not find field: {field} in drive_info: {drive_info}')
                result[drive_id]['entity'] = entity['Id']
        self._sys['storage'] = normalize_dict(result)

    def _update_sn(self) -> None:
        """Refresh the cached serial number."""
        self.log.debug('Updating serial number')
        self._sys['SKU'] = self._system['Systems']['SKU']

    def _update_memory(self) -> None:
        """Refresh self._sys['memory'] from the Memory members."""
        fields = ['Description',
                  'MemoryDeviceType',
                  'CapacityMiB',
                  'Status']
        self.log.debug('Updating memory')
        self._sys['memory'] = self.build_common_data(data=self._system['Systems'],
                                                     fields=fields,
                                                     path='Memory')

    def _update_power(self) -> None:
        """Refresh self._sys['power'] from the chassis Power payload."""
        fields = {
            'PowerSupplies': [
                'Name',
                'Model',
                'Manufacturer',
                'Status'
            ]
        }
        self.log.debug('Updating powersupplies')
        self._sys['power'] = self.build_chassis_data(fields, 'Power')

    def _update_fans(self) -> None:
        """Refresh self._sys['fans'] from the chassis Thermal payload."""
        fields = {
            'Fans': [
                'Name',
                'PhysicalContext',
                'Status'
            ],
        }
        self.log.debug('Updating fans')
        self._sys['fans'] = self.build_chassis_data(fields, 'Thermal')

    def _update_firmwares(self) -> None:
        """Refresh self._sys['firmwares'] from the FirmwareInventory members."""
        fields = [
            'Name',
            'Description',
            'ReleaseDate',
            'Version',
            'Updateable',
            'Status',
        ]
        self.log.debug('Updating firmwares')
        self._sys['firmwares'] = self.build_common_data(data=self._system['UpdateService'],
                                                        fields=fields,
                                                        path='FirmwareInventory')
diff --git a/src/ceph-node-proxy/ceph_node_proxy/reporter.py b/src/ceph-node-proxy/ceph_node_proxy/reporter.py
new file mode 100644
index 000000000..20d43b59d
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/reporter.py
@@ -0,0 +1,69 @@
+import time
+import json
+from ceph_node_proxy.util import get_logger, http_req, BaseThread
+from urllib.error import HTTPError, URLError
+from typing import Dict, Any
+
+
class Reporter(BaseThread):
    """Thread that periodically pushes the collected hardware state to the mgr.

    Loops until shutdown() is requested, POSTing the full system dump (plus
    the cephx credentials) to the node-proxy endpoint whenever the current
    data differs from what was last sent successfully.
    """

    def __init__(self,
                 system: Any,
                 cephx: Dict[str, Any],
                 reporter_scheme: str = 'https',
                 reporter_hostname: str = '',
                 reporter_port: str = '443',
                 reporter_endpoint: str = '/node-proxy/data') -> None:
        """
        :param system: the system object whose lock / data_ready /
                       get_system() / previous_data members drive the loop.
        :param cephx: dict holding the cephx auth payload under key 'cephx'.
        :param reporter_scheme: URL scheme of the mgr endpoint.
        :param reporter_hostname: mgr host to report to.
        :param reporter_port: mgr port to report to.
        :param reporter_endpoint: HTTP path of the node-proxy data sink.
        """
        super().__init__()
        self.system = system
        self.data: Dict[str, Any] = {}
        self.stop: bool = False
        self.cephx = cephx
        # cephx credentials are sent alongside every payload.
        self.data['cephx'] = self.cephx['cephx']
        self.reporter_scheme: str = reporter_scheme
        self.reporter_hostname: str = reporter_hostname
        self.reporter_port: str = reporter_port
        self.reporter_endpoint: str = reporter_endpoint
        self.log = get_logger(__name__)
        self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:'
                                  f'{reporter_port}{reporter_endpoint}')
        self.log.info(f'Reporter url set to {self.reporter_url}')

    def main(self) -> None:
        """Report loop: diff against previous_data, POST on change, sleep, repeat.

        Raises SystemExit(0) when the loop ends; a send failure re-raises the
        transport error out of the loop (captured by BaseThread.run()).
        """
        while not self.stop:
            # Any logic to avoid sending the all the system
            # information every loop can go here. In a real
            # scenario probably we should just send the sub-parts
            # that have changed to minimize the traffic in
            # dense clusters
            self.log.debug('waiting for a lock in reporter loop.')
            with self.system.lock:
                if not self.system.pending_shutdown:
                    self.log.debug('lock acquired in reporter loop.')
                    if self.system.data_ready:
                        self.log.debug('data ready to be sent to the mgr.')
                        if not self.system.get_system() == self.system.previous_data:
                            self.log.info('data has changed since last iteration.')
                            self.data['patch'] = self.system.get_system()
                            try:
                                # TODO: add a timeout parameter to the reporter in the config file
                                self.log.info(f'sending data to {self.reporter_url}')
                                http_req(hostname=self.reporter_hostname,
                                         port=self.reporter_port,
                                         method='POST',
                                         headers={'Content-Type': 'application/json'},
                                         endpoint=self.reporter_endpoint,
                                         scheme=self.reporter_scheme,
                                         data=json.dumps(self.data))
                            except (HTTPError, URLError) as e:
                                self.log.error(f"The reporter couldn't send data to the mgr: {e}")
                                raise
                            # Need to add a new parameter 'max_retries' to the reporter if it can't
                            # send the data for more than x times, maybe the daemon should stop altogether
                            else:
                                # only remember the snapshot once it was
                                # actually delivered (try/else success path)
                                self.system.previous_data = self.system.get_system()
                        else:
                            self.log.debug('no diff, not sending data to the mgr.')
                # NOTE(review): this logs while the lock is still held (it is
                # released only when the `with` block below exits) — confirm
                # whether the message placement is intentional.
                self.log.debug('lock released in reporter loop.')
            time.sleep(5)
        self.log.debug('exiting reporter loop.')
        raise SystemExit(0)
diff --git a/src/ceph-node-proxy/ceph_node_proxy/util.py b/src/ceph-node-proxy/ceph_node_proxy/util.py
new file mode 100644
index 000000000..677161c63
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/util.py
@@ -0,0 +1,196 @@
+import logging
+import yaml
+import os
+import time
+import re
+import ssl
+import traceback
+import threading
+from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
+from urllib.error import HTTPError, URLError
+from urllib.request import urlopen, Request
+from typing import Dict, Callable, Any, Optional, MutableMapping, Tuple, Union
+
+
# Built-in defaults; presumably merged with/overridden by the on-disk YAML
# loaded through Config — confirm against main.py.
CONFIG: Dict[str, Any] = {
    'reporter': {
        # loop interval in seconds — NOTE(review): reporter.py currently
        # hard-codes its sleep(5); confirm this key is actually wired up.
        'check_interval': 5,
        # NOTE(review): not referenced by the visible reporter code (its TODO
        # mentions adding max_retries) — verify before relying on it.
        'push_data_max_retries': 30,
        # NOTE(review): '%(mgr_host)' / '%(mgr_port)' lack the trailing 's'
        # conversion character required by %-style interpolation — confirm
        # how this template is expanded before using it.
        'endpoint': 'https://%(mgr_host):%(mgr_port)/node-proxy/data',
    },
    'system': {
        # seconds between hardware-state refreshes
        'refresh_interval': 5
    },
    'api': {
        # TCP port the local REST API listens on
        'port': 9456,
    },
    'logging': {
        # default level applied by get_logger()
        'level': logging.INFO,
    }
}
+
+
def get_logger(name: str, level: Union[int, str] = logging.NOTSET) -> logging.Logger:
    """Return a configured stream logger for *name*.

    :param name: logger name (usually the caller's __name__).
    :param level: explicit level; when NOTSET, the configured default
                  (CONFIG['logging']['level']) is used.
    """
    # Fall back to the configured default only when no explicit level is
    # given.  (Previously log_level was assigned *only* in the NOTSET branch,
    # so passing any explicit level raised UnboundLocalError.)
    log_level = CONFIG['logging']['level'] if level == logging.NOTSET else level
    logger = logging.getLogger(name)
    logger.setLevel(log_level)
    handler = logging.StreamHandler()
    handler.setLevel(log_level)
    fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(fmt)
    # Replace handlers from any previous call so repeated get_logger()
    # invocations don't duplicate output.
    logger.handlers.clear()
    logger.addHandler(handler)
    logger.propagate = False

    return logger
+
+
# Module-level logger shared by the helpers below (retry, BaseThread).
logger = get_logger(__name__)
+
+
class Config:
    """Load node-proxy settings from a YAML file, with fallback defaults.

    The *config* argument provides default values; keys read from
    *config_file* take precedence, and any default key the file does not
    define is kept.  Every top-level key is also exposed as an attribute
    on the instance.
    """

    def __init__(self,
                 config_file: str = '/etc/ceph/node-proxy.yaml',
                 config: Optional[Dict[str, Any]] = None) -> None:
        """
        :param config_file: path to the YAML configuration file.
        :param config: default settings used when the file is missing or
                       incomplete (never mutated; no shared mutable default).
        """
        self.config_file = config_file
        # Copy so we never mutate the caller's dict.
        self.defaults: Dict[str, Any] = dict(config) if config else {}
        self.config: Dict[str, Any] = {}

        self.load_config()

    def load_config(self) -> None:
        """(Re)read the config file and merge in any missing default keys."""
        if os.path.exists(self.config_file):
            with open(self.config_file, 'r') as f:
                # `or {}` guards against an empty YAML file (safe_load -> None).
                self.config = yaml.safe_load(f) or {}
        else:
            self.config = dict(self.defaults)

        # Keep defaults the file did not override.  (The original loop
        # compared self.config against itself — always false — so the
        # passed-in defaults were silently dropped whenever a file existed.)
        for k, v in self.defaults.items():
            if k not in self.config:
                self.config[k] = v

        # Expose each top-level section as an attribute (e.g. cfg.reporter).
        for k, v in self.config.items():
            setattr(self, k, v)

    def reload(self, config_file: str = '') -> None:
        """Reload settings, optionally switching to a different file."""
        if config_file != '':
            self.config_file = config_file
        self.load_config()
+
+
class BaseThread(threading.Thread):
    """Daemon thread with exception capture and cooperative shutdown.

    Subclasses implement main(); any exception it raises is stored in
    self.exc and later re-raised by the supervisor via check_status().
    """

    def __init__(self) -> None:
        super().__init__()
        # Name the thread after the concrete subclass for readable logs.
        self.name = self.__class__.__name__
        self.daemon = True
        self.log: logging.Logger = get_logger(__name__)
        # Exception captured from main(), if any.
        self.exc: Optional[Exception] = None
        # Cooperative shutdown flags polled by subclasses.
        self.stop: bool = False
        self.pending_shutdown: bool = False

    def run(self) -> None:
        logger.info(f'Starting {self.name}')
        try:
            self.main()
        except Exception as caught:
            # Keep the failure for check_status() instead of letting the
            # thread die loudly.
            self.exc = caught
            return

    def shutdown(self) -> None:
        """Ask the thread to stop at its next opportunity."""
        self.stop = True
        self.pending_shutdown = True

    def check_status(self) -> bool:
        """Re-raise a captured exception, or restart the thread if it died."""
        logger.debug(f'Checking status of {self.name}')
        captured = self.exc
        if captured:
            traceback.print_tb(captured.__traceback__)
            logger.error(f'Caught exception: {captured.__class__.__name__}')
            raise captured
        if not self.is_alive():
            logger.info(f'{self.name} not alive')
            self.start()
        return True

    def main(self) -> None:
        """Thread body — must be provided by subclasses."""
        raise NotImplementedError()
+
+
def to_snake_case(name: str) -> str:
    """Convert a CamelCase Redfish field name to snake_case."""
    # First break an upper-case word off whatever precedes it, then split
    # lower/digit-to-upper boundaries, and finally lower-case the result.
    with_boundaries = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', with_boundaries).lower()
+
+
def normalize_dict(test_dict: Dict) -> Dict:
    """Return a copy of *test_dict* with lower-cased keys, recursively,
    mapping None values to the string 'unknown'.

    Unlike the previous version, the input dict is left unmodified (it used
    to write 'unknown' back into the caller's data as a side effect).
    """
    res: Dict = {}
    for key, value in test_dict.items():
        if isinstance(value, dict):
            res[key.lower()] = normalize_dict(value)
        elif value is None:
            res[key.lower()] = 'unknown'
        else:
            res[key.lower()] = value
    return res
+
+
def retry(exceptions: Any = Exception, retries: int = 20, delay: int = 1) -> Callable:
    """Decorator that retries the wrapped callable on failure.

    :param exceptions: exception type (or tuple) that triggers a retry.
    :param retries: total number of attempts; the final attempt is made
                    outside the loop and propagates its exception.
    :param delay: seconds to sleep between attempts.
    """
    def decorator(f: Callable) -> Callable:
        from functools import wraps

        # wraps preserves the wrapped function's name/docstring.
        @wraps(f)
        def _retry(*args: Any, **kwargs: Any) -> Any:
            _tries = retries
            while _tries > 1:
                try:
                    logger.debug('{} {} attempt(s) left.'.format(f, _tries - 1))
                    return f(*args, **kwargs)
                except exceptions:
                    time.sleep(delay)
                    _tries -= 1
            # logger.warn is a deprecated alias of logger.warning.
            logger.warning('{} has failed after {} tries'.format(f, retries))
            # Last attempt: let any exception propagate to the caller.
            return f(*args, **kwargs)
        return _retry
    return decorator
+
+
def http_req(hostname: str = '',
             port: str = '443',
             method: Optional[str] = None,
             headers: Optional[MutableMapping[str, str]] = None,
             data: Optional[str] = None,
             endpoint: str = '/',
             scheme: str = 'https',
             ssl_verify: bool = False,
             timeout: Optional[int] = None,
             ssl_ctx: Optional[Any] = None) -> Tuple[Any, Any, Any]:
    """Perform an HTTP(S) request and return (headers, body, status code).

    :param hostname: target host.
    :param port: target port (kept as str to match callers).
    :param method: HTTP verb; defaults to POST when *data* is given.
    :param headers: extra request headers (never mutated).
    :param data: optional request body.
    :param endpoint: request path.
    :param scheme: 'http' or 'https'.
    :param ssl_verify: verify the server certificate when True.
    :param timeout: socket timeout in seconds (None = default).
    :param ssl_ctx: pre-built SSLContext to use instead of creating one.
    :raises HTTPError, URLError: on transport failure (printed, re-raised).
    """
    if not ssl_ctx:
        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        if not ssl_verify:
            ssl_ctx.check_hostname = False
            ssl_ctx.verify_mode = ssl.CERT_NONE
        else:
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED

    url: str = f'{scheme}://{hostname}:{port}{endpoint}'
    # utf-8 (not ascii): JSON payloads may legitimately contain non-ASCII
    # characters, which used to raise UnicodeEncodeError here.
    _data = data.encode('utf-8') if data else None
    # Copy: the previous code aliased the caller's dict (and the shared
    # mutable default {}) and then mutated it below.
    _headers = dict(headers) if headers else {}
    if data and not method:
        method = 'POST'
    if not _headers.get('Content-Type') and method in ['POST', 'PATCH']:
        _headers['Content-Type'] = 'application/json'
    try:
        req = Request(url, _data, _headers, method=method)
        with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
            response_str = response.read()
            response_headers = response.headers
            response_code = response.code
        return response_headers, response_str.decode(), response_code
    except (HTTPError, URLError) as e:
        print(f'{e}')
        # handle error here if needed
        raise
+
+
def write_tmp_file(data: str, prefix_name: str = 'node-proxy-') -> _TemporaryFileWrapper:
    """Write *data* to a private (mode 0600) named temporary file.

    The returned wrapper must be kept referenced by the caller: the file is
    deleted as soon as it is closed / garbage-collected.
    """
    tmp_file = NamedTemporaryFile(prefix=prefix_name)
    # Restrict permissions before writing potentially sensitive content.
    os.fchmod(tmp_file.fileno(), 0o600)
    tmp_file.write(data.encode('utf-8'))
    tmp_file.flush()
    return tmp_file