From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/pybind/mgr/nfs/export.py | 814 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 814 insertions(+) create mode 100644 src/pybind/mgr/nfs/export.py (limited to 'src/pybind/mgr/nfs/export.py') diff --git a/src/pybind/mgr/nfs/export.py b/src/pybind/mgr/nfs/export.py new file mode 100644 index 000000000..3c4ce8625 --- /dev/null +++ b/src/pybind/mgr/nfs/export.py @@ -0,0 +1,814 @@ +import errno +import json +import logging +from typing import ( + List, + Any, + Dict, + Tuple, + Optional, + TYPE_CHECKING, + TypeVar, + Callable, + Set, + cast) +from os.path import normpath + +from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES + +from orchestrator import NoOrchestrator +from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS + +from .export_utils import GaneshaConfParser, Export, RawBlock, CephFSFSAL, RGWFSAL +from .exception import NFSException, NFSInvalidOperation, FSNotFound +from .utils import ( + CONF_PREFIX, + EXPORT_PREFIX, + USER_CONF_PREFIX, + export_obj_name, + conf_obj_name, + available_clusters, + check_fs, + restart_nfs_service) + +if TYPE_CHECKING: + from nfs.module import Module + +FuncT = TypeVar('FuncT', bound=Callable) + +log = logging.getLogger(__name__) + + +def known_cluster_ids(mgr: 'Module') -> Set[str]: + """Return the set of known cluster IDs.""" + try: + clusters = set(available_clusters(mgr)) + except NoOrchestrator: + clusters = nfs_rados_configs(mgr.rados) + return clusters + + +def export_cluster_checker(func: FuncT) -> FuncT: + def cluster_check( + export: 'ExportMgr', + *args: Any, + **kwargs: Any + ) -> Tuple[int, str, str]: + """ + This method checks if cluster exists + """ + clusters = known_cluster_ids(export.mgr) + cluster_id: str = kwargs['cluster_id'] + log.debug("checking for %r in known nfs clusters: %r", + cluster_id, clusters) + if cluster_id not in clusters: + return -errno.ENOENT, "", "Cluster does not exist" + return func(export, *args, **kwargs) + return cast(FuncT, cluster_check) + + +def exception_handler( + exception_obj: Exception, + log_msg: str = "" +) -> Tuple[int, str, str]: + if log_msg: + log.exception(log_msg) + return getattr(exception_obj, 'errno', -1), "", str(exception_obj) + + +def _check_rados_notify(ioctx: Any, obj: str) -> None: + try: + ioctx.notify(obj) + except TimedOut: + log.exception("Ganesha timed out") + + +def normalize_path(path: str) -> str: + if path: + path = normpath(path.strip()) + if path[:2] == "//": + path = path[1:] + return path + + +class NFSRados: + def __init__(self, rados: 'Rados', namespace: str) -> None: + self.rados = rados + self.pool = POOL_NAME + self.namespace = namespace + + def _make_rados_url(self, obj: str) -> str: + return "rados://{}/{}/{}".format(self.pool, self.namespace, obj) + + def _create_url_block(self, obj_name: str) -> RawBlock: + return RawBlock('%url', values={'value': self._make_rados_url(obj_name)}) + + def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + ioctx.write_full(obj, conf_block.encode('utf-8')) + if not config_obj: + # Return after creating empty common config object + return + log.debug("write configuration into rados object %s/%s/%s", + self.pool, self.namespace, obj) + + # Add created obj url to common config obj + ioctx.append(config_obj, GaneshaConfParser.write_block( + self._create_url_block(obj)).encode('utf-8')) + _check_rados_notify(ioctx, config_obj) + log.debug("Added %s url to %s", obj, config_obj) + + def read_obj(self, obj: str) -> Optional[str]: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + try: + return ioctx.read(obj, 1048576).decode() + except ObjectNotFound: + return None + + def update_obj(self, conf_block: str, obj: str, config_obj: str, + should_notify: Optional[bool] = True) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + ioctx.write_full(obj, conf_block.encode('utf-8')) + log.debug("write configuration into rados object %s/%s/%s", + self.pool, self.namespace, obj) + if should_notify: + _check_rados_notify(ioctx, config_obj) + log.debug("Update export %s in %s", obj, config_obj) + + def remove_obj(self, obj: str, config_obj: str) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + export_urls = ioctx.read(config_obj) + url = '%url "{}"\n\n'.format(self._make_rados_url(obj)) + export_urls = export_urls.replace(url.encode('utf-8'), b'') + ioctx.remove_object(obj) + ioctx.write_full(config_obj, export_urls) + _check_rados_notify(ioctx, config_obj) + log.debug("Object deleted: %s", url) + + def remove_all_obj(self) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + for obj in ioctx.list_objects(): + obj.remove() + + def check_user_config(self) -> bool: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + for obj in ioctx.list_objects(): + if obj.key.startswith(USER_CONF_PREFIX): + return True + return False + + +def nfs_rados_configs(rados: 'Rados', nfs_pool: str = POOL_NAME) -> Set[str]: + """Return a set of all the namespaces in the nfs_pool where nfs + configuration objects are found. The namespaces also correspond + to the cluster ids. + """ + ns: Set[str] = set() + prefixes = (EXPORT_PREFIX, CONF_PREFIX, USER_CONF_PREFIX) + with rados.open_ioctx(nfs_pool) as ioctx: + ioctx.set_namespace(LIBRADOS_ALL_NSPACES) + for obj in ioctx.list_objects(): + if obj.key.startswith(prefixes): + ns.add(obj.nspace) + return ns + + +class ExportMgr: + def __init__( + self, + mgr: 'Module', + export_ls: Optional[Dict[str, List[Export]]] = None + ) -> None: + self.mgr = mgr + self.rados_pool = POOL_NAME + self._exports: Optional[Dict[str, List[Export]]] = export_ls + + @property + def exports(self) -> Dict[str, List[Export]]: + if self._exports is None: + self._exports = {} + log.info("Begin export parsing") + for cluster_id in known_cluster_ids(self.mgr): + self.export_conf_objs = [] # type: List[Export] + self._read_raw_config(cluster_id) + self._exports[cluster_id] = self.export_conf_objs + log.info("Exports parsed successfully %s", self.exports.items()) + return self._exports + + def _fetch_export( + self, + cluster_id: str, + pseudo_path: str + ) -> Optional[Export]: + try: + for ex in self.exports[cluster_id]: + if ex.pseudo == pseudo_path: + return ex + return None + except KeyError: + log.info('no exports for cluster %s', cluster_id) + return None + + def _fetch_export_id( + self, + cluster_id: str, + export_id: int + ) -> Optional[Export]: + try: + for ex in self.exports[cluster_id]: + if ex.export_id == export_id: + return ex + return None + except KeyError: + log.info(f'no exports for cluster {cluster_id}') + return None + + def _delete_export_user(self, export: Export) -> None: + if isinstance(export.fsal, CephFSFSAL): + assert export.fsal.user_id + self.mgr.check_mon_command({ + 'prefix': 'auth rm', + 'entity': 'client.{}'.format(export.fsal.user_id), + }) + log.info("Deleted export user %s", export.fsal.user_id) + elif isinstance(export.fsal, RGWFSAL): + # do nothing; we're using the bucket owner creds. + pass + + def _create_export_user(self, export: Export) -> None: + if isinstance(export.fsal, CephFSFSAL): + fsal = cast(CephFSFSAL, export.fsal) + assert fsal.fs_name + fsal.user_id = f"nfs.{export.cluster_id}.{export.export_id}" + fsal.cephx_key = self._create_user_key( + export.cluster_id, fsal.user_id, export.path, fsal.fs_name + ) + log.debug("Successfully created user %s for cephfs path %s", fsal.user_id, export.path) + + elif isinstance(export.fsal, RGWFSAL): + rgwfsal = cast(RGWFSAL, export.fsal) + if not rgwfsal.user_id: + assert export.path + ret, out, err = self.mgr.tool_exec( + ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path] + ) + if ret: + raise NFSException(f'Failed to fetch owner for bucket {export.path}') + j = json.loads(out) + owner = j.get('owner', '') + rgwfsal.user_id = owner + assert rgwfsal.user_id + ret, out, err = self.mgr.tool_exec([ + 'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id + ]) + if ret: + raise NFSException( + f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}' + ) + j = json.loads(out) + + # FIXME: make this more tolerate of unexpected output? + rgwfsal.access_key_id = j['keys'][0]['access_key'] + rgwfsal.secret_access_key = j['keys'][0]['secret_key'] + log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path) + + def _gen_export_id(self, cluster_id: str) -> int: + exports = sorted([ex.export_id for ex in self.exports[cluster_id]]) + nid = 1 + for e_id in exports: + if e_id == nid: + nid += 1 + else: + break + return nid + + def _read_raw_config(self, rados_namespace: str) -> None: + with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx: + ioctx.set_namespace(rados_namespace) + for obj in ioctx.list_objects(): + if obj.key.startswith(EXPORT_PREFIX): + size, _ = obj.stat() + raw_config = obj.read(size) + raw_config = raw_config.decode("utf-8") + log.debug("read export configuration from rados " + "object %s/%s/%s", self.rados_pool, + rados_namespace, obj.key) + self.export_conf_objs.append(Export.from_export_block( + GaneshaConfParser(raw_config).parse()[0], rados_namespace)) + + def _save_export(self, cluster_id: str, export: Export) -> None: + self.exports[cluster_id].append(export) + self._rados(cluster_id).write_obj( + GaneshaConfParser.write_block(export.to_export_block()), + export_obj_name(export.export_id), + conf_obj_name(export.cluster_id) + ) + + def _delete_export( + self, + cluster_id: str, + pseudo_path: Optional[str], + export_obj: Optional[Export] = None + ) -> Tuple[int, str, str]: + try: + if export_obj: + export: Optional[Export] = export_obj + else: + assert pseudo_path + export = self._fetch_export(cluster_id, pseudo_path) + + if export: + if pseudo_path: + self._rados(cluster_id).remove_obj( + export_obj_name(export.export_id), conf_obj_name(cluster_id)) + self.exports[cluster_id].remove(export) + self._delete_export_user(export) + if not self.exports[cluster_id]: + del self.exports[cluster_id] + log.debug("Deleted all exports for cluster %s", cluster_id) + return 0, "Successfully deleted export", "" + return 0, "", "Export does not exist" + except Exception as e: + return exception_handler(e, f"Failed to delete {pseudo_path} export for {cluster_id}") + + def _fetch_export_obj(self, cluster_id: str, ex_id: int) -> Optional[Export]: + try: + with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx: + ioctx.set_namespace(cluster_id) + export = Export.from_export_block( + GaneshaConfParser( + ioctx.read(export_obj_name(ex_id)).decode("utf-8") + ).parse()[0], + cluster_id + ) + return export + except ObjectNotFound: + log.exception("Export ID: %s not found", ex_id) + return None + + def _update_export(self, cluster_id: str, export: Export, + need_nfs_service_restart: bool) -> None: + self.exports[cluster_id].append(export) + self._rados(cluster_id).update_obj( + GaneshaConfParser.write_block(export.to_export_block()), + export_obj_name(export.export_id), conf_obj_name(export.cluster_id), + should_notify=not need_nfs_service_restart) + if need_nfs_service_restart: + restart_nfs_service(self.mgr, export.cluster_id) + + @export_cluster_checker + def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Tuple[int, str, str]: + # if addr(s) are provided, construct client list and adjust outer block + clients = [] + if addr: + clients = [{ + 'addresses': addr, + 'access_type': 'ro' if kwargs['read_only'] else 'rw', + 'squash': kwargs['squash'], + }] + kwargs['squash'] = 'none' + kwargs['clients'] = clients + + if clients: + kwargs['access_type'] = "none" + elif kwargs['read_only']: + kwargs['access_type'] = "RO" + else: + kwargs['access_type'] = "RW" + + if kwargs['cluster_id'] not in self.exports: + self.exports[kwargs['cluster_id']] = [] + + try: + fsal_type = kwargs.pop('fsal_type') + if fsal_type == 'cephfs': + return self.create_cephfs_export(**kwargs) + if fsal_type == 'rgw': + return self.create_rgw_export(**kwargs) + raise NotImplementedError() + except Exception as e: + return exception_handler(e, f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}") + + @export_cluster_checker + def delete_export(self, + cluster_id: str, + pseudo_path: str) -> Tuple[int, str, str]: + return self._delete_export(cluster_id, pseudo_path) + + def delete_all_exports(self, cluster_id: str) -> None: + try: + export_list = list(self.exports[cluster_id]) + except KeyError: + log.info("No exports to delete") + return + for export in export_list: + ret, out, err = self._delete_export(cluster_id=cluster_id, pseudo_path=None, + export_obj=export) + if ret != 0: + raise NFSException(f"Failed to delete exports: {err} and {ret}") + log.info("All exports successfully deleted for cluster id: %s", cluster_id) + + def list_all_exports(self) -> List[Dict[str, Any]]: + r = [] + for cluster_id, ls in self.exports.items(): + r.extend([e.to_dict() for e in ls]) + return r + + @export_cluster_checker + def list_exports(self, + cluster_id: str, + detailed: bool = False) -> Tuple[int, str, str]: + try: + if detailed: + result_d = [export.to_dict() for export in self.exports[cluster_id]] + return 0, json.dumps(result_d, indent=2), '' + else: + result_ps = [export.pseudo for export in self.exports[cluster_id]] + return 0, json.dumps(result_ps, indent=2), '' + + except KeyError: + log.warning("No exports to list for %s", cluster_id) + return 0, '', '' + except Exception as e: + return exception_handler(e, f"Failed to list exports for {cluster_id}") + + def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]: + export = self._fetch_export(cluster_id, pseudo_path) + if export: + return export.to_dict() + log.warning(f"No {pseudo_path} export to show for {cluster_id}") + return None + + @export_cluster_checker + def get_export( + self, + cluster_id: str, + pseudo_path: str, + ) -> Tuple[int, str, str]: + try: + export_dict = self._get_export_dict(cluster_id, pseudo_path) + if export_dict: + return 0, json.dumps(export_dict, indent=2), '' + log.warning("No %s export to show for %s", pseudo_path, cluster_id) + return 0, '', '' + except Exception as e: + return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}") + + def get_export_by_id( + self, + cluster_id: str, + export_id: int + ) -> Optional[Dict[str, Any]]: + export = self._fetch_export_id(cluster_id, export_id) + return export.to_dict() if export else None + + def get_export_by_pseudo( + self, + cluster_id: str, + pseudo_path: str + ) -> Optional[Dict[str, Any]]: + export = self._fetch_export(cluster_id, pseudo_path) + return export.to_dict() if export else None + + def apply_export(self, cluster_id: str, export_config: str) -> Tuple[int, str, str]: + try: + if not export_config: + raise NFSInvalidOperation("Empty Config!!") + try: + j = json.loads(export_config) + except ValueError: + # okay, not JSON. is it an EXPORT block? + try: + blocks = GaneshaConfParser(export_config).parse() + exports = [ + Export.from_export_block(block, cluster_id) + for block in blocks + ] + j = [export.to_dict() for export in exports] + except Exception as ex: + raise NFSInvalidOperation(f"Input must be JSON or a ganesha EXPORT block: {ex}") + + # check export type + if isinstance(j, list): + ret, out, err = (0, '', '') + for export in j: + try: + r, o, e = self._apply_export(cluster_id, export) + except Exception as ex: + r, o, e = exception_handler(ex, f'Failed to apply export: {ex}') + if r: + ret = r + if o: + out += o + '\n' + if e: + err += e + '\n' + return ret, out, err + else: + r, o, e = self._apply_export(cluster_id, j) + return r, o, e + except NotImplementedError: + return 0, " Manual Restart of NFS PODS required for successful update of exports", "" + except Exception as e: + return exception_handler(e, f'Failed to update export: {e}') + + def _update_user_id( + self, + cluster_id: str, + path: str, + fs_name: str, + user_id: str + ) -> None: + osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format( + self.rados_pool, cluster_id, fs_name) + # NFS-Ganesha can dynamically enforce an export's access type changes, but Ceph server + # daemons can't dynamically enforce changes in Ceph user caps of the Ceph clients. To + # allow dynamic updates of CephFS NFS exports, always set FSAL Ceph user's MDS caps with + # path restricted read-write access. Rely on the ganesha servers to enforce the export + # access type requested for the NFS clients. + self.mgr.check_mon_command({ + 'prefix': 'auth caps', + 'entity': f'client.{user_id}', + 'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow rw path={}'.format(path)], + }) + + log.info("Export user updated %s", user_id) + + def _create_user_key( + self, + cluster_id: str, + entity: str, + path: str, + fs_name: str, + ) -> str: + osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format( + self.rados_pool, cluster_id, fs_name) + nfs_caps = [ + 'mon', 'allow r', + 'osd', osd_cap, + 'mds', 'allow rw path={}'.format(path) + ] + + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth get-or-create', + 'entity': 'client.{}'.format(entity), + 'caps': nfs_caps, + 'format': 'json', + }) + if ret == -errno.EINVAL and 'does not match' in err: + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth caps', + 'entity': 'client.{}'.format(entity), + 'caps': nfs_caps, + 'format': 'json', + }) + if err: + raise NFSException(f'Failed to update caps for {entity}: {err}') + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth get', + 'entity': 'client.{}'.format(entity), + 'format': 'json', + }) + if err: + raise NFSException(f'Failed to fetch caps for {entity}: {err}') + + json_res = json.loads(out) + log.info("Export user created is %s", json_res[0]['entity']) + return json_res[0]['key'] + + def create_export_from_dict(self, + cluster_id: str, + ex_id: int, + ex_dict: Dict[str, Any]) -> Export: + pseudo_path = ex_dict.get("pseudo") + if not pseudo_path: + raise NFSInvalidOperation("export must specify pseudo path") + + path = ex_dict.get("path") + if path is None: + raise NFSInvalidOperation("export must specify path") + path = normalize_path(path) + + fsal = ex_dict.get("fsal", {}) + fsal_type = fsal.get("name") + if fsal_type == NFS_GANESHA_SUPPORTED_FSALS[1]: + if '/' in path and path != '/': + raise NFSInvalidOperation('"/" is not allowed in path with bucket name') + elif fsal_type == NFS_GANESHA_SUPPORTED_FSALS[0]: + fs_name = fsal.get("fs_name") + if not fs_name: + raise NFSInvalidOperation("export FSAL must specify fs_name") + if not check_fs(self.mgr, fs_name): + raise FSNotFound(fs_name) + + user_id = f"nfs.{cluster_id}.{ex_id}" + if "user_id" in fsal and fsal["user_id"] != user_id: + raise NFSInvalidOperation(f"export FSAL user_id must be '{user_id}'") + else: + raise NFSInvalidOperation(f"NFS Ganesha supported FSALs are {NFS_GANESHA_SUPPORTED_FSALS}." + "Export must specify any one of it.") + + ex_dict["fsal"] = fsal + ex_dict["cluster_id"] = cluster_id + export = Export.from_dict(ex_id, ex_dict) + export.validate(self.mgr) + log.debug("Successfully created %s export-%s from dict for cluster %s", + fsal_type, ex_id, cluster_id) + return export + + def create_cephfs_export(self, + fs_name: str, + cluster_id: str, + pseudo_path: str, + read_only: bool, + path: str, + squash: str, + access_type: str, + clients: list = []) -> Tuple[int, str, str]: + pseudo_path = normalize_path(pseudo_path) + + if not self._fetch_export(cluster_id, pseudo_path): + export = self.create_export_from_dict( + cluster_id, + self._gen_export_id(cluster_id), + { + "pseudo": pseudo_path, + "path": path, + "access_type": access_type, + "squash": squash, + "fsal": { + "name": NFS_GANESHA_SUPPORTED_FSALS[0], + "fs_name": fs_name, + }, + "clients": clients, + } + ) + log.debug("creating cephfs export %s", export) + self._create_export_user(export) + self._save_export(cluster_id, export) + result = { + "bind": export.pseudo, + "fs": fs_name, + "path": export.path, + "cluster": cluster_id, + "mode": export.access_type, + } + return (0, json.dumps(result, indent=4), '') + return 0, "", "Export already exists" + + def create_rgw_export(self, + cluster_id: str, + pseudo_path: str, + access_type: str, + read_only: bool, + squash: str, + bucket: Optional[str] = None, + user_id: Optional[str] = None, + clients: list = []) -> Tuple[int, str, str]: + pseudo_path = normalize_path(pseudo_path) + + if not bucket and not user_id: + return -errno.EINVAL, "", "Must specify either bucket or user_id" + + if not self._fetch_export(cluster_id, pseudo_path): + export = self.create_export_from_dict( + cluster_id, + self._gen_export_id(cluster_id), + { + "pseudo": pseudo_path, + "path": bucket or '/', + "access_type": access_type, + "squash": squash, + "fsal": { + "name": NFS_GANESHA_SUPPORTED_FSALS[1], + "user_id": user_id, + }, + "clients": clients, + } + ) + log.debug("creating rgw export %s", export) + self._create_export_user(export) + self._save_export(cluster_id, export) + result = { + "bind": export.pseudo, + "path": export.path, + "cluster": cluster_id, + "mode": export.access_type, + "squash": export.squash, + } + return (0, json.dumps(result, indent=4), '') + return 0, "", "Export already exists" + + def _apply_export( + self, + cluster_id: str, + new_export_dict: Dict, + ) -> Tuple[int, str, str]: + for k in ['path', 'pseudo']: + if k not in new_export_dict: + raise NFSInvalidOperation(f'Export missing required field {k}') + if cluster_id not in self.exports: + self.exports[cluster_id] = [] + + new_export_dict['path'] = normalize_path(new_export_dict['path']) + new_export_dict['pseudo'] = normalize_path(new_export_dict['pseudo']) + + old_export = self._fetch_export(cluster_id, new_export_dict['pseudo']) + if old_export: + # Check if export id matches + if new_export_dict.get('export_id'): + if old_export.export_id != new_export_dict.get('export_id'): + raise NFSInvalidOperation('Export ID changed, Cannot update export') + else: + new_export_dict['export_id'] = old_export.export_id + elif new_export_dict.get('export_id'): + old_export = self._fetch_export_obj(cluster_id, new_export_dict['export_id']) + if old_export: + # re-fetch via old pseudo + old_export = self._fetch_export(cluster_id, old_export.pseudo) + assert old_export + log.debug("export %s pseudo %s -> %s", + old_export.export_id, old_export.pseudo, new_export_dict['pseudo']) + + new_export = self.create_export_from_dict( + cluster_id, + new_export_dict.get('export_id', self._gen_export_id(cluster_id)), + new_export_dict + ) + + if not old_export: + self._create_export_user(new_export) + self._save_export(cluster_id, new_export) + return 0, f'Added export {new_export.pseudo}', '' + + need_nfs_service_restart = True + if old_export.fsal.name != new_export.fsal.name: + raise NFSInvalidOperation('FSAL change not allowed') + if old_export.pseudo != new_export.pseudo: + log.debug('export %s pseudo %s -> %s', + new_export.export_id, old_export.pseudo, new_export.pseudo) + + if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]: + old_fsal = cast(CephFSFSAL, old_export.fsal) + new_fsal = cast(CephFSFSAL, new_export.fsal) + if old_fsal.user_id != new_fsal.user_id: + self._delete_export_user(old_export) + self._create_export_user(new_export) + elif ( + old_export.path != new_export.path + or old_fsal.fs_name != new_fsal.fs_name + ): + self._update_user_id( + cluster_id, + new_export.path, + cast(str, new_fsal.fs_name), + cast(str, new_fsal.user_id) + ) + new_fsal.cephx_key = old_fsal.cephx_key + else: + expected_mds_caps = 'allow rw path={}'.format(new_export.path) + entity = new_fsal.user_id + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth get', + 'entity': 'client.{}'.format(entity), + 'format': 'json', + }) + if ret: + raise NFSException(f'Failed to fetch caps for {entity}: {err}') + actual_mds_caps = json.loads(out)[0]['caps'].get('mds') + if actual_mds_caps != expected_mds_caps: + self._update_user_id( + cluster_id, + new_export.path, + cast(str, new_fsal.fs_name), + cast(str, new_fsal.user_id) + ) + elif old_export.pseudo == new_export.pseudo: + need_nfs_service_restart = False + new_fsal.cephx_key = old_fsal.cephx_key + + if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]: + old_rgw_fsal = cast(RGWFSAL, old_export.fsal) + new_rgw_fsal = cast(RGWFSAL, new_export.fsal) + if old_rgw_fsal.user_id != new_rgw_fsal.user_id: + self._delete_export_user(old_export) + self._create_export_user(new_export) + elif old_rgw_fsal.access_key_id != new_rgw_fsal.access_key_id: + raise NFSInvalidOperation('access_key_id change is not allowed') + elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key: + raise NFSInvalidOperation('secret_access_key change is not allowed') + + self.exports[cluster_id].remove(old_export) + + self._update_export(cluster_id, new_export, need_nfs_service_restart) + + return 0, f"Updated export {new_export.pseudo}", "" + + def _rados(self, cluster_id: str) -> NFSRados: + """Return a new NFSRados object for the given cluster id.""" + return NFSRados(self.mgr.rados, cluster_id) -- cgit v1.2.3