diff options
Diffstat (limited to 'src/cephadm')
-rwxr-xr-x | src/cephadm/cephadm.py | 263 | ||||
-rw-r--r-- | src/cephadm/tests/test_agent.py | 2 | ||||
-rw-r--r-- | src/cephadm/tests/test_nfs.py | 1 | ||||
-rw-r--r-- | src/cephadm/tox.ini | 2 |
4 files changed, 230 insertions, 38 deletions
diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py index bcb82c4c4..c71810640 100755 --- a/src/cephadm/cephadm.py +++ b/src/cephadm/cephadm.py @@ -37,9 +37,10 @@ from functools import wraps from glob import glob from io import StringIO from threading import Thread, Event +from pathlib import Path from urllib.error import HTTPError, URLError from urllib.request import urlopen, Request -from pathlib import Path + FuncT = TypeVar('FuncT', bound=Callable) @@ -55,7 +56,7 @@ DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0' DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7' DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3' DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4' -DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.1' +DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:1.0.0' DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1' DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23' DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29' @@ -136,6 +137,33 @@ async def concurrent_tasks(func: Callable, cmd_list: List[str]) -> List[Any]: return data +def http_query(addr: str = '', + port: str = '', + data: Optional[bytes] = None, + endpoint: str = '', + ssl_ctx: Optional[Any] = None, + timeout: Optional[int] = 10) -> Tuple[int, str]: + + url = f'https://{addr}:{port}{endpoint}' + logger.debug(f'sending query to {url}') + try: + req = Request(url, data, {'Content-Type': 'application/json'}) + with urlopen(req, context=ssl_ctx, timeout=timeout) as response: + response_str = response.read() + response_status = response.status + except HTTPError as e: + logger.debug(f'{e.code} {e.reason}') + response_status = e.code + response_str = e.reason + except URLError as e: + logger.debug(f'{e.reason}') + response_status = -1 + response_str = e.reason + except Exception: + raise + return (response_status, response_str) + + class EndPoint: """EndPoint representing an ip:port format""" @@ -672,6 +700,100 @@ class Monitoring(object): ################################## +class NodeProxy(object): + """Defines a node-proxy container""" + + daemon_type = 'node-proxy' + # TODO: update this if we make node-proxy an executable + entrypoint = '/usr/sbin/ceph-node-proxy' + required_files = ['node-proxy.json'] + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + + def __init__(self, + ctx: CephadmContext, + fsid: str, + daemon_id: Union[str, int], + config_json: Dict[Any, Any], + image: str = DEFAULT_IMAGE) -> None: + self.ctx = ctx + self.fsid = fsid + self.daemon_id = daemon_id + self.image = image + + config = dict_get(config_json, 'node-proxy.json', {}) + self.files = {'node-proxy.json': config} + + # validate the supplied args + self.validate() + + @classmethod + def init(cls, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str]) -> 'NodeProxy': + return cls(ctx, fsid, daemon_id, + fetch_configs(ctx), ctx.image) + + @staticmethod + def get_container_mounts(data_dir, log_dir): + # type: (str, str) -> Dict[str, str] + mounts = dict() + mounts[os.path.join(data_dir, 'node-proxy.json')] = '/usr/share/ceph/node-proxy.json:z' + return mounts + + def get_daemon_args(self) -> List[str]: + # TODO: this corresponds with the mount location of + # the config in _get_container_mounts above. They + # will both need to be updated when we have a proper + # location in the container for node-proxy + return ['--config', '/usr/share/ceph/node-proxy.json'] + + def validate(self): + # type: () -> None + if not is_fsid(self.fsid): + raise Error('not an fsid: %s' % self.fsid) + if not self.daemon_id: + raise Error('invalid daemon_id: %s' % self.daemon_id) + if not self.image: + raise Error('invalid image: %s' % self.image) + # check for the required files + if self.required_files: + for fname in self.required_files: + if fname not in self.files: + raise Error( + 'required file missing from config-json: %s' % fname + ) + + def get_daemon_name(self): + # type: () -> str + return '%s.%s' % (self.daemon_type, self.daemon_id) + + def get_container_name(self, desc=None): + # type: (Optional[str]) -> str + cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name()) + if desc: + cname = '%s-%s' % (cname, desc) + return cname + + def create_daemon_dirs(self, data_dir, uid, gid): + # type: (str, int, int) -> None + """Create files under the container data dir""" + if not os.path.isdir(data_dir): + raise OSError('data_dir is not a directory: %s' % (data_dir)) + + logger.info('Writing node-proxy config...') + # populate files from the config-json + populate_files(data_dir, self.files, uid, gid) + + def config_and_keyring( + self, ctx: CephadmContext + ) -> Tuple[Optional[str], Optional[str]]: + return get_config_and_keyring(ctx) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return extract_uid_gid(ctx) + + @contextmanager def write_new( destination: Union[str, Path], @@ -726,7 +848,7 @@ class NFSGanesha(object): entrypoint = '/usr/bin/ganesha.nfsd' daemon_args = ['-F', '-L', 'STDERR'] - required_files = ['ganesha.conf'] + required_files = ['ganesha.conf', 'idmap.conf'] port_map = { 'nfs': 2049, @@ -1072,7 +1194,7 @@ class CephNvmeof(object): fetch_configs(ctx), ctx.image) @staticmethod - def get_container_mounts(data_dir: str) -> Dict[str, str]: + def get_container_mounts(data_dir: str, log_dir: str) -> Dict[str, str]: mounts = dict() mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z' mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z' @@ -1080,6 +1202,7 @@ class CephNvmeof(object): mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config' mounts['/dev/hugepages'] = '/dev/hugepages' mounts['/dev/vfio/vfio'] = '/dev/vfio/vfio' + mounts[log_dir] = '/var/log/ceph:z' return mounts @staticmethod @@ -1098,7 +1221,7 @@ class CephNvmeof(object): out, err, ret = call(ctx, [ctx.container_engine.path, 'inspect', '--format', '{{index .Config.Labels "io.ceph.version"}}', - ctx.image]) + container_id]) version = None if ret == 0: version = out.strip() @@ -1217,6 +1340,13 @@ class CephExporter(object): ] return args + def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: + if not os.path.exists(self.sock_dir): + os.mkdir(self.sock_dir) + # part of validation is for the sock dir, so we postpone + # it until now + self.validate() + def validate(self) -> None: if not os.path.isdir(self.sock_dir): raise Error(f'Directory does not exist. Got: {self.sock_dir}') @@ -1620,6 +1750,7 @@ def get_supported_daemons(): supported_daemons.append(CephadmAgent.daemon_type) supported_daemons.append(SNMPGateway.daemon_type) supported_daemons.extend(Tracing.components) + supported_daemons.append(NodeProxy.daemon_type) assert len(supported_daemons) == len(set(supported_daemons)) return supported_daemons @@ -2998,9 +3129,11 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id): ip = meta['ip'] if 'ports' in meta and meta['ports']: port = meta['ports'][0] - r += [f'--web.listen-address={ip}:{port}'] if daemon_type == 'prometheus': config = fetch_configs(ctx) + ip_to_bind_to = config.get('ip_to_bind_to', '') + if ip_to_bind_to: + ip = ip_to_bind_to retention_time = config.get('retention_time', '15d') retention_size = config.get('retention_size', '0') # default to disabled r += [f'--storage.tsdb.retention.time={retention_time}'] @@ -3015,6 +3148,7 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id): addr = next(iter(ipv4_addrs or ipv6_addrs), None) host = wrap_ipv6(addr) if addr else host r += [f'--web.external-url={scheme}://{host}:{port}'] + r += [f'--web.listen-address={ip}:{port}'] if daemon_type == 'alertmanager': config = fetch_configs(ctx) peers = config.get('peers', list()) # type: ignore @@ -3060,6 +3194,9 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id): elif daemon_type == SNMPGateway.daemon_type: sc = SNMPGateway.init(ctx, fsid, daemon_id) r.extend(sc.get_daemon_args()) + elif daemon_type == NodeProxy.daemon_type: + node_proxy = NodeProxy.init(ctx, fsid, daemon_id) + r.extend(node_proxy.get_daemon_args()) return r @@ -3173,6 +3310,14 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid, sg = SNMPGateway.init(ctx, fsid, daemon_id) sg.create_daemon_conf() + elif daemon_type == NodeProxy.daemon_type: + node_proxy = NodeProxy.init(ctx, fsid, daemon_id) + node_proxy.create_daemon_dirs(data_dir, uid, gid) + + elif daemon_type == CephExporter.daemon_type: + ceph_exporter = CephExporter.init(ctx, fsid, daemon_id) + ceph_exporter.create_daemon_dirs(data_dir, uid, gid) + _write_custom_conf_files(ctx, daemon_type, str(daemon_id), fsid, uid, gid) @@ -3480,7 +3625,8 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id, if daemon_type == CephNvmeof.daemon_type: assert daemon_id data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) - mounts.update(CephNvmeof.get_container_mounts(data_dir)) + log_dir = get_log_dir(fsid, ctx.log_dir) + mounts.update(CephNvmeof.get_container_mounts(data_dir, log_dir)) if daemon_type == CephIscsi.daemon_type: assert daemon_id @@ -3503,6 +3649,12 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id, data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) mounts.update(cc.get_container_mounts(data_dir)) + if daemon_type == NodeProxy.daemon_type: + assert daemon_id + data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) + log_dir = get_log_dir(fsid, ctx.log_dir) + mounts.update(NodeProxy.get_container_mounts(data_dir, log_dir)) + # Modifications podman makes to /etc/hosts causes issues with # certain daemons (specifically referencing "host.containers.internal" entry # being added to /etc/hosts in this case). To avoid that, but still @@ -3633,6 +3785,9 @@ def get_container(ctx: CephadmContext, host_network = False envs.extend(cc.get_container_envs()) container_args.extend(cc.get_container_args()) + elif daemon_type == NodeProxy.daemon_type: + entrypoint = NodeProxy.entrypoint + name = '%s.%s' % (daemon_type, daemon_id) if daemon_type in Monitoring.components: uid, gid = extract_uid_gid_monitoring(ctx, daemon_type) @@ -4718,12 +4873,13 @@ class MgrListener(Thread): conn.send(err_str.encode()) logger.error(err_str) else: - conn.send(b'ACK') - if 'config' in data: - self.agent.wakeup() - self.agent.ls_gatherer.wakeup() - self.agent.volume_gatherer.wakeup() - logger.debug(f'Got mgr message {data}') + if 'counter' in data: + conn.send(b'ACK') + if 'config' in data: + self.agent.wakeup() + self.agent.ls_gatherer.wakeup() + self.agent.volume_gatherer.wakeup() + logger.debug(f'Got mgr message {data}') except Exception as e: logger.error(f'Mgr Listener encountered exception: {e}') @@ -4731,17 +4887,20 @@ class MgrListener(Thread): self.stop = True def handle_json_payload(self, data: Dict[Any, Any]) -> None: - self.agent.ack = int(data['counter']) - if 'config' in data: - logger.info('Received new config from mgr') - config = data['config'] - for filename in config: - if filename in self.agent.required_files: - file_path = os.path.join(self.agent.daemon_dir, filename) - with write_new(file_path) as f: - f.write(config[filename]) - self.agent.pull_conf_settings() - self.agent.wakeup() + if 'counter' in data: + self.agent.ack = int(data['counter']) + if 'config' in data: + logger.info('Received new config from mgr') + config = data['config'] + for filename in config: + if filename in self.agent.required_files: + file_path = os.path.join(self.agent.daemon_dir, filename) + with write_new(file_path) as f: + f.write(config[filename]) + self.agent.pull_conf_settings() + self.agent.wakeup() + else: + raise RuntimeError('No valid data received.') class CephadmAgent(): @@ -4783,6 +4942,9 @@ class CephadmAgent(): self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0] self.recent_iteration_index: int = 0 self.cached_ls_values: Dict[str, Dict[str, str]] = {} + self.ssl_ctx = ssl.create_default_context() + self.ssl_ctx.check_hostname = True + self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED def validate(self, config: Dict[str, str] = {}) -> None: # check for the required files @@ -4894,6 +5056,7 @@ WantedBy=ceph-{fsid}.target def run(self) -> None: self.pull_conf_settings() + self.ssl_ctx.load_verify_locations(self.ca_path) try: for _ in range(1001): @@ -4915,11 +5078,6 @@ WantedBy=ceph-{fsid}.target if not self.volume_gatherer.is_alive(): self.volume_gatherer.start() - ssl_ctx = ssl.create_default_context() - ssl_ctx.check_hostname = True - ssl_ctx.verify_mode = ssl.CERT_REQUIRED - ssl_ctx.load_verify_locations(self.ca_path) - while not self.stop: start_time = time.monotonic() ack = self.ack @@ -4945,15 +5103,19 @@ WantedBy=ceph-{fsid}.target 'port': self.listener_port}) data = data.encode('ascii') - url = f'https://{self.target_ip}:{self.target_port}/data/' try: - req = Request(url, data, {'Content-Type': 'application/json'}) send_time = time.monotonic() - with urlopen(req, context=ssl_ctx) as response: - response_str = response.read() - response_json = json.loads(response_str) - total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds() - logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds after sending request.') + status, response = http_query(addr=self.target_ip, + port=self.target_port, + data=data, + endpoint='/data', + ssl_ctx=self.ssl_ctx) + response_json = json.loads(response) + if status != 200: + logger.error(f'HTTP error {status} while querying agent endpoint: {response}') + raise RuntimeError + total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds() + logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds after sending request.') except Exception as e: logger.error(f'Failed to send metadata to mgr: {e}') @@ -6441,6 +6603,10 @@ def command_bootstrap(ctx): 'For more information see:\n\n' '\thttps://docs.ceph.com/en/latest/mgr/telemetry/\n') logger.info('Bootstrap complete.') + + if getattr(ctx, 'deploy_cephadm_agent', None): + cli(['config', 'set', 'mgr', 'mgr/cephadm/use_agent', 'true']) + return ctx.error_code ################################## @@ -6758,6 +6924,15 @@ def _dispatch_deploy( deployment_type=deployment_type, endpoints=daemon_endpoints) + elif daemon_type == NodeProxy.daemon_type: + config, keyring = get_config_and_keyring(ctx) + uid, gid = extract_uid_gid(ctx) + c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) + deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, + config=config, keyring=keyring, + deployment_type=deployment_type, + endpoints=daemon_endpoints) + elif daemon_type == CephadmAgent.daemon_type: # get current user gid and uid uid = os.getuid() @@ -8059,6 +8234,13 @@ def _rm_cluster(ctx: CephadmContext, keep_logs: bool, zap_osds: bool) -> None: for fname in glob(f'{ctx.log_dir}/cephadm.log*'): os.remove(fname) + try: + Path('/etc/ceph/podman-auth.json').unlink() + except FileNotFoundError: + pass + except Exception as e: + logger.debug(f'Could not remove podman-auth.json: {e}') + # rm sysctl settings sysctl_dirs: List[Path] = [Path(ctx.sysctl_dir), Path('/usr/lib/sysctl.d')] @@ -8097,6 +8279,7 @@ def check_time_sync(ctx, enabler=None): 'ntp.service', # 18.04 (at least) 'ntpsec.service', # 20.04 (at least) / buster 'openntpd.service', # ubuntu / debian + 'timemaster.service', # linuxptp on ubuntu/debian ] if not check_units(ctx, units, enabler): logger.warning('No time sync service is running; checked for %s' % units) @@ -10008,7 +10191,9 @@ def _get_parser(): parser_version.set_defaults(func=command_version) parser_pull = subparsers.add_parser( - 'pull', help='pull the default container image') + 'pull', + help='pull a ceph container image (will pull the default image if --image not provided)', + usage='cephadm pull (for default image) | cephadm --image <image-name> pull (for custom ceph image)') parser_pull.set_defaults(func=command_pull) parser_pull.add_argument( '--insecure', @@ -10440,6 +10625,10 @@ def _get_parser(): '--log-to-file', action='store_true', help='configure cluster to log to traditional log files in /var/log/ceph/$fsid') + parser_bootstrap.add_argument( + '--deploy-cephadm-agent', + action='store_true', + help='deploy the cephadm-agent') parser_deploy = subparsers.add_parser( 'deploy', help='deploy a daemon') diff --git a/src/cephadm/tests/test_agent.py b/src/cephadm/tests/test_agent.py index f9cf201e2..182c1f661 100644 --- a/src/cephadm/tests/test_agent.py +++ b/src/cephadm/tests/test_agent.py @@ -527,7 +527,7 @@ def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start, 'port': str(open_listener_port) } _RQ_init.assert_called_with( - f'https://{target_ip}:{target_port}/data/', + f'https://{target_ip}:{target_port}/data', json.dumps(expected_data).encode('ascii'), {'Content-Type': 'application/json'} ) diff --git a/src/cephadm/tests/test_nfs.py b/src/cephadm/tests/test_nfs.py index 0649ef934..c9893b32d 100644 --- a/src/cephadm/tests/test_nfs.py +++ b/src/cephadm/tests/test_nfs.py @@ -25,6 +25,7 @@ def nfs_json(**kwargs): if kwargs.get("files"): result["files"] = { "ganesha.conf": "", + "idmap.conf": "", } if kwargs.get("rgw_content"): result["rgw"] = dict(kwargs["rgw_content"]) diff --git a/src/cephadm/tox.ini b/src/cephadm/tox.ini index 2cbfca70f..a6d7f3958 100644 --- a/src/cephadm/tox.ini +++ b/src/cephadm/tox.ini @@ -37,12 +37,14 @@ deps = pyfakefs >= 5, < 6 ; python_version >= "3.7" mock pytest + pyyaml commands=pytest {posargs} [testenv:mypy] basepython = python3 deps = mypy + types-PyYAML -c{toxinidir}/../mypy-constrains.txt commands = mypy --config-file ../mypy.ini {posargs:cephadm.py} |