summaryrefslogtreecommitdiffstats
path: root/src/ceph-node-proxy/ceph_node_proxy/reporter.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ceph-node-proxy/ceph_node_proxy/reporter.py')
-rw-r--r--src/ceph-node-proxy/ceph_node_proxy/reporter.py69
1 files changed, 69 insertions, 0 deletions
diff --git a/src/ceph-node-proxy/ceph_node_proxy/reporter.py b/src/ceph-node-proxy/ceph_node_proxy/reporter.py
new file mode 100644
index 000000000..20d43b59d
--- /dev/null
+++ b/src/ceph-node-proxy/ceph_node_proxy/reporter.py
@@ -0,0 +1,69 @@
+import time
+import json
+from ceph_node_proxy.util import get_logger, http_req, BaseThread
+from urllib.error import HTTPError, URLError
+from typing import Dict, Any
+
+
+class Reporter(BaseThread):
+ def __init__(self,
+ system: Any,
+ cephx: Dict[str, Any],
+ reporter_scheme: str = 'https',
+ reporter_hostname: str = '',
+ reporter_port: str = '443',
+ reporter_endpoint: str = '/node-proxy/data') -> None:
+ super().__init__()
+ self.system = system
+ self.data: Dict[str, Any] = {}
+ self.stop: bool = False
+ self.cephx = cephx
+ self.data['cephx'] = self.cephx['cephx']
+ self.reporter_scheme: str = reporter_scheme
+ self.reporter_hostname: str = reporter_hostname
+ self.reporter_port: str = reporter_port
+ self.reporter_endpoint: str = reporter_endpoint
+ self.log = get_logger(__name__)
+ self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:'
+ f'{reporter_port}{reporter_endpoint}')
+ self.log.info(f'Reporter url set to {self.reporter_url}')
+
+ def main(self) -> None:
+ while not self.stop:
+ # Any logic to avoid sending the all the system
+ # information every loop can go here. In a real
+ # scenario probably we should just send the sub-parts
+ # that have changed to minimize the traffic in
+ # dense clusters
+ self.log.debug('waiting for a lock in reporter loop.')
+ with self.system.lock:
+ if not self.system.pending_shutdown:
+ self.log.debug('lock acquired in reporter loop.')
+ if self.system.data_ready:
+ self.log.debug('data ready to be sent to the mgr.')
+ if not self.system.get_system() == self.system.previous_data:
+ self.log.info('data has changed since last iteration.')
+ self.data['patch'] = self.system.get_system()
+ try:
+ # TODO: add a timeout parameter to the reporter in the config file
+ self.log.info(f'sending data to {self.reporter_url}')
+ http_req(hostname=self.reporter_hostname,
+ port=self.reporter_port,
+ method='POST',
+ headers={'Content-Type': 'application/json'},
+ endpoint=self.reporter_endpoint,
+ scheme=self.reporter_scheme,
+ data=json.dumps(self.data))
+ except (HTTPError, URLError) as e:
+ self.log.error(f"The reporter couldn't send data to the mgr: {e}")
+ raise
+ # Need to add a new parameter 'max_retries' to the reporter if it can't
+ # send the data for more than x times, maybe the daemon should stop altogether
+ else:
+ self.system.previous_data = self.system.get_system()
+ else:
+ self.log.debug('no diff, not sending data to the mgr.')
+ self.log.debug('lock released in reporter loop.')
+ time.sleep(5)
+ self.log.debug('exiting reporter loop.')
+ raise SystemExit(0)