diff options
Diffstat (limited to 'src/pybind/mgr/nfs/export.py')
-rw-r--r-- | src/pybind/mgr/nfs/export.py | 856 |
1 files changed, 856 insertions, 0 deletions
diff --git a/src/pybind/mgr/nfs/export.py b/src/pybind/mgr/nfs/export.py new file mode 100644 index 000000000..5887c898f --- /dev/null +++ b/src/pybind/mgr/nfs/export.py @@ -0,0 +1,856 @@ +import errno +import json +import logging +from typing import ( + List, + Any, + Dict, + Optional, + TYPE_CHECKING, + TypeVar, + Callable, + Set, + cast) +from os.path import normpath +import cephfs + +from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES + +from object_format import ErrorResponse +from orchestrator import NoOrchestrator +from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS + +from .ganesha_conf import ( + CephFSFSAL, + Export, + GaneshaConfParser, + RGWFSAL, + RawBlock, + format_block) +from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound +from .utils import ( + CONF_PREFIX, + EXPORT_PREFIX, + NonFatalError, + USER_CONF_PREFIX, + export_obj_name, + conf_obj_name, + available_clusters, + check_fs, + restart_nfs_service, cephfs_path_is_dir) + +if TYPE_CHECKING: + from nfs.module import Module + +FuncT = TypeVar('FuncT', bound=Callable) + +log = logging.getLogger(__name__) + + +def known_cluster_ids(mgr: 'Module') -> Set[str]: + """Return the set of known cluster IDs.""" + try: + clusters = set(available_clusters(mgr)) + except NoOrchestrator: + clusters = nfs_rados_configs(mgr.rados) + return clusters + + +def _check_rados_notify(ioctx: Any, obj: str) -> None: + try: + ioctx.notify(obj) + except TimedOut: + log.exception("Ganesha timed out") + + +def normalize_path(path: str) -> str: + if path: + path = normpath(path.strip()) + if path[:2] == "//": + path = path[1:] + return path + + +class NFSRados: + def __init__(self, rados: 'Rados', namespace: str) -> None: + self.rados = rados + self.pool = POOL_NAME + self.namespace = namespace + + def _make_rados_url(self, obj: str) -> str: + return "rados://{}/{}/{}".format(self.pool, self.namespace, obj) + + def _create_url_block(self, obj_name: str) -> RawBlock: + return RawBlock('%url', values={'value': self._make_rados_url(obj_name)}) + + def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + ioctx.write_full(obj, conf_block.encode('utf-8')) + if not config_obj: + # Return after creating empty common config object + return + log.debug("write configuration into rados object %s/%s/%s", + self.pool, self.namespace, obj) + + # Add created obj url to common config obj + ioctx.append(config_obj, format_block( + self._create_url_block(obj)).encode('utf-8')) + _check_rados_notify(ioctx, config_obj) + log.debug("Added %s url to %s", obj, config_obj) + + def read_obj(self, obj: str) -> Optional[str]: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + try: + return ioctx.read(obj, 1048576).decode() + except ObjectNotFound: + return None + + def update_obj(self, conf_block: str, obj: str, config_obj: str, + should_notify: Optional[bool] = True) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + ioctx.write_full(obj, conf_block.encode('utf-8')) + log.debug("write configuration into rados object %s/%s/%s", + self.pool, self.namespace, obj) + if should_notify: + _check_rados_notify(ioctx, config_obj) + log.debug("Update export %s in %s", obj, config_obj) + + def remove_obj(self, obj: str, config_obj: str) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + export_urls = ioctx.read(config_obj) + url = '%url "{}"\n\n'.format(self._make_rados_url(obj)) + export_urls = export_urls.replace(url.encode('utf-8'), b'') + ioctx.remove_object(obj) + ioctx.write_full(config_obj, export_urls) + _check_rados_notify(ioctx, config_obj) + log.debug("Object deleted: %s", url) + + def remove_all_obj(self) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + for obj in ioctx.list_objects(): + obj.remove() + + def check_user_config(self) -> bool: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + for obj in ioctx.list_objects(): + if obj.key.startswith(USER_CONF_PREFIX): + return True + return False + + +def nfs_rados_configs(rados: 'Rados', nfs_pool: str = POOL_NAME) -> Set[str]: + """Return a set of all the namespaces in the nfs_pool where nfs + configuration objects are found. The namespaces also correspond + to the cluster ids. + """ + ns: Set[str] = set() + prefixes = (EXPORT_PREFIX, CONF_PREFIX, USER_CONF_PREFIX) + with rados.open_ioctx(nfs_pool) as ioctx: + ioctx.set_namespace(LIBRADOS_ALL_NSPACES) + for obj in ioctx.list_objects(): + if obj.key.startswith(prefixes): + ns.add(obj.nspace) + return ns + + +class AppliedExportResults: + """Gathers the results of multiple changed exports. + Returned by apply_export. + """ + + def __init__(self) -> None: + self.changes: List[Dict[str, str]] = [] + self.has_error = False + + def append(self, value: Dict[str, str]) -> None: + if value.get("state", "") == "error": + self.has_error = True + self.changes.append(value) + + def to_simplified(self) -> List[Dict[str, str]]: + return self.changes + + def mgr_return_value(self) -> int: + return -errno.EIO if self.has_error else 0 + + +class ExportMgr: + def __init__( + self, + mgr: 'Module', + export_ls: Optional[Dict[str, List[Export]]] = None + ) -> None: + self.mgr = mgr + self.rados_pool = POOL_NAME + self._exports: Optional[Dict[str, List[Export]]] = export_ls + + @property + def exports(self) -> Dict[str, List[Export]]: + if self._exports is None: + self._exports = {} + log.info("Begin export parsing") + for cluster_id in known_cluster_ids(self.mgr): + self.export_conf_objs = [] # type: List[Export] + self._read_raw_config(cluster_id) + self._exports[cluster_id] = self.export_conf_objs + log.info("Exports parsed successfully %s", self.exports.items()) + return self._exports + + def _fetch_export( + self, + cluster_id: str, + pseudo_path: str + ) -> Optional[Export]: + try: + for ex in self.exports[cluster_id]: + if ex.pseudo == pseudo_path: + return ex + return None + except KeyError: + log.info('no exports for cluster %s', cluster_id) + return None + + def _fetch_export_id( + self, + cluster_id: str, + export_id: int + ) -> Optional[Export]: + try: + for ex in self.exports[cluster_id]: + if ex.export_id == export_id: + return ex + return None + except KeyError: + log.info(f'no exports for cluster {cluster_id}') + return None + + def _delete_export_user(self, export: Export) -> None: + if isinstance(export.fsal, CephFSFSAL): + assert export.fsal.user_id + self.mgr.check_mon_command({ + 'prefix': 'auth rm', + 'entity': 'client.{}'.format(export.fsal.user_id), + }) + log.info("Deleted export user %s", export.fsal.user_id) + elif isinstance(export.fsal, RGWFSAL): + # do nothing; we're using the bucket owner creds. + pass + + def _create_export_user(self, export: Export) -> None: + if isinstance(export.fsal, CephFSFSAL): + fsal = cast(CephFSFSAL, export.fsal) + assert fsal.fs_name + fsal.user_id = f"nfs.{export.cluster_id}.{export.export_id}" + fsal.cephx_key = self._create_user_key( + export.cluster_id, fsal.user_id, export.path, fsal.fs_name + ) + log.debug("Successfully created user %s for cephfs path %s", fsal.user_id, export.path) + + elif isinstance(export.fsal, RGWFSAL): + rgwfsal = cast(RGWFSAL, export.fsal) + if not rgwfsal.user_id: + assert export.path + ret, out, err = self.mgr.tool_exec( + ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path] + ) + if ret: + raise NFSException(f'Failed to fetch owner for bucket {export.path}') + j = json.loads(out) + owner = j.get('owner', '') + rgwfsal.user_id = owner + assert rgwfsal.user_id + ret, out, err = self.mgr.tool_exec([ + 'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id + ]) + if ret: + raise NFSException( + f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}' + ) + j = json.loads(out) + + # FIXME: make this more tolerate of unexpected output? + rgwfsal.access_key_id = j['keys'][0]['access_key'] + rgwfsal.secret_access_key = j['keys'][0]['secret_key'] + log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path) + + def _gen_export_id(self, cluster_id: str) -> int: + exports = sorted([ex.export_id for ex in self.exports[cluster_id]]) + nid = 1 + for e_id in exports: + if e_id == nid: + nid += 1 + else: + break + return nid + + def _read_raw_config(self, rados_namespace: str) -> None: + with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx: + ioctx.set_namespace(rados_namespace) + for obj in ioctx.list_objects(): + if obj.key.startswith(EXPORT_PREFIX): + size, _ = obj.stat() + raw_config = obj.read(size) + raw_config = raw_config.decode("utf-8") + log.debug("read export configuration from rados " + "object %s/%s/%s", self.rados_pool, + rados_namespace, obj.key) + self.export_conf_objs.append(Export.from_export_block( + GaneshaConfParser(raw_config).parse()[0], rados_namespace)) + + def _save_export(self, cluster_id: str, export: Export) -> None: + self.exports[cluster_id].append(export) + self._rados(cluster_id).write_obj( + format_block(export.to_export_block()), + export_obj_name(export.export_id), + conf_obj_name(export.cluster_id) + ) + + def _delete_export( + self, + cluster_id: str, + pseudo_path: Optional[str], + export_obj: Optional[Export] = None + ) -> None: + try: + if export_obj: + export: Optional[Export] = export_obj + else: + assert pseudo_path + export = self._fetch_export(cluster_id, pseudo_path) + + if export: + if pseudo_path: + self._rados(cluster_id).remove_obj( + export_obj_name(export.export_id), conf_obj_name(cluster_id)) + self.exports[cluster_id].remove(export) + self._delete_export_user(export) + if not self.exports[cluster_id]: + del self.exports[cluster_id] + log.debug("Deleted all exports for cluster %s", cluster_id) + return None + raise NonFatalError("Export does not exist") + except Exception as e: + log.exception(f"Failed to delete {pseudo_path} export for {cluster_id}") + raise ErrorResponse.wrap(e) + + def _fetch_export_obj(self, cluster_id: str, ex_id: int) -> Optional[Export]: + try: + with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx: + ioctx.set_namespace(cluster_id) + export = Export.from_export_block( + GaneshaConfParser( + ioctx.read(export_obj_name(ex_id)).decode("utf-8") + ).parse()[0], + cluster_id + ) + return export + except ObjectNotFound: + log.exception("Export ID: %s not found", ex_id) + return None + + def _update_export(self, cluster_id: str, export: Export, + need_nfs_service_restart: bool) -> None: + self.exports[cluster_id].append(export) + self._rados(cluster_id).update_obj( + format_block(export.to_export_block()), + export_obj_name(export.export_id), conf_obj_name(export.cluster_id), + should_notify=not need_nfs_service_restart) + if need_nfs_service_restart: + restart_nfs_service(self.mgr, export.cluster_id) + + def _validate_cluster_id(self, cluster_id: str) -> None: + """Raise an exception if cluster_id is not valid.""" + clusters = known_cluster_ids(self.mgr) + log.debug("checking for %r in known nfs clusters: %r", + cluster_id, clusters) + if cluster_id not in clusters: + raise ErrorResponse(f"Cluster {cluster_id!r} does not exist", + return_value=-errno.ENOENT) + + def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Dict[str, Any]: + self._validate_cluster_id(kwargs['cluster_id']) + # if addr(s) are provided, construct client list and adjust outer block + clients = [] + if addr: + clients = [{ + 'addresses': addr, + 'access_type': 'ro' if kwargs['read_only'] else 'rw', + 'squash': kwargs['squash'], + }] + kwargs['squash'] = 'none' + kwargs['clients'] = clients + + if clients: + kwargs['access_type'] = "none" + elif kwargs['read_only']: + kwargs['access_type'] = "RO" + else: + kwargs['access_type'] = "RW" + + if kwargs['cluster_id'] not in self.exports: + self.exports[kwargs['cluster_id']] = [] + + try: + fsal_type = kwargs.pop('fsal_type') + if fsal_type == 'cephfs': + return self.create_cephfs_export(**kwargs) + if fsal_type == 'rgw': + return self.create_rgw_export(**kwargs) + raise NotImplementedError() + except Exception as e: + log.exception( + f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}") + raise ErrorResponse.wrap(e) + + def delete_export(self, + cluster_id: str, + pseudo_path: str) -> None: + self._validate_cluster_id(cluster_id) + return self._delete_export(cluster_id, pseudo_path) + + def delete_all_exports(self, cluster_id: str) -> None: + try: + export_list = list(self.exports[cluster_id]) + except KeyError: + log.info("No exports to delete") + return + for export in export_list: + try: + self._delete_export(cluster_id=cluster_id, pseudo_path=None, + export_obj=export) + except Exception as e: + raise NFSException(f"Failed to delete export {export.export_id}: {e}") + log.info("All exports successfully deleted for cluster id: %s", cluster_id) + + def list_all_exports(self) -> List[Dict[str, Any]]: + r = [] + for cluster_id, ls in self.exports.items(): + r.extend([e.to_dict() for e in ls]) + return r + + def list_exports(self, + cluster_id: str, + detailed: bool = False) -> List[Any]: + self._validate_cluster_id(cluster_id) + try: + if detailed: + result_d = [export.to_dict() for export in self.exports[cluster_id]] + return result_d + else: + result_ps = [export.pseudo for export in self.exports[cluster_id]] + return result_ps + + except KeyError: + log.warning("No exports to list for %s", cluster_id) + return [] + except Exception as e: + log.exception(f"Failed to list exports for {cluster_id}") + raise ErrorResponse.wrap(e) + + def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]: + export = self._fetch_export(cluster_id, pseudo_path) + if export: + return export.to_dict() + log.warning(f"No {pseudo_path} export to show for {cluster_id}") + return None + + def get_export( + self, + cluster_id: str, + pseudo_path: str, + ) -> Dict[str, Any]: + self._validate_cluster_id(cluster_id) + try: + export_dict = self._get_export_dict(cluster_id, pseudo_path) + log.info(f"Fetched {export_dict!r} for {cluster_id!r}, {pseudo_path!r}") + return export_dict if export_dict else {} + except Exception as e: + log.exception(f"Failed to get {pseudo_path} export for {cluster_id}") + raise ErrorResponse.wrap(e) + + def get_export_by_id( + self, + cluster_id: str, + export_id: int + ) -> Optional[Dict[str, Any]]: + export = self._fetch_export_id(cluster_id, export_id) + return export.to_dict() if export else None + + def get_export_by_pseudo( + self, + cluster_id: str, + pseudo_path: str + ) -> Optional[Dict[str, Any]]: + export = self._fetch_export(cluster_id, pseudo_path) + return export.to_dict() if export else None + + # This method is used by the dashboard module (../dashboard/controllers/nfs.py) + # Do not change interface without updating the Dashboard code + def apply_export(self, cluster_id: str, export_config: str) -> AppliedExportResults: + try: + exports = self._read_export_config(cluster_id, export_config) + except Exception as e: + log.exception(f'Failed to update export: {e}') + raise ErrorResponse.wrap(e) + + aeresults = AppliedExportResults() + for export in exports: + aeresults.append(self._change_export(cluster_id, export)) + return aeresults + + def _read_export_config(self, cluster_id: str, export_config: str) -> List[Dict]: + if not export_config: + raise NFSInvalidOperation("Empty Config!!") + try: + j = json.loads(export_config) + except ValueError: + # okay, not JSON. is it an EXPORT block? + try: + blocks = GaneshaConfParser(export_config).parse() + exports = [ + Export.from_export_block(block, cluster_id) + for block in blocks + ] + j = [export.to_dict() for export in exports] + except Exception as ex: + raise NFSInvalidOperation(f"Input must be JSON or a ganesha EXPORT block: {ex}") + # check export type - always return a list + if isinstance(j, list): + return j # j is already a list object + return [j] # return a single object list, with j as the only item + + def _change_export(self, cluster_id: str, export: Dict) -> Dict[str, str]: + try: + return self._apply_export(cluster_id, export) + except NotImplementedError: + # in theory, the NotImplementedError here may be raised by a hook back to + # an orchestration module. If the orchestration module supports it the NFS + # servers may be restarted. If not supported the expectation is that an + # (unfortunately generic) NotImplementedError will be raised. We then + # indicate to the user that manual intervention may be needed now that the + # configuration changes have been applied. + return { + "pseudo": export['pseudo'], + "state": "warning", + "msg": "changes applied (Manual restart of NFS Pods required)", + } + except Exception as ex: + msg = f'Failed to apply export: {ex}' + log.exception(msg) + return {"state": "error", "msg": msg} + + def _update_user_id( + self, + cluster_id: str, + path: str, + fs_name: str, + user_id: str + ) -> None: + osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format( + self.rados_pool, cluster_id, fs_name) + # NFS-Ganesha can dynamically enforce an export's access type changes, but Ceph server + # daemons can't dynamically enforce changes in Ceph user caps of the Ceph clients. To + # allow dynamic updates of CephFS NFS exports, always set FSAL Ceph user's MDS caps with + # path restricted read-write access. Rely on the ganesha servers to enforce the export + # access type requested for the NFS clients. + self.mgr.check_mon_command({ + 'prefix': 'auth caps', + 'entity': f'client.{user_id}', + 'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow rw path={}'.format(path)], + }) + + log.info("Export user updated %s", user_id) + + def _create_user_key( + self, + cluster_id: str, + entity: str, + path: str, + fs_name: str, + ) -> str: + osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format( + self.rados_pool, cluster_id, fs_name) + nfs_caps = [ + 'mon', 'allow r', + 'osd', osd_cap, + 'mds', 'allow rw path={}'.format(path) + ] + + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth get-or-create', + 'entity': 'client.{}'.format(entity), + 'caps': nfs_caps, + 'format': 'json', + }) + if ret == -errno.EINVAL and 'does not match' in err: + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth caps', + 'entity': 'client.{}'.format(entity), + 'caps': nfs_caps, + 'format': 'json', + }) + if err: + raise NFSException(f'Failed to update caps for {entity}: {err}') + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth get', + 'entity': 'client.{}'.format(entity), + 'format': 'json', + }) + if err: + raise NFSException(f'Failed to fetch caps for {entity}: {err}') + + json_res = json.loads(out) + log.info("Export user created is %s", json_res[0]['entity']) + return json_res[0]['key'] + + def create_export_from_dict(self, + cluster_id: str, + ex_id: int, + ex_dict: Dict[str, Any]) -> Export: + pseudo_path = ex_dict.get("pseudo") + if not pseudo_path: + raise NFSInvalidOperation("export must specify pseudo path") + + path = ex_dict.get("path") + if path is None: + raise NFSInvalidOperation("export must specify path") + path = normalize_path(path) + + fsal = ex_dict.get("fsal", {}) + fsal_type = fsal.get("name") + if fsal_type == NFS_GANESHA_SUPPORTED_FSALS[1]: + if '/' in path and path != '/': + raise NFSInvalidOperation('"/" is not allowed in path with bucket name') + elif fsal_type == NFS_GANESHA_SUPPORTED_FSALS[0]: + fs_name = fsal.get("fs_name") + if not fs_name: + raise NFSInvalidOperation("export FSAL must specify fs_name") + if not check_fs(self.mgr, fs_name): + raise FSNotFound(fs_name) + + user_id = f"nfs.{cluster_id}.{ex_id}" + if "user_id" in fsal and fsal["user_id"] != user_id: + raise NFSInvalidOperation(f"export FSAL user_id must be '{user_id}'") + else: + raise NFSInvalidOperation(f"NFS Ganesha supported FSALs are {NFS_GANESHA_SUPPORTED_FSALS}." + "Export must specify any one of it.") + + ex_dict["fsal"] = fsal + ex_dict["cluster_id"] = cluster_id + export = Export.from_dict(ex_id, ex_dict) + export.validate(self.mgr) + log.debug("Successfully created %s export-%s from dict for cluster %s", + fsal_type, ex_id, cluster_id) + return export + + def create_cephfs_export(self, + fs_name: str, + cluster_id: str, + pseudo_path: str, + read_only: bool, + path: str, + squash: str, + access_type: str, + clients: list = [], + sectype: Optional[List[str]] = None) -> Dict[str, Any]: + + try: + cephfs_path_is_dir(self.mgr, fs_name, path) + except NotADirectoryError: + raise NFSException(f"path {path} is not a dir", -errno.ENOTDIR) + except cephfs.ObjectNotFound: + raise NFSObjectNotFound(f"path {path} does not exist") + except cephfs.Error as e: + raise NFSException(e.args[1], -e.args[0]) + + pseudo_path = normalize_path(pseudo_path) + + if not self._fetch_export(cluster_id, pseudo_path): + export = self.create_export_from_dict( + cluster_id, + self._gen_export_id(cluster_id), + { + "pseudo": pseudo_path, + "path": path, + "access_type": access_type, + "squash": squash, + "fsal": { + "name": NFS_GANESHA_SUPPORTED_FSALS[0], + "fs_name": fs_name, + }, + "clients": clients, + "sectype": sectype, + } + ) + log.debug("creating cephfs export %s", export) + self._create_export_user(export) + self._save_export(cluster_id, export) + result = { + "bind": export.pseudo, + "fs": fs_name, + "path": export.path, + "cluster": cluster_id, + "mode": export.access_type, + } + return result + raise NonFatalError("Export already exists") + + def create_rgw_export(self, + cluster_id: str, + pseudo_path: str, + access_type: str, + read_only: bool, + squash: str, + bucket: Optional[str] = None, + user_id: Optional[str] = None, + clients: list = [], + sectype: Optional[List[str]] = None) -> Dict[str, Any]: + pseudo_path = normalize_path(pseudo_path) + + if not bucket and not user_id: + raise ErrorResponse("Must specify either bucket or user_id") + + if not self._fetch_export(cluster_id, pseudo_path): + export = self.create_export_from_dict( + cluster_id, + self._gen_export_id(cluster_id), + { + "pseudo": pseudo_path, + "path": bucket or '/', + "access_type": access_type, + "squash": squash, + "fsal": { + "name": NFS_GANESHA_SUPPORTED_FSALS[1], + "user_id": user_id, + }, + "clients": clients, + "sectype": sectype, + } + ) + log.debug("creating rgw export %s", export) + self._create_export_user(export) + self._save_export(cluster_id, export) + result = { + "bind": export.pseudo, + "path": export.path, + "cluster": cluster_id, + "mode": export.access_type, + "squash": export.squash, + } + return result + raise NonFatalError("Export already exists") + + def _apply_export( + self, + cluster_id: str, + new_export_dict: Dict, + ) -> Dict[str, str]: + for k in ['path', 'pseudo']: + if k not in new_export_dict: + raise NFSInvalidOperation(f'Export missing required field {k}') + if cluster_id not in self.exports: + self.exports[cluster_id] = [] + + new_export_dict['path'] = normalize_path(new_export_dict['path']) + new_export_dict['pseudo'] = normalize_path(new_export_dict['pseudo']) + + old_export = self._fetch_export(cluster_id, new_export_dict['pseudo']) + if old_export: + # Check if export id matches + if new_export_dict.get('export_id'): + if old_export.export_id != new_export_dict.get('export_id'): + raise NFSInvalidOperation('Export ID changed, Cannot update export') + else: + new_export_dict['export_id'] = old_export.export_id + elif new_export_dict.get('export_id'): + old_export = self._fetch_export_obj(cluster_id, new_export_dict['export_id']) + if old_export: + # re-fetch via old pseudo + old_export = self._fetch_export(cluster_id, old_export.pseudo) + assert old_export + log.debug("export %s pseudo %s -> %s", + old_export.export_id, old_export.pseudo, new_export_dict['pseudo']) + + new_export = self.create_export_from_dict( + cluster_id, + new_export_dict.get('export_id', self._gen_export_id(cluster_id)), + new_export_dict + ) + + if not old_export: + self._create_export_user(new_export) + self._save_export(cluster_id, new_export) + return {"pseudo": new_export.pseudo, "state": "added"} + + need_nfs_service_restart = True + if old_export.fsal.name != new_export.fsal.name: + raise NFSInvalidOperation('FSAL change not allowed') + if old_export.pseudo != new_export.pseudo: + log.debug('export %s pseudo %s -> %s', + new_export.export_id, old_export.pseudo, new_export.pseudo) + + if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]: + old_fsal = cast(CephFSFSAL, old_export.fsal) + new_fsal = cast(CephFSFSAL, new_export.fsal) + if old_fsal.user_id != new_fsal.user_id: + self._delete_export_user(old_export) + self._create_export_user(new_export) + elif ( + old_export.path != new_export.path + or old_fsal.fs_name != new_fsal.fs_name + ): + self._update_user_id( + cluster_id, + new_export.path, + cast(str, new_fsal.fs_name), + cast(str, new_fsal.user_id) + ) + new_fsal.cephx_key = old_fsal.cephx_key + else: + expected_mds_caps = 'allow rw path={}'.format(new_export.path) + entity = new_fsal.user_id + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth get', + 'entity': 'client.{}'.format(entity), + 'format': 'json', + }) + if ret: + raise NFSException(f'Failed to fetch caps for {entity}: {err}') + actual_mds_caps = json.loads(out)[0]['caps'].get('mds') + if actual_mds_caps != expected_mds_caps: + self._update_user_id( + cluster_id, + new_export.path, + cast(str, new_fsal.fs_name), + cast(str, new_fsal.user_id) + ) + elif old_export.pseudo == new_export.pseudo: + need_nfs_service_restart = False + new_fsal.cephx_key = old_fsal.cephx_key + + if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]: + old_rgw_fsal = cast(RGWFSAL, old_export.fsal) + new_rgw_fsal = cast(RGWFSAL, new_export.fsal) + if old_rgw_fsal.user_id != new_rgw_fsal.user_id: + self._delete_export_user(old_export) + self._create_export_user(new_export) + elif old_rgw_fsal.access_key_id != new_rgw_fsal.access_key_id: + raise NFSInvalidOperation('access_key_id change is not allowed') + elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key: + raise NFSInvalidOperation('secret_access_key change is not allowed') + + self.exports[cluster_id].remove(old_export) + + self._update_export(cluster_id, new_export, need_nfs_service_restart) + + return {"pseudo": new_export.pseudo, "state": "updated"} + + def _rados(self, cluster_id: str) -> NFSRados: + """Return a new NFSRados object for the given cluster id.""" + return NFSRados(self.mgr.rados, cluster_id) |