import errno
import ipaddress
import logging
import os
import subprocess
import tempfile
from typing import Dict, Tuple, Any, List, cast, Optional

from mgr_module import HandleCommandResult
from mgr_module import NFS_POOL_NAME as POOL_NAME

from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec

from orchestrator import DaemonDescription

from cephadm.services.cephadmservice import AuthEntity, CephadmDaemonDeploySpec, CephService

logger = logging.getLogger(__name__)


class NFSService(CephService):
    """cephadm service driver for NFS-Ganesha (``nfs``) daemons.

    Besides deploying daemons, this class maintains the per-service RADOS
    objects Ganesha relies on in ``POOL_NAME`` under the namespace
    ``spec.service_id``: the config object (``spec.rados_config_name()``)
    and the ``grace`` object managed through ``ganesha-rados-grace``.
    """
    TYPE = 'nfs'

    def ranked(self) -> bool:
        # NFS daemons are ranked; generate_config derives the ganesha node id
        # from the daemon's rank ("<service_name>.<rank>").
        return True

    def fence(self, daemon_id: str) -> None:
        """Revoke the cephx key ``client.nfs.<daemon_id>`` of an old daemon.

        This is best effort: a failure is logged but not raised. The daemon
        itself is not blocked (see TODO below).
        """
        logger.info(f'Fencing old nfs.{daemon_id}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': f'client.nfs.{daemon_id}',
        })
        if ret:
            # Previously ignored silently; keep best-effort semantics but leave a trace.
            logger.warning('Failed to remove key client.nfs.%s: %s', daemon_id, err)

        # TODO: block/fence this entity (in case it is still running somewhere)

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        """Fence daemons whose rank or generation is no longer current.

        ``rank_map`` maps rank -> {generation -> daemon_id or None}. It is
        mutated in place and persisted with ``spec_store.save_rank_map`` after
        every removal.

        * Ranks ``>= num_ranks`` are dropped: each daemon recorded for them is
          fenced, and the rank's node id is removed from the ganesha grace
          table.
        * For the remaining ranks, each generation older than the newest one
          is fenced and dropped.
        """
        for rank, generations in list(rank_map.items()):
            if rank >= num_ranks:
                for daemon_id in generations.values():
                    if daemon_id is not None:
                        self.fence(daemon_id)
                del rank_map[rank]
                nodeid = f'{spec.service_name()}.{rank}'
                self.mgr.log.info(f'Removing {nodeid} from the ganesha grace table')
                self.run_grace_tool(cast(NFSServiceSpec, spec), 'remove', nodeid)
                self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
            else:
                if not generations:
                    # Nothing to fence; max() on an empty map would raise ValueError.
                    continue
                max_gen = max(generations.keys())
                for gen, daemon_id in list(generations.items()):
                    if gen < max_gen:
                        if daemon_id is not None:
                            self.fence(daemon_id)
                        del rank_map[rank][gen]
                        self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)

    def config(self, spec: NFSServiceSpec) -> None:  # type: ignore
        """Ensure the ganesha RADOS pool exists before deploying daemons."""
        from nfs.cluster import create_ganesha_pool

        assert self.TYPE == spec.service_type
        create_ganesha_pool(self.mgr)

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Fill in ``final_config`` and ``deps`` of ``daemon_spec`` and return it."""
        assert self.TYPE == daemon_spec.daemon_type
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        """Build the cephadm config-json for one nfs daemon.

        Side effects: creates/updates the daemon's cephx keys (RADOS and RGW),
        adds the daemon's node id to the ganesha grace table, and makes sure
        the RADOS config object exists.

        Returns:
            ``(config, deps)``; ``deps`` is always empty here.
        """
        assert self.TYPE == daemon_spec.daemon_type

        daemon_type = daemon_spec.daemon_type
        daemon_id = daemon_spec.daemon_id
        host = daemon_spec.host
        spec = cast(NFSServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        deps: List[str] = []

        nodeid = f'{daemon_spec.service_name}.{daemon_spec.rank}'

        # create the RADOS recovery pool keyring
        rados_user = f'{daemon_type}.{daemon_id}'
        rados_keyring = self.create_keyring(daemon_spec)

        # ensure rank is known to ganesha
        self.mgr.log.info(f'Ensuring {nodeid} is in the ganesha grace table')
        self.run_grace_tool(spec, 'add', nodeid)

        # create the rados config object
        self.create_rados_config_obj(spec)

        # create the RGW keyring
        rgw_user = f'{rados_user}-rgw'
        rgw_keyring = self.create_rgw_keyring(daemon_spec)

        # A virtual IP, when configured, takes precedence over the daemon's own IP.
        if spec.virtual_ip:
            bind_addr = spec.virtual_ip
        else:
            bind_addr = daemon_spec.ip if daemon_spec.ip else ''
        if not bind_addr:
            logger.warning(f'Bind address in {daemon_type}.{daemon_id}\'s ganesha conf is defaulting to empty')
        else:
            logger.debug("using haproxy bind address: %r", bind_addr)

        # generate the ganesha config
        def get_ganesha_conf() -> str:
            context: Dict[str, Any] = {
                "user": rados_user,
                "nodeid": nodeid,
                "pool": POOL_NAME,
                "namespace": spec.service_id,
                "rgw_user": rgw_user,
                "url": f'rados://{POOL_NAME}/{spec.service_id}/{spec.rados_config_name()}',
                # fall back to default NFS port if not present in daemon_spec
                "port": daemon_spec.ports[0] if daemon_spec.ports else 2049,
                "bind_addr": bind_addr,
                "haproxy_hosts": [],
            }
            if spec.enable_haproxy_protocol:
                context["haproxy_hosts"] = self._haproxy_hosts()
                logger.debug("selected haproxy_hosts: %r", context["haproxy_hosts"])
            return self.mgr.template.render('services/nfs/ganesha.conf.j2', context)

        # generate the cephadm config json
        def get_cephadm_config() -> Dict[str, Any]:
            config: Dict[str, Any] = {}
            config['pool'] = POOL_NAME
            config['namespace'] = spec.service_id
            config['userid'] = rados_user
            config['extra_args'] = ['-N', 'NIV_EVENT']
            config['files'] = {
                'ganesha.conf': get_ganesha_conf(),
            }
            config.update(
                self.get_config_and_keyring(
                    daemon_type, daemon_id,
                    keyring=rados_keyring,
                    host=host
                )
            )
            config['rgw'] = {
                'cluster': 'ceph',
                'user': rgw_user,
                'keyring': rgw_keyring,
            }
            # NOTE(review): this dict contains both keyrings, so debug logs
            # expose secrets; consider redacting.
            logger.debug('Generated cephadm config-json: %s', config)
            return config

        return get_cephadm_config(), deps

    def create_rados_config_obj(self,
                                spec: NFSServiceSpec,
                                clobber: bool = False) -> None:
        """Create the (empty) ganesha RADOS config object for ``spec``.

        If the object can already be read and ``clobber`` is False, nothing is
        done. Otherwise an empty object is written.

        Raises:
            RuntimeError: if writing the object fails.
        """
        objname = spec.rados_config_name()
        cmd = [
            'rados',
            '-n', f"mgr.{self.mgr.get_mgr_id()}",
            '-k', str(self.mgr.get_ceph_option('keyring')),
            '-p', POOL_NAME,
            '--namespace', cast(str, spec.service_id),
        ]
        result = subprocess.run(
            cmd + ['get', objname, '-'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            timeout=10)
        if not result.returncode and not clobber:
            logger.info('Rados config object exists: %s', objname)
        else:
            logger.info('Creating rados config object: %s', objname)
            # `put ... -` reads the object body from stdin. Feed it explicitly
            # empty input instead of inheriting the mgr process's stdin.
            result = subprocess.run(
                cmd + ['put', objname, '-'],
                input=b'',
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                timeout=10)
            if result.returncode:
                stderr = result.stderr.decode("utf-8", errors="replace")
                self.mgr.log.warning(
                    f'Unable to create rados config object {objname}: {stderr}'
                )
                raise RuntimeError(stderr)

    def create_keyring(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
        """Get or create the daemon's cephx key.

        The key has read access to the mons and rw access to this service's
        namespace in the NFS pool.
        """
        daemon_id = daemon_spec.daemon_id
        spec = cast(NFSServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
        entity: AuthEntity = self.get_auth_entity(daemon_id)

        osd_caps = 'allow rw pool=%s namespace=%s' % (POOL_NAME, spec.service_id)

        logger.info('Creating key for %s', entity)
        keyring = self.get_keyring_with_caps(entity,
                                             ['mon', 'allow r',
                                              'osd', osd_caps])

        return keyring

    def create_rgw_keyring(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
        """Get or create the ``<daemon_id>-rgw`` cephx key used for RGW exports."""
        daemon_id = daemon_spec.daemon_id
        entity: AuthEntity = self.get_auth_entity(f'{daemon_id}-rgw')

        logger.info('Creating key for %s', entity)
        keyring = self.get_keyring_with_caps(entity,
                                             ['mon', 'allow r',
                                              'osd', 'allow rwx tag rgw *=*'])

        return keyring

    def run_grace_tool(self,
                       spec: NFSServiceSpec,
                       action: str,
                       nodeid: str) -> None:
        """Run ``ganesha-rados-grace <action> <nodeid>`` against this service's grace table.

        A temporary cephx key is created for the tool. It is always removed
        again, even if preparing the temp files or running the tool fails.

        Raises:
            RuntimeError: if the tool exits with a non-zero status.
        """
        # write a temp keyring and referencing config file.  this is a kludge
        # because the ganesha-grace-tool can only authenticate as a client (and
        # not a mgr).  Also, it doesn't allow you to pass a keyring location via
        # the command line, nor does it parse the CEPH_ARGS env var.
        tmp_id = f'mgr.nfs.grace.{spec.service_name()}'
        entity = AuthEntity(f'client.{tmp_id}')
        keyring = self.get_keyring_with_caps(
            entity,
            ['mon', 'allow r', 'osd', f'allow rwx pool {POOL_NAME}']
        )
        # Start the try right after the key exists, so it is removed even if
        # building the temp files fails.
        try:
            with tempfile.NamedTemporaryFile(mode='w', prefix='mgr-grace-keyring') as tmp_keyring, \
                    tempfile.NamedTemporaryFile(mode='w', prefix='mgr-grace-conf') as tmp_conf:
                os.fchmod(tmp_keyring.fileno(), 0o600)
                tmp_keyring.write(keyring)
                tmp_keyring.flush()

                tmp_conf.write(self.mgr.get_minimal_ceph_conf())
                tmp_conf.write(f'\tkeyring = {tmp_keyring.name}\n')
                tmp_conf.flush()

                cmd: List[str] = [
                    'ganesha-rados-grace',
                    '--cephconf', tmp_conf.name,
                    '--userid', tmp_id,
                    '--pool', POOL_NAME,
                    '--ns', cast(str, spec.service_id),
                    action, nodeid,
                ]
                self.mgr.log.debug(cmd)
                result = subprocess.run(cmd,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE,
                                        timeout=10)
                if result.returncode:
                    stderr = result.stderr.decode("utf-8", errors="replace")
                    self.mgr.log.warning(
                        f'ganesha-rados-grace tool failed: {stderr}'
                    )
                    raise RuntimeError(f'grace tool failed: {stderr}')
        finally:
            self.mgr.check_mon_command({
                'prefix': 'auth rm',
                'entity': entity,
            })

    def remove_rgw_keyring(self, daemon: DaemonDescription) -> None:
        """Delete the ``<daemon_id>-rgw`` cephx key of ``daemon``."""
        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        entity: AuthEntity = self.get_auth_entity(f'{daemon_id}-rgw')

        logger.info(f'Removing key for {entity}')
        self.mgr.check_mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        """Run the base-class cleanup, then drop the daemon's RGW key."""
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_rgw_keyring(daemon)

    def ok_to_stop(self,
                   daemon_ids: List[str],
                   force: bool = False,
                   known: Optional[List[str]] = None) -> HandleCommandResult:
        """Decide whether the given nfs daemons may be stopped.

        Stopping is refused (EBUSY) when it would leave too few daemons, even
        with ``force``. Otherwise it is allowed only with ``force``; without
        it, EBUSY is returned together with a connectivity warning.
        """
        # if only 1 nfs, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'NFS', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if reached here, there is > 1 nfs daemon.
        if force:
            return HandleCommandResult(0, warn_message, '')

        # if reached here, > 1 nfs daemon and no force flag.
        # Provide warning
        warn_message = "WARNING: Removing NFS daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

    def purge(self, service_name: str) -> None:
        """Best-effort removal of the service's ``grace`` RADOS object.

        Does nothing for unknown services. The ``rados rm`` result is not
        checked.
        """
        if service_name not in self.mgr.spec_store:
            return
        spec = cast(NFSServiceSpec, self.mgr.spec_store[service_name].spec)

        logger.info(f'Removing grace file for {service_name}')
        cmd = [
            'rados',
            '-n', f"mgr.{self.mgr.get_mgr_id()}",
            '-k', str(self.mgr.get_ceph_option('keyring')),
            '-p', POOL_NAME,
            '--namespace', cast(str, spec.service_id),
            'rm', 'grace',
        ]
        subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=10
        )

    def _haproxy_hosts(self) -> List[str]:
        """Return the addresses from which ganesha accepts haproxy-protocol headers.

        For every host in the inventory, this includes its default address
        plus one address per additional non-loopback, non-link-local subnet
        known for that host.
        """
        # NB: Ideally, we would limit the list to IPs on hosts running
        # haproxy/ingress only, but due to the nature of cephadm today
        # we'd "only know the set of haproxy hosts after they've been
        # deployed" (quoth @adk7398). As it is today we limit the list
        # of hosts we know are managed by cephadm. That ought to be
        # good enough to prevent accepting haproxy protocol messages
        # from "rogue" systems that are not under our control. At
        # least until we learn otherwise.
        cluster_ips: List[str] = []
        for host in self.mgr.inventory.keys():
            default_addr = self.mgr.inventory.get_addr(host)
            cluster_ips.append(default_addr)
            nets = self.mgr.cache.networks.get(host)
            if not nets:
                continue
            # Parse once per host. If the address is not an IP literal,
            # no subnet can "already contain" it.
            try:
                default_ip: Optional[Any] = ipaddress.ip_address(default_addr)
            except ValueError:
                default_ip = None
            for subnet, iface in nets.items():
                ip_subnet = ipaddress.ip_network(subnet)
                if default_ip is not None and default_ip in ip_subnet:
                    continue  # already present
                if ip_subnet.is_loopback or ip_subnet.is_link_local:
                    continue  # ignore special subnets
                # presumably iface maps interface name -> list of addresses
                addrs: List[str] = [addr for addr_list in iface.values() for addr in addr_list]
                if addrs:
                    # one address per interface/subnet is enough
                    cluster_ips.append(addrs[0])
        return cluster_ips