From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/pybind/mgr/nfs/__init__.py | 7 + src/pybind/mgr/nfs/cluster.py | 309 +++++++++ src/pybind/mgr/nfs/exception.py | 32 + src/pybind/mgr/nfs/export.py | 856 +++++++++++++++++++++++++ src/pybind/mgr/nfs/ganesha_conf.py | 548 ++++++++++++++++ src/pybind/mgr/nfs/module.py | 189 ++++++ src/pybind/mgr/nfs/tests/__init__.py | 0 src/pybind/mgr/nfs/tests/test_nfs.py | 1156 ++++++++++++++++++++++++++++++++++ src/pybind/mgr/nfs/utils.py | 104 +++ 9 files changed, 3201 insertions(+) create mode 100644 src/pybind/mgr/nfs/__init__.py create mode 100644 src/pybind/mgr/nfs/cluster.py create mode 100644 src/pybind/mgr/nfs/exception.py create mode 100644 src/pybind/mgr/nfs/export.py create mode 100644 src/pybind/mgr/nfs/ganesha_conf.py create mode 100644 src/pybind/mgr/nfs/module.py create mode 100644 src/pybind/mgr/nfs/tests/__init__.py create mode 100644 src/pybind/mgr/nfs/tests/test_nfs.py create mode 100644 src/pybind/mgr/nfs/utils.py (limited to 'src/pybind/mgr/nfs') diff --git a/src/pybind/mgr/nfs/__init__.py b/src/pybind/mgr/nfs/__init__.py new file mode 100644 index 000000000..4e2257788 --- /dev/null +++ b/src/pybind/mgr/nfs/__init__.py @@ -0,0 +1,7 @@ +# flake8: noqa + +import os +if 'UNITTEST' in os.environ: + import tests + +from .module import Module diff --git a/src/pybind/mgr/nfs/cluster.py b/src/pybind/mgr/nfs/cluster.py new file mode 100644 index 000000000..d558a3a37 --- /dev/null +++ b/src/pybind/mgr/nfs/cluster.py @@ -0,0 +1,309 @@ +import ipaddress +import logging +import re +import socket +from typing import cast, Dict, List, Any, Union, Optional, TYPE_CHECKING + +from mgr_module import NFS_POOL_NAME as POOL_NAME +from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec +from object_format import ErrorResponse + +import orchestrator +from orchestrator.module import IngressType + +from .exception import NFSInvalidOperation, ClusterNotFound +from .utils import ( + ManualRestartRequired, + NonFatalError, + available_clusters, + conf_obj_name, + restart_nfs_service, + user_conf_obj_name) +from .export import NFSRados + +if TYPE_CHECKING: + from nfs.module import Module + from mgr_module import MgrModule + + +log = logging.getLogger(__name__) + + +def resolve_ip(hostname: str) -> str: + try: + r = socket.getaddrinfo(hostname, None, flags=socket.AI_CANONNAME, + type=socket.SOCK_STREAM) + # pick first v4 IP, if present + for a in r: + if a[0] == socket.AF_INET: + return a[4][0] + return r[0][4][0] + except socket.gaierror as e: + raise NFSInvalidOperation(f"Cannot resolve IP for host {hostname}: {e}") + + +def create_ganesha_pool(mgr: 'MgrModule') -> None: + pool_list = [p['pool_name'] for p in mgr.get_osdmap().dump().get('pools', [])] + if POOL_NAME not in pool_list: + mgr.check_mon_command({'prefix': 'osd pool create', + 'pool': POOL_NAME, + 'yes_i_really_mean_it': True}) + mgr.check_mon_command({'prefix': 'osd pool application enable', + 'pool': POOL_NAME, + 'app': 'nfs'}) + log.debug("Successfully created nfs-ganesha pool %s", POOL_NAME) + + +class NFSCluster: + def __init__(self, mgr: 'Module') -> None: + self.mgr = mgr + + def _call_orch_apply_nfs( + self, + cluster_id: str, + placement: Optional[str] = None, + virtual_ip: Optional[str] = None, + ingress_mode: Optional[IngressType] = None, + port: Optional[int] = None, + ) -> None: + if not port: + port = 2049 # default nfs port + if virtual_ip: + # nfs + ingress + # run NFS on non-standard port + if not ingress_mode: + ingress_mode = IngressType.default + ingress_mode = ingress_mode.canonicalize() + pspec = PlacementSpec.from_string(placement) + if ingress_mode == IngressType.keepalive_only: + # enforce count=1 for nfs over keepalive only + pspec.count = 1 + + ganesha_port = 10000 + port # semi-arbitrary, fix me someday + frontend_port: Optional[int] = port + virtual_ip_for_ganesha: Optional[str] = None + keepalive_only: bool = False + enable_haproxy_protocol: bool = False + if ingress_mode == IngressType.haproxy_protocol: + enable_haproxy_protocol = True + elif ingress_mode == IngressType.keepalive_only: + keepalive_only = True + virtual_ip_for_ganesha = virtual_ip.split('/')[0] + ganesha_port = port + frontend_port = None + + spec = NFSServiceSpec(service_type='nfs', service_id=cluster_id, + placement=pspec, + # use non-default port so we don't conflict with ingress + port=ganesha_port, + virtual_ip=virtual_ip_for_ganesha, + enable_haproxy_protocol=enable_haproxy_protocol) + completion = self.mgr.apply_nfs(spec) + orchestrator.raise_if_exception(completion) + ispec = IngressSpec(service_type='ingress', + service_id='nfs.' + cluster_id, + backend_service='nfs.' + cluster_id, + placement=pspec, + frontend_port=frontend_port, + monitor_port=7000 + port, # semi-arbitrary, fix me someday + virtual_ip=virtual_ip, + keepalive_only=keepalive_only, + enable_haproxy_protocol=enable_haproxy_protocol) + completion = self.mgr.apply_ingress(ispec) + orchestrator.raise_if_exception(completion) + else: + # standalone nfs + spec = NFSServiceSpec(service_type='nfs', service_id=cluster_id, + placement=PlacementSpec.from_string(placement), + port=port) + completion = self.mgr.apply_nfs(spec) + orchestrator.raise_if_exception(completion) + log.debug("Successfully deployed nfs daemons with cluster id %s and placement %s", + cluster_id, placement) + + def create_empty_rados_obj(self, cluster_id: str) -> None: + common_conf = conf_obj_name(cluster_id) + self._rados(cluster_id).write_obj('', conf_obj_name(cluster_id)) + log.info("Created empty object:%s", common_conf) + + def delete_config_obj(self, cluster_id: str) -> None: + self._rados(cluster_id).remove_all_obj() + log.info("Deleted %s object and all objects in %s", + conf_obj_name(cluster_id), cluster_id) + + def create_nfs_cluster( + self, + cluster_id: str, + placement: Optional[str], + virtual_ip: Optional[str], + ingress: Optional[bool] = None, + ingress_mode: Optional[IngressType] = None, + port: Optional[int] = None, + ) -> None: + try: + if virtual_ip: + # validate virtual_ip value: ip_address throws a ValueError + # exception in case it's not a valid ipv4 or ipv6 address + ip = virtual_ip.split('/')[0] + ipaddress.ip_address(ip) + if virtual_ip and not ingress: + raise NFSInvalidOperation('virtual_ip can only be provided with ingress enabled') + if not virtual_ip and ingress: + raise NFSInvalidOperation('ingress currently requires a virtual_ip') + if ingress_mode and not ingress: + raise NFSInvalidOperation('--ingress-mode must be passed along with --ingress') + invalid_str = re.search('[^A-Za-z0-9-_.]', cluster_id) + if invalid_str: + raise NFSInvalidOperation(f"cluster id {cluster_id} is invalid. " + f"{invalid_str.group()} is char not permitted") + + create_ganesha_pool(self.mgr) + + self.create_empty_rados_obj(cluster_id) + + if cluster_id not in available_clusters(self.mgr): + self._call_orch_apply_nfs(cluster_id, placement, virtual_ip, ingress_mode, port) + return + raise NonFatalError(f"{cluster_id} cluster already exists") + except Exception as e: + log.exception(f"NFS Cluster {cluster_id} could not be created") + raise ErrorResponse.wrap(e) + + def delete_nfs_cluster(self, cluster_id: str) -> None: + try: + cluster_list = available_clusters(self.mgr) + if cluster_id in cluster_list: + self.mgr.export_mgr.delete_all_exports(cluster_id) + completion = self.mgr.remove_service('ingress.nfs.' + cluster_id) + orchestrator.raise_if_exception(completion) + completion = self.mgr.remove_service('nfs.' + cluster_id) + orchestrator.raise_if_exception(completion) + self.delete_config_obj(cluster_id) + return + raise NonFatalError("Cluster does not exist") + except Exception as e: + log.exception(f"Failed to delete NFS Cluster {cluster_id}") + raise ErrorResponse.wrap(e) + + def list_nfs_cluster(self) -> List[str]: + try: + return available_clusters(self.mgr) + except Exception as e: + log.exception("Failed to list NFS Cluster") + raise ErrorResponse.wrap(e) + + def _show_nfs_cluster_info(self, cluster_id: str) -> Dict[str, Any]: + completion = self.mgr.list_daemons(daemon_type='nfs') + # Here completion.result is a list DaemonDescription objects + clusters = orchestrator.raise_if_exception(completion) + backends: List[Dict[str, Union[Any]]] = [] + + for cluster in clusters: + if cluster_id == cluster.service_id(): + assert cluster.hostname + try: + if cluster.ip: + ip = cluster.ip + else: + c = self.mgr.get_hosts() + orchestrator.raise_if_exception(c) + hosts = [h for h in c.result or [] + if h.hostname == cluster.hostname] + if hosts: + ip = resolve_ip(hosts[0].addr) + else: + # sigh + ip = resolve_ip(cluster.hostname) + backends.append({ + "hostname": cluster.hostname, + "ip": ip, + "port": cluster.ports[0] if cluster.ports else None + }) + except orchestrator.OrchestratorError: + continue + + r: Dict[str, Any] = { + 'virtual_ip': None, + 'backend': backends, + } + sc = self.mgr.describe_service(service_type='ingress') + services = orchestrator.raise_if_exception(sc) + for i in services: + spec = cast(IngressSpec, i.spec) + if spec.backend_service == f'nfs.{cluster_id}': + r['virtual_ip'] = i.virtual_ip.split('/')[0] if i.virtual_ip else None + if i.ports: + r['port'] = i.ports[0] + if len(i.ports) > 1: + r['monitor_port'] = i.ports[1] + log.debug("Successfully fetched %s info: %s", cluster_id, r) + return r + + def show_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Dict[str, Any]: + try: + if cluster_id and cluster_id not in available_clusters(self.mgr): + raise ClusterNotFound() + info_res = {} + if cluster_id: + cluster_ls = [cluster_id] + else: + cluster_ls = available_clusters(self.mgr) + + for cluster_id in cluster_ls: + res = self._show_nfs_cluster_info(cluster_id) + if res: + info_res[cluster_id] = res + return info_res + except Exception as e: + log.exception("Failed to show info for cluster") + raise ErrorResponse.wrap(e) + + def get_nfs_cluster_config(self, cluster_id: str) -> str: + try: + if cluster_id in available_clusters(self.mgr): + rados_obj = self._rados(cluster_id) + conf = rados_obj.read_obj(user_conf_obj_name(cluster_id)) + return conf or "" + raise ClusterNotFound() + except Exception as e: + log.exception(f"Fetching NFS-Ganesha Config failed for {cluster_id}") + raise ErrorResponse.wrap(e) + + def set_nfs_cluster_config(self, cluster_id: str, nfs_config: str) -> None: + try: + if cluster_id in available_clusters(self.mgr): + rados_obj = self._rados(cluster_id) + if rados_obj.check_user_config(): + raise NonFatalError("NFS-Ganesha User Config already exists") + rados_obj.write_obj(nfs_config, user_conf_obj_name(cluster_id), + conf_obj_name(cluster_id)) + log.debug("Successfully saved %s's user config: \n %s", cluster_id, nfs_config) + restart_nfs_service(self.mgr, cluster_id) + return + raise ClusterNotFound() + except NotImplementedError: + raise ManualRestartRequired("NFS-Ganesha Config Added Successfully") + except Exception as e: + log.exception(f"Setting NFS-Ganesha Config failed for {cluster_id}") + raise ErrorResponse.wrap(e) + + def reset_nfs_cluster_config(self, cluster_id: str) -> None: + try: + if cluster_id in available_clusters(self.mgr): + rados_obj = self._rados(cluster_id) + if not rados_obj.check_user_config(): + raise NonFatalError("NFS-Ganesha User Config does not exist") + rados_obj.remove_obj(user_conf_obj_name(cluster_id), + conf_obj_name(cluster_id)) + restart_nfs_service(self.mgr, cluster_id) + return + raise ClusterNotFound() + except NotImplementedError: + raise ManualRestartRequired("NFS-Ganesha Config Removed Successfully") + except Exception as e: + log.exception(f"Resetting NFS-Ganesha Config failed for {cluster_id}") + raise ErrorResponse.wrap(e) + + def _rados(self, cluster_id: str) -> NFSRados: + """Return a new NFSRados object for the given cluster id.""" + return NFSRados(self.mgr.rados, cluster_id) diff --git a/src/pybind/mgr/nfs/exception.py b/src/pybind/mgr/nfs/exception.py new file mode 100644 index 000000000..6c6e3d9f3 --- /dev/null +++ b/src/pybind/mgr/nfs/exception.py @@ -0,0 +1,32 @@ +import errno +from typing import Optional + + +class NFSException(Exception): + def __init__(self, err_msg: str, errno: int = -1) -> None: + super(NFSException, self).__init__(errno, err_msg) + self.errno = errno + self.err_msg = err_msg + + def __str__(self) -> str: + return self.err_msg + + +class NFSInvalidOperation(NFSException): + def __init__(self, err_msg: str) -> None: + super(NFSInvalidOperation, self).__init__(err_msg, -errno.EINVAL) + + +class NFSObjectNotFound(NFSException): + def __init__(self, err_msg: str) -> None: + super(NFSObjectNotFound, self).__init__(err_msg, -errno.ENOENT) + + +class FSNotFound(NFSObjectNotFound): + def __init__(self, fs_name: Optional[str]) -> None: + super(FSNotFound, self).__init__(f'filesystem {fs_name} not found') + + +class ClusterNotFound(NFSObjectNotFound): + def __init__(self) -> None: + super(ClusterNotFound, self).__init__('cluster does not exist') diff --git a/src/pybind/mgr/nfs/export.py b/src/pybind/mgr/nfs/export.py new file mode 100644 index 000000000..5887c898f --- /dev/null +++ b/src/pybind/mgr/nfs/export.py @@ -0,0 +1,856 @@ +import errno +import json +import logging +from typing import ( + List, + Any, + Dict, + Optional, + TYPE_CHECKING, + TypeVar, + Callable, + Set, + cast) +from os.path import normpath +import cephfs + +from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES + +from object_format import ErrorResponse +from orchestrator import NoOrchestrator +from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS + +from .ganesha_conf import ( + CephFSFSAL, + Export, + GaneshaConfParser, + RGWFSAL, + RawBlock, + format_block) +from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound +from .utils import ( + CONF_PREFIX, + EXPORT_PREFIX, + NonFatalError, + USER_CONF_PREFIX, + export_obj_name, + conf_obj_name, + available_clusters, + check_fs, + restart_nfs_service, cephfs_path_is_dir) + +if TYPE_CHECKING: + from nfs.module import Module + +FuncT = TypeVar('FuncT', bound=Callable) + +log = logging.getLogger(__name__) + + +def known_cluster_ids(mgr: 'Module') -> Set[str]: + """Return the set of known cluster IDs.""" + try: + clusters = set(available_clusters(mgr)) + except NoOrchestrator: + clusters = nfs_rados_configs(mgr.rados) + return clusters + + +def _check_rados_notify(ioctx: Any, obj: str) -> None: + try: + ioctx.notify(obj) + except TimedOut: + log.exception("Ganesha timed out") + + +def normalize_path(path: str) -> str: + if path: + path = normpath(path.strip()) + if path[:2] == "//": + path = path[1:] + return path + + +class NFSRados: + def __init__(self, rados: 'Rados', namespace: str) -> None: + self.rados = rados + self.pool = POOL_NAME + self.namespace = namespace + + def _make_rados_url(self, obj: str) -> str: + return "rados://{}/{}/{}".format(self.pool, self.namespace, obj) + + def _create_url_block(self, obj_name: str) -> RawBlock: + return RawBlock('%url', values={'value': self._make_rados_url(obj_name)}) + + def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + ioctx.write_full(obj, conf_block.encode('utf-8')) + if not config_obj: + # Return after creating empty common config object + return + log.debug("write configuration into rados object %s/%s/%s", + self.pool, self.namespace, obj) + + # Add created obj url to common config obj + ioctx.append(config_obj, format_block( + self._create_url_block(obj)).encode('utf-8')) + _check_rados_notify(ioctx, config_obj) + log.debug("Added %s url to %s", obj, config_obj) + + def read_obj(self, obj: str) -> Optional[str]: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + try: + return ioctx.read(obj, 1048576).decode() + except ObjectNotFound: + return None + + def update_obj(self, conf_block: str, obj: str, config_obj: str, + should_notify: Optional[bool] = True) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + ioctx.write_full(obj, conf_block.encode('utf-8')) + log.debug("write configuration into rados object %s/%s/%s", + self.pool, self.namespace, obj) + if should_notify: + _check_rados_notify(ioctx, config_obj) + log.debug("Update export %s in %s", obj, config_obj) + + def remove_obj(self, obj: str, config_obj: str) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + export_urls = ioctx.read(config_obj) + url = '%url "{}"\n\n'.format(self._make_rados_url(obj)) + export_urls = export_urls.replace(url.encode('utf-8'), b'') + ioctx.remove_object(obj) + ioctx.write_full(config_obj, export_urls) + _check_rados_notify(ioctx, config_obj) + log.debug("Object deleted: %s", url) + + def remove_all_obj(self) -> None: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + for obj in ioctx.list_objects(): + obj.remove() + + def check_user_config(self) -> bool: + with self.rados.open_ioctx(self.pool) as ioctx: + ioctx.set_namespace(self.namespace) + for obj in ioctx.list_objects(): + if obj.key.startswith(USER_CONF_PREFIX): + return True + return False + + +def nfs_rados_configs(rados: 'Rados', nfs_pool: str = POOL_NAME) -> Set[str]: + """Return a set of all the namespaces in the nfs_pool where nfs + configuration objects are found. The namespaces also correspond + to the cluster ids. + """ + ns: Set[str] = set() + prefixes = (EXPORT_PREFIX, CONF_PREFIX, USER_CONF_PREFIX) + with rados.open_ioctx(nfs_pool) as ioctx: + ioctx.set_namespace(LIBRADOS_ALL_NSPACES) + for obj in ioctx.list_objects(): + if obj.key.startswith(prefixes): + ns.add(obj.nspace) + return ns + + +class AppliedExportResults: + """Gathers the results of multiple changed exports. + Returned by apply_export. + """ + + def __init__(self) -> None: + self.changes: List[Dict[str, str]] = [] + self.has_error = False + + def append(self, value: Dict[str, str]) -> None: + if value.get("state", "") == "error": + self.has_error = True + self.changes.append(value) + + def to_simplified(self) -> List[Dict[str, str]]: + return self.changes + + def mgr_return_value(self) -> int: + return -errno.EIO if self.has_error else 0 + + +class ExportMgr: + def __init__( + self, + mgr: 'Module', + export_ls: Optional[Dict[str, List[Export]]] = None + ) -> None: + self.mgr = mgr + self.rados_pool = POOL_NAME + self._exports: Optional[Dict[str, List[Export]]] = export_ls + + @property + def exports(self) -> Dict[str, List[Export]]: + if self._exports is None: + self._exports = {} + log.info("Begin export parsing") + for cluster_id in known_cluster_ids(self.mgr): + self.export_conf_objs = [] # type: List[Export] + self._read_raw_config(cluster_id) + self._exports[cluster_id] = self.export_conf_objs + log.info("Exports parsed successfully %s", self.exports.items()) + return self._exports + + def _fetch_export( + self, + cluster_id: str, + pseudo_path: str + ) -> Optional[Export]: + try: + for ex in self.exports[cluster_id]: + if ex.pseudo == pseudo_path: + return ex + return None + except KeyError: + log.info('no exports for cluster %s', cluster_id) + return None + + def _fetch_export_id( + self, + cluster_id: str, + export_id: int + ) -> Optional[Export]: + try: + for ex in self.exports[cluster_id]: + if ex.export_id == export_id: + return ex + return None + except KeyError: + log.info(f'no exports for cluster {cluster_id}') + return None + + def _delete_export_user(self, export: Export) -> None: + if isinstance(export.fsal, CephFSFSAL): + assert export.fsal.user_id + self.mgr.check_mon_command({ + 'prefix': 'auth rm', + 'entity': 'client.{}'.format(export.fsal.user_id), + }) + log.info("Deleted export user %s", export.fsal.user_id) + elif isinstance(export.fsal, RGWFSAL): + # do nothing; we're using the bucket owner creds. + pass + + def _create_export_user(self, export: Export) -> None: + if isinstance(export.fsal, CephFSFSAL): + fsal = cast(CephFSFSAL, export.fsal) + assert fsal.fs_name + fsal.user_id = f"nfs.{export.cluster_id}.{export.export_id}" + fsal.cephx_key = self._create_user_key( + export.cluster_id, fsal.user_id, export.path, fsal.fs_name + ) + log.debug("Successfully created user %s for cephfs path %s", fsal.user_id, export.path) + + elif isinstance(export.fsal, RGWFSAL): + rgwfsal = cast(RGWFSAL, export.fsal) + if not rgwfsal.user_id: + assert export.path + ret, out, err = self.mgr.tool_exec( + ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path] + ) + if ret: + raise NFSException(f'Failed to fetch owner for bucket {export.path}') + j = json.loads(out) + owner = j.get('owner', '') + rgwfsal.user_id = owner + assert rgwfsal.user_id + ret, out, err = self.mgr.tool_exec([ + 'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id + ]) + if ret: + raise NFSException( + f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}' + ) + j = json.loads(out) + + # FIXME: make this more tolerate of unexpected output? + rgwfsal.access_key_id = j['keys'][0]['access_key'] + rgwfsal.secret_access_key = j['keys'][0]['secret_key'] + log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path) + + def _gen_export_id(self, cluster_id: str) -> int: + exports = sorted([ex.export_id for ex in self.exports[cluster_id]]) + nid = 1 + for e_id in exports: + if e_id == nid: + nid += 1 + else: + break + return nid + + def _read_raw_config(self, rados_namespace: str) -> None: + with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx: + ioctx.set_namespace(rados_namespace) + for obj in ioctx.list_objects(): + if obj.key.startswith(EXPORT_PREFIX): + size, _ = obj.stat() + raw_config = obj.read(size) + raw_config = raw_config.decode("utf-8") + log.debug("read export configuration from rados " + "object %s/%s/%s", self.rados_pool, + rados_namespace, obj.key) + self.export_conf_objs.append(Export.from_export_block( + GaneshaConfParser(raw_config).parse()[0], rados_namespace)) + + def _save_export(self, cluster_id: str, export: Export) -> None: + self.exports[cluster_id].append(export) + self._rados(cluster_id).write_obj( + format_block(export.to_export_block()), + export_obj_name(export.export_id), + conf_obj_name(export.cluster_id) + ) + + def _delete_export( + self, + cluster_id: str, + pseudo_path: Optional[str], + export_obj: Optional[Export] = None + ) -> None: + try: + if export_obj: + export: Optional[Export] = export_obj + else: + assert pseudo_path + export = self._fetch_export(cluster_id, pseudo_path) + + if export: + if pseudo_path: + self._rados(cluster_id).remove_obj( + export_obj_name(export.export_id), conf_obj_name(cluster_id)) + self.exports[cluster_id].remove(export) + self._delete_export_user(export) + if not self.exports[cluster_id]: + del self.exports[cluster_id] + log.debug("Deleted all exports for cluster %s", cluster_id) + return None + raise NonFatalError("Export does not exist") + except Exception as e: + log.exception(f"Failed to delete {pseudo_path} export for {cluster_id}") + raise ErrorResponse.wrap(e) + + def _fetch_export_obj(self, cluster_id: str, ex_id: int) -> Optional[Export]: + try: + with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx: + ioctx.set_namespace(cluster_id) + export = Export.from_export_block( + GaneshaConfParser( + ioctx.read(export_obj_name(ex_id)).decode("utf-8") + ).parse()[0], + cluster_id + ) + return export + except ObjectNotFound: + log.exception("Export ID: %s not found", ex_id) + return None + + def _update_export(self, cluster_id: str, export: Export, + need_nfs_service_restart: bool) -> None: + self.exports[cluster_id].append(export) + self._rados(cluster_id).update_obj( + format_block(export.to_export_block()), + export_obj_name(export.export_id), conf_obj_name(export.cluster_id), + should_notify=not need_nfs_service_restart) + if need_nfs_service_restart: + restart_nfs_service(self.mgr, export.cluster_id) + + def _validate_cluster_id(self, cluster_id: str) -> None: + """Raise an exception if cluster_id is not valid.""" + clusters = known_cluster_ids(self.mgr) + log.debug("checking for %r in known nfs clusters: %r", + cluster_id, clusters) + if cluster_id not in clusters: + raise ErrorResponse(f"Cluster {cluster_id!r} does not exist", + return_value=-errno.ENOENT) + + def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Dict[str, Any]: + self._validate_cluster_id(kwargs['cluster_id']) + # if addr(s) are provided, construct client list and adjust outer block + clients = [] + if addr: + clients = [{ + 'addresses': addr, + 'access_type': 'ro' if kwargs['read_only'] else 'rw', + 'squash': kwargs['squash'], + }] + kwargs['squash'] = 'none' + kwargs['clients'] = clients + + if clients: + kwargs['access_type'] = "none" + elif kwargs['read_only']: + kwargs['access_type'] = "RO" + else: + kwargs['access_type'] = "RW" + + if kwargs['cluster_id'] not in self.exports: + self.exports[kwargs['cluster_id']] = [] + + try: + fsal_type = kwargs.pop('fsal_type') + if fsal_type == 'cephfs': + return self.create_cephfs_export(**kwargs) + if fsal_type == 'rgw': + return self.create_rgw_export(**kwargs) + raise NotImplementedError() + except Exception as e: + log.exception( + f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}") + raise ErrorResponse.wrap(e) + + def delete_export(self, + cluster_id: str, + pseudo_path: str) -> None: + self._validate_cluster_id(cluster_id) + return self._delete_export(cluster_id, pseudo_path) + + def delete_all_exports(self, cluster_id: str) -> None: + try: + export_list = list(self.exports[cluster_id]) + except KeyError: + log.info("No exports to delete") + return + for export in export_list: + try: + self._delete_export(cluster_id=cluster_id, pseudo_path=None, + export_obj=export) + except Exception as e: + raise NFSException(f"Failed to delete export {export.export_id}: {e}") + log.info("All exports successfully deleted for cluster id: %s", cluster_id) + + def list_all_exports(self) -> List[Dict[str, Any]]: + r = [] + for cluster_id, ls in self.exports.items(): + r.extend([e.to_dict() for e in ls]) + return r + + def list_exports(self, + cluster_id: str, + detailed: bool = False) -> List[Any]: + self._validate_cluster_id(cluster_id) + try: + if detailed: + result_d = [export.to_dict() for export in self.exports[cluster_id]] + return result_d + else: + result_ps = [export.pseudo for export in self.exports[cluster_id]] + return result_ps + + except KeyError: + log.warning("No exports to list for %s", cluster_id) + return [] + except Exception as e: + log.exception(f"Failed to list exports for {cluster_id}") + raise ErrorResponse.wrap(e) + + def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]: + export = self._fetch_export(cluster_id, pseudo_path) + if export: + return export.to_dict() + log.warning(f"No {pseudo_path} export to show for {cluster_id}") + return None + + def get_export( + self, + cluster_id: str, + pseudo_path: str, + ) -> Dict[str, Any]: + self._validate_cluster_id(cluster_id) + try: + export_dict = self._get_export_dict(cluster_id, pseudo_path) + log.info(f"Fetched {export_dict!r} for {cluster_id!r}, {pseudo_path!r}") + return export_dict if export_dict else {} + except Exception as e: + log.exception(f"Failed to get {pseudo_path} export for {cluster_id}") + raise ErrorResponse.wrap(e) + + def get_export_by_id( + self, + cluster_id: str, + export_id: int + ) -> Optional[Dict[str, Any]]: + export = self._fetch_export_id(cluster_id, export_id) + return export.to_dict() if export else None + + def get_export_by_pseudo( + self, + cluster_id: str, + pseudo_path: str + ) -> Optional[Dict[str, Any]]: + export = self._fetch_export(cluster_id, pseudo_path) + return export.to_dict() if export else None + + # This method is used by the dashboard module (../dashboard/controllers/nfs.py) + # Do not change interface without updating the Dashboard code + def apply_export(self, cluster_id: str, export_config: str) -> AppliedExportResults: + try: + exports = self._read_export_config(cluster_id, export_config) + except Exception as e: + log.exception(f'Failed to update export: {e}') + raise ErrorResponse.wrap(e) + + aeresults = AppliedExportResults() + for export in exports: + aeresults.append(self._change_export(cluster_id, export)) + return aeresults + + def _read_export_config(self, cluster_id: str, export_config: str) -> List[Dict]: + if not export_config: + raise NFSInvalidOperation("Empty Config!!") + try: + j = json.loads(export_config) + except ValueError: + # okay, not JSON. is it an EXPORT block? + try: + blocks = GaneshaConfParser(export_config).parse() + exports = [ + Export.from_export_block(block, cluster_id) + for block in blocks + ] + j = [export.to_dict() for export in exports] + except Exception as ex: + raise NFSInvalidOperation(f"Input must be JSON or a ganesha EXPORT block: {ex}") + # check export type - always return a list + if isinstance(j, list): + return j # j is already a list object + return [j] # return a single object list, with j as the only item + + def _change_export(self, cluster_id: str, export: Dict) -> Dict[str, str]: + try: + return self._apply_export(cluster_id, export) + except NotImplementedError: + # in theory, the NotImplementedError here may be raised by a hook back to + # an orchestration module. If the orchestration module supports it the NFS + # servers may be restarted. If not supported the expectation is that an + # (unfortunately generic) NotImplementedError will be raised. We then + # indicate to the user that manual intervention may be needed now that the + # configuration changes have been applied. + return { + "pseudo": export['pseudo'], + "state": "warning", + "msg": "changes applied (Manual restart of NFS Pods required)", + } + except Exception as ex: + msg = f'Failed to apply export: {ex}' + log.exception(msg) + return {"state": "error", "msg": msg} + + def _update_user_id( + self, + cluster_id: str, + path: str, + fs_name: str, + user_id: str + ) -> None: + osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format( + self.rados_pool, cluster_id, fs_name) + # NFS-Ganesha can dynamically enforce an export's access type changes, but Ceph server + # daemons can't dynamically enforce changes in Ceph user caps of the Ceph clients. To + # allow dynamic updates of CephFS NFS exports, always set FSAL Ceph user's MDS caps with + # path restricted read-write access. Rely on the ganesha servers to enforce the export + # access type requested for the NFS clients. + self.mgr.check_mon_command({ + 'prefix': 'auth caps', + 'entity': f'client.{user_id}', + 'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow rw path={}'.format(path)], + }) + + log.info("Export user updated %s", user_id) + + def _create_user_key( + self, + cluster_id: str, + entity: str, + path: str, + fs_name: str, + ) -> str: + osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format( + self.rados_pool, cluster_id, fs_name) + nfs_caps = [ + 'mon', 'allow r', + 'osd', osd_cap, + 'mds', 'allow rw path={}'.format(path) + ] + + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth get-or-create', + 'entity': 'client.{}'.format(entity), + 'caps': nfs_caps, + 'format': 'json', + }) + if ret == -errno.EINVAL and 'does not match' in err: + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth caps', + 'entity': 'client.{}'.format(entity), + 'caps': nfs_caps, + 'format': 'json', + }) + if err: + raise NFSException(f'Failed to update caps for {entity}: {err}') + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth get', + 'entity': 'client.{}'.format(entity), + 'format': 'json', + }) + if err: + raise NFSException(f'Failed to fetch caps for {entity}: {err}') + + json_res = json.loads(out) + log.info("Export user created is %s", json_res[0]['entity']) + return json_res[0]['key'] + + def create_export_from_dict(self, + cluster_id: str, + ex_id: int, + ex_dict: Dict[str, Any]) -> Export: + pseudo_path = ex_dict.get("pseudo") + if not pseudo_path: + raise NFSInvalidOperation("export must specify pseudo path") + + path = ex_dict.get("path") + if path is None: + raise NFSInvalidOperation("export must specify path") + path = normalize_path(path) + + fsal = ex_dict.get("fsal", {}) + fsal_type = fsal.get("name") + if fsal_type == NFS_GANESHA_SUPPORTED_FSALS[1]: + if '/' in path and path != '/': + raise NFSInvalidOperation('"/" is not allowed in path with bucket name') + elif fsal_type == NFS_GANESHA_SUPPORTED_FSALS[0]: + fs_name = fsal.get("fs_name") + if not fs_name: + raise NFSInvalidOperation("export FSAL must specify fs_name") + if not check_fs(self.mgr, fs_name): + raise FSNotFound(fs_name) + + user_id = f"nfs.{cluster_id}.{ex_id}" + if "user_id" in fsal and fsal["user_id"] != user_id: + raise NFSInvalidOperation(f"export FSAL user_id must be '{user_id}'") + else: + raise NFSInvalidOperation(f"NFS Ganesha supported FSALs are {NFS_GANESHA_SUPPORTED_FSALS}." + "Export must specify any one of it.") + + ex_dict["fsal"] = fsal + ex_dict["cluster_id"] = cluster_id + export = Export.from_dict(ex_id, ex_dict) + export.validate(self.mgr) + log.debug("Successfully created %s export-%s from dict for cluster %s", + fsal_type, ex_id, cluster_id) + return export + + def create_cephfs_export(self, + fs_name: str, + cluster_id: str, + pseudo_path: str, + read_only: bool, + path: str, + squash: str, + access_type: str, + clients: list = [], + sectype: Optional[List[str]] = None) -> Dict[str, Any]: + + try: + cephfs_path_is_dir(self.mgr, fs_name, path) + except NotADirectoryError: + raise NFSException(f"path {path} is not a dir", -errno.ENOTDIR) + except cephfs.ObjectNotFound: + raise NFSObjectNotFound(f"path {path} does not exist") + except cephfs.Error as e: + raise NFSException(e.args[1], -e.args[0]) + + pseudo_path = normalize_path(pseudo_path) + + if not self._fetch_export(cluster_id, pseudo_path): + export = self.create_export_from_dict( + cluster_id, + self._gen_export_id(cluster_id), + { + "pseudo": pseudo_path, + "path": path, + "access_type": access_type, + "squash": squash, + "fsal": { + "name": NFS_GANESHA_SUPPORTED_FSALS[0], + "fs_name": fs_name, + }, + "clients": clients, + "sectype": sectype, + } + ) + log.debug("creating cephfs export %s", export) + self._create_export_user(export) + self._save_export(cluster_id, export) + result = { + "bind": export.pseudo, + "fs": fs_name, + "path": export.path, + "cluster": cluster_id, + "mode": export.access_type, + } + return result + raise NonFatalError("Export already exists") + + def create_rgw_export(self, + cluster_id: str, + pseudo_path: str, + access_type: str, + read_only: bool, + squash: str, + bucket: Optional[str] = None, + user_id: Optional[str] = None, + clients: list = [], + sectype: Optional[List[str]] = None) -> Dict[str, Any]: + pseudo_path = normalize_path(pseudo_path) + + if not bucket and not user_id: + raise ErrorResponse("Must specify either bucket or user_id") + + if not self._fetch_export(cluster_id, pseudo_path): + export = self.create_export_from_dict( + cluster_id, + self._gen_export_id(cluster_id), + { + "pseudo": pseudo_path, + "path": bucket or '/', + "access_type": access_type, + "squash": squash, + "fsal": { + "name": NFS_GANESHA_SUPPORTED_FSALS[1], + "user_id": user_id, + }, + "clients": clients, + "sectype": sectype, + } + ) + log.debug("creating rgw export %s", export) + self._create_export_user(export) + self._save_export(cluster_id, export) + result = { + "bind": export.pseudo, + "path": export.path, + "cluster": cluster_id, + "mode": export.access_type, + "squash": export.squash, + } + return result + raise NonFatalError("Export already exists") + + def _apply_export( + self, + cluster_id: str, + new_export_dict: Dict, + ) -> Dict[str, str]: + for k in ['path', 'pseudo']: + if k not in new_export_dict: + raise NFSInvalidOperation(f'Export missing required field {k}') + if cluster_id not in self.exports: + self.exports[cluster_id] = [] + + new_export_dict['path'] = normalize_path(new_export_dict['path']) + new_export_dict['pseudo'] = normalize_path(new_export_dict['pseudo']) + + old_export = self._fetch_export(cluster_id, new_export_dict['pseudo']) + if old_export: + # Check if export id matches + if new_export_dict.get('export_id'): + if old_export.export_id != new_export_dict.get('export_id'): + raise NFSInvalidOperation('Export ID changed, Cannot update export') + else: + new_export_dict['export_id'] = old_export.export_id + elif new_export_dict.get('export_id'): + old_export = self._fetch_export_obj(cluster_id, new_export_dict['export_id']) + if old_export: + # re-fetch via old pseudo + old_export = self._fetch_export(cluster_id, old_export.pseudo) + assert old_export + log.debug("export %s pseudo %s -> %s", + old_export.export_id, old_export.pseudo, new_export_dict['pseudo']) + + new_export = self.create_export_from_dict( + cluster_id, + new_export_dict.get('export_id', self._gen_export_id(cluster_id)), + new_export_dict + ) + + if not old_export: + self._create_export_user(new_export) + self._save_export(cluster_id, new_export) + return {"pseudo": new_export.pseudo, "state": "added"} + + need_nfs_service_restart = True + if old_export.fsal.name != new_export.fsal.name: + raise NFSInvalidOperation('FSAL change not allowed') + if old_export.pseudo != new_export.pseudo: + log.debug('export %s pseudo %s -> %s', + new_export.export_id, old_export.pseudo, new_export.pseudo) + + if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]: + old_fsal = cast(CephFSFSAL, old_export.fsal) + new_fsal = cast(CephFSFSAL, new_export.fsal) + if old_fsal.user_id != new_fsal.user_id: + self._delete_export_user(old_export) + self._create_export_user(new_export) + elif ( + old_export.path != new_export.path + or old_fsal.fs_name != new_fsal.fs_name + ): + self._update_user_id( + cluster_id, + new_export.path, + cast(str, new_fsal.fs_name), + cast(str, new_fsal.user_id) + ) + new_fsal.cephx_key = old_fsal.cephx_key + else: + expected_mds_caps = 'allow rw path={}'.format(new_export.path) + entity = new_fsal.user_id + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth get', + 'entity': 'client.{}'.format(entity), + 'format': 'json', + }) + if ret: + raise NFSException(f'Failed to fetch caps for {entity}: {err}') + actual_mds_caps = json.loads(out)[0]['caps'].get('mds') + if actual_mds_caps != expected_mds_caps: + self._update_user_id( + cluster_id, + new_export.path, + cast(str, new_fsal.fs_name), + cast(str, new_fsal.user_id) + ) + elif old_export.pseudo == new_export.pseudo: + need_nfs_service_restart = False + new_fsal.cephx_key = old_fsal.cephx_key + + if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]: + old_rgw_fsal = cast(RGWFSAL, old_export.fsal) + new_rgw_fsal = cast(RGWFSAL, new_export.fsal) + if old_rgw_fsal.user_id != new_rgw_fsal.user_id: + self._delete_export_user(old_export) + self._create_export_user(new_export) + elif old_rgw_fsal.access_key_id != new_rgw_fsal.access_key_id: + raise NFSInvalidOperation('access_key_id change is not allowed') + elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key: + raise NFSInvalidOperation('secret_access_key change is not allowed') + + self.exports[cluster_id].remove(old_export) + + self._update_export(cluster_id, new_export, need_nfs_service_restart) + + return {"pseudo": new_export.pseudo, "state": "updated"} + + def _rados(self, cluster_id: str) -> NFSRados: + """Return a new NFSRados object for the given cluster id.""" + return NFSRados(self.mgr.rados, cluster_id) diff --git a/src/pybind/mgr/nfs/ganesha_conf.py b/src/pybind/mgr/nfs/ganesha_conf.py new file mode 100644 index 000000000..31aaa4ea1 --- /dev/null +++ b/src/pybind/mgr/nfs/ganesha_conf.py @@ -0,0 +1,548 @@ +from typing import cast, List, Dict, Any, Optional, TYPE_CHECKING +from os.path import isabs + +from mgr_module import NFS_GANESHA_SUPPORTED_FSALS + +from .exception import NFSInvalidOperation, FSNotFound +from .utils import check_fs + +if TYPE_CHECKING: + from nfs.module import Module + + +def _indentation(depth: int, size: int = 4) -> str: + return " " * (depth * size) + + +def _format_val(block_name: str, key: str, val: str) -> str: + if isinstance(val, list): + return ', '.join([_format_val(block_name, key, v) for v in val]) + if isinstance(val, bool): + return str(val).lower() + if isinstance(val, int) or (block_name == 'CLIENT' + and key == 'clients'): + return '{}'.format(val) + return '"{}"'.format(val) + + +def _validate_squash(squash: str) -> None: + valid_squash_ls = [ + "root", "root_squash", "rootsquash", "rootid", "root_id_squash", + "rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous", + "allanonymous", "no_root_squash", "none", "noidsquash", + ] + if squash.lower() not in valid_squash_ls: + raise NFSInvalidOperation( + f"squash {squash} not in valid list {valid_squash_ls}" + ) + + +def _validate_access_type(access_type: str) -> None: + valid_access_types = ['rw', 'ro', 'none'] + if not isinstance(access_type, str) or access_type.lower() not in valid_access_types: + raise NFSInvalidOperation( + f'{access_type} is invalid, valid access type are' + f'{valid_access_types}' + ) + + +def _validate_sec_type(sec_type: str) -> None: + valid_sec_types = ["none", "sys", "krb5", "krb5i", "krb5p"] + if not isinstance(sec_type, str) or sec_type not in valid_sec_types: + raise NFSInvalidOperation( + f"SecType {sec_type} invalid, valid types are {valid_sec_types}") + + +class RawBlock(): + def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}): + if not values: # workaround mutable default argument + values = {} + if not blocks: # workaround mutable default argument + blocks = [] + self.block_name = block_name + self.blocks = blocks + self.values = values + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, RawBlock): + return False + return self.block_name == other.block_name and \ + self.blocks == other.blocks and \ + self.values == other.values + + def __repr__(self) -> str: + return f'RawBlock({self.block_name!r}, {self.blocks!r}, {self.values!r})' + + +class GaneshaConfParser: + def __init__(self, raw_config: str): + self.pos = 0 + self.text = "" + for line in raw_config.split("\n"): + line = line.lstrip() + + if line.startswith("%"): + self.text += line.replace('"', "") + self.text += "\n" + else: + self.text += "".join(line.split()) + + def stream(self) -> str: + return self.text[self.pos:] + + def last_context(self) -> str: + return f'"...{self.text[max(0, self.pos - 30):self.pos]}{self.stream()[:30]}"' + + def parse_block_name(self) -> str: + idx = self.stream().find('{') + if idx == -1: + raise Exception(f"Cannot find block name at {self.last_context()}") + block_name = self.stream()[:idx] + self.pos += idx + 1 + return block_name + + def parse_block_or_section(self) -> RawBlock: + if self.stream().startswith("%url "): + # section line + self.pos += 5 + idx = self.stream().find('\n') + if idx == -1: + value = self.stream() + self.pos += len(value) + else: + value = self.stream()[:idx] + self.pos += idx + 1 + block_dict = RawBlock('%url', values={'value': value}) + return block_dict + + block_dict = RawBlock(self.parse_block_name().upper()) + self.parse_block_body(block_dict) + if self.stream()[0] != '}': + raise Exception("No closing bracket '}' found at the end of block") + self.pos += 1 + return block_dict + + def parse_parameter_value(self, raw_value: str) -> Any: + if raw_value.find(',') != -1: + return [self.parse_parameter_value(v.strip()) + for v in raw_value.split(',')] + try: + return int(raw_value) + except ValueError: + if raw_value == "true": + return True + if raw_value == "false": + return False + if raw_value.find('"') == 0: + return raw_value[1:-1] + return raw_value + + def parse_stanza(self, block_dict: RawBlock) -> None: + equal_idx = self.stream().find('=') + if equal_idx == -1: + raise Exception("Malformed stanza: no equal symbol found.") + semicolon_idx = self.stream().find(';') + parameter_name = self.stream()[:equal_idx].lower() + parameter_value = self.stream()[equal_idx + 1:semicolon_idx] + block_dict.values[parameter_name] = self.parse_parameter_value(parameter_value) + self.pos += semicolon_idx + 1 + + def parse_block_body(self, block_dict: RawBlock) -> None: + while True: + if self.stream().find('}') == 0: + # block end + return + + last_pos = self.pos + semicolon_idx = self.stream().find(';') + lbracket_idx = self.stream().find('{') + is_semicolon = (semicolon_idx != -1) + is_lbracket = (lbracket_idx != -1) + is_semicolon_lt_lbracket = (semicolon_idx < lbracket_idx) + + if is_semicolon and ((is_lbracket and is_semicolon_lt_lbracket) or not is_lbracket): + self.parse_stanza(block_dict) + elif is_lbracket and ((is_semicolon and not is_semicolon_lt_lbracket) + or (not is_semicolon)): + block_dict.blocks.append(self.parse_block_or_section()) + else: + raise Exception("Malformed stanza: no semicolon found.") + + if last_pos == self.pos: + raise Exception("Infinite loop while parsing block content") + + def parse(self) -> List[RawBlock]: + blocks = [] + while self.stream(): + blocks.append(self.parse_block_or_section()) + return blocks + + +class FSAL(object): + def __init__(self, name: str) -> None: + self.name = name + + @classmethod + def from_dict(cls, fsal_dict: Dict[str, Any]) -> 'FSAL': + if fsal_dict.get('name') == NFS_GANESHA_SUPPORTED_FSALS[0]: + return CephFSFSAL.from_dict(fsal_dict) + if fsal_dict.get('name') == NFS_GANESHA_SUPPORTED_FSALS[1]: + return RGWFSAL.from_dict(fsal_dict) + raise NFSInvalidOperation(f'Unknown FSAL {fsal_dict.get("name")}') + + @classmethod + def from_fsal_block(cls, fsal_block: RawBlock) -> 'FSAL': + if fsal_block.values.get('name') == NFS_GANESHA_SUPPORTED_FSALS[0]: + return CephFSFSAL.from_fsal_block(fsal_block) + if fsal_block.values.get('name') == NFS_GANESHA_SUPPORTED_FSALS[1]: + return RGWFSAL.from_fsal_block(fsal_block) + raise NFSInvalidOperation(f'Unknown FSAL {fsal_block.values.get("name")}') + + def to_fsal_block(self) -> RawBlock: + raise NotImplementedError + + def to_dict(self) -> Dict[str, Any]: + raise NotImplementedError + + +class CephFSFSAL(FSAL): + def __init__(self, + name: str, + user_id: Optional[str] = None, + fs_name: Optional[str] = None, + sec_label_xattr: Optional[str] = None, + cephx_key: Optional[str] = None) -> None: + super().__init__(name) + assert name == 'CEPH' + self.fs_name = fs_name + self.user_id = user_id + self.sec_label_xattr = sec_label_xattr + self.cephx_key = cephx_key + + @classmethod + def from_fsal_block(cls, fsal_block: RawBlock) -> 'CephFSFSAL': + return cls(fsal_block.values['name'], + fsal_block.values.get('user_id'), + fsal_block.values.get('filesystem'), + fsal_block.values.get('sec_label_xattr'), + fsal_block.values.get('secret_access_key')) + + def to_fsal_block(self) -> RawBlock: + result = RawBlock('FSAL', values={'name': self.name}) + + if self.user_id: + result.values['user_id'] = self.user_id + if self.fs_name: + result.values['filesystem'] = self.fs_name + if self.sec_label_xattr: + result.values['sec_label_xattr'] = self.sec_label_xattr + if self.cephx_key: + result.values['secret_access_key'] = self.cephx_key + return result + + @classmethod + def from_dict(cls, fsal_dict: Dict[str, Any]) -> 'CephFSFSAL': + return cls(fsal_dict['name'], + fsal_dict.get('user_id'), + fsal_dict.get('fs_name'), + fsal_dict.get('sec_label_xattr'), + fsal_dict.get('cephx_key')) + + def to_dict(self) -> Dict[str, str]: + r = {'name': self.name} + if self.user_id: + r['user_id'] = self.user_id + if self.fs_name: + r['fs_name'] = self.fs_name + if self.sec_label_xattr: + r['sec_label_xattr'] = self.sec_label_xattr + return r + + +class RGWFSAL(FSAL): + def __init__(self, + name: str, + user_id: Optional[str] = None, + access_key_id: Optional[str] = None, + secret_access_key: Optional[str] = None + ) -> None: + super().__init__(name) + assert name == 'RGW' + # RGW user uid + self.user_id = user_id + # S3 credentials + self.access_key_id = access_key_id + self.secret_access_key = secret_access_key + + @classmethod + def from_fsal_block(cls, fsal_block: RawBlock) -> 'RGWFSAL': + return cls(fsal_block.values['name'], + fsal_block.values.get('user_id'), + fsal_block.values.get('access_key_id'), + fsal_block.values.get('secret_access_key')) + + def to_fsal_block(self) -> RawBlock: + result = RawBlock('FSAL', values={'name': self.name}) + + if self.user_id: + result.values['user_id'] = self.user_id + if self.access_key_id: + result.values['access_key_id'] = self.access_key_id + if self.secret_access_key: + result.values['secret_access_key'] = self.secret_access_key + return result + + @classmethod + def from_dict(cls, fsal_dict: Dict[str, str]) -> 'RGWFSAL': + return cls(fsal_dict['name'], + fsal_dict.get('user_id'), + fsal_dict.get('access_key_id'), + fsal_dict.get('secret_access_key')) + + def to_dict(self) -> Dict[str, str]: + r = {'name': self.name} + if self.user_id: + r['user_id'] = self.user_id + if self.access_key_id: + r['access_key_id'] = self.access_key_id + if self.secret_access_key: + r['secret_access_key'] = self.secret_access_key + return r + + +class Client: + def __init__(self, + addresses: List[str], + access_type: str, + squash: str): + self.addresses = addresses + self.access_type = access_type + self.squash = squash + + @classmethod + def from_client_block(cls, client_block: RawBlock) -> 'Client': + addresses = client_block.values.get('clients', []) + if isinstance(addresses, str): + addresses = [addresses] + return cls(addresses, + client_block.values.get('access_type', None), + client_block.values.get('squash', None)) + + def to_client_block(self) -> RawBlock: + result = RawBlock('CLIENT', values={'clients': self.addresses}) + if self.access_type: + result.values['access_type'] = self.access_type + if self.squash: + result.values['squash'] = self.squash + return result + + @classmethod + def from_dict(cls, client_dict: Dict[str, Any]) -> 'Client': + return cls(client_dict['addresses'], client_dict['access_type'], + client_dict['squash']) + + def to_dict(self) -> Dict[str, Any]: + return { + 'addresses': self.addresses, + 'access_type': self.access_type, + 'squash': self.squash + } + + +class Export: + def __init__( + self, + export_id: int, + path: str, + cluster_id: str, + pseudo: str, + access_type: str, + squash: str, + security_label: bool, + protocols: List[int], + transports: List[str], + fsal: FSAL, + clients: Optional[List[Client]] = None, + sectype: Optional[List[str]] = None) -> None: + self.export_id = export_id + self.path = path + self.fsal = fsal + self.cluster_id = cluster_id + self.pseudo = pseudo + self.access_type = access_type + self.squash = squash + self.attr_expiration_time = 0 + self.security_label = security_label + self.protocols = protocols + self.transports = transports + self.clients: List[Client] = clients or [] + self.sectype = sectype + + @classmethod + def from_export_block(cls, export_block: RawBlock, cluster_id: str) -> 'Export': + fsal_blocks = [b for b in export_block.blocks + if b.block_name == "FSAL"] + + client_blocks = [b for b in export_block.blocks + if b.block_name == "CLIENT"] + + protocols = export_block.values.get('protocols') + if not isinstance(protocols, list): + protocols = [protocols] + + transports = export_block.values.get('transports') + if isinstance(transports, str): + transports = [transports] + elif not transports: + transports = [] + + # if this module wrote the ganesha conf the param is camelcase + # "SecType". but for compatiblity with manually edited ganesha confs, + # accept "sectype" too. + sectype = (export_block.values.get("SecType") + or export_block.values.get("sectype") or None) + return cls(export_block.values['export_id'], + export_block.values['path'], + cluster_id, + export_block.values['pseudo'], + export_block.values.get('access_type', 'none'), + export_block.values.get('squash', 'no_root_squash'), + export_block.values.get('security_label', True), + protocols, + transports, + FSAL.from_fsal_block(fsal_blocks[0]), + [Client.from_client_block(client) + for client in client_blocks], + sectype=sectype) + + def to_export_block(self) -> RawBlock: + values = { + 'export_id': self.export_id, + 'path': self.path, + 'pseudo': self.pseudo, + 'access_type': self.access_type, + 'squash': self.squash, + 'attr_expiration_time': self.attr_expiration_time, + 'security_label': self.security_label, + 'protocols': self.protocols, + 'transports': self.transports, + } + if self.sectype: + values['SecType'] = self.sectype + result = RawBlock("EXPORT", values=values) + result.blocks = [ + self.fsal.to_fsal_block() + ] + [ + client.to_client_block() + for client in self.clients + ] + return result + + @classmethod + def from_dict(cls, export_id: int, ex_dict: Dict[str, Any]) -> 'Export': + return cls(export_id, + ex_dict.get('path', '/'), + ex_dict['cluster_id'], + ex_dict['pseudo'], + ex_dict.get('access_type', 'RO'), + ex_dict.get('squash', 'no_root_squash'), + ex_dict.get('security_label', True), + ex_dict.get('protocols', [4]), + ex_dict.get('transports', ['TCP']), + FSAL.from_dict(ex_dict.get('fsal', {})), + [Client.from_dict(client) for client in ex_dict.get('clients', [])], + sectype=ex_dict.get("sectype")) + + def to_dict(self) -> Dict[str, Any]: + values = { + 'export_id': self.export_id, + 'path': self.path, + 'cluster_id': self.cluster_id, + 'pseudo': self.pseudo, + 'access_type': self.access_type, + 'squash': self.squash, + 'security_label': self.security_label, + 'protocols': sorted([p for p in self.protocols]), + 'transports': sorted([t for t in self.transports]), + 'fsal': self.fsal.to_dict(), + 'clients': [client.to_dict() for client in self.clients] + } + if self.sectype: + values['sectype'] = self.sectype + return values + + def validate(self, mgr: 'Module') -> None: + if not isabs(self.pseudo) or self.pseudo == "/": + raise NFSInvalidOperation( + f"pseudo path {self.pseudo} is invalid. It should be an absolute " + "path and it cannot be just '/'." + ) + + _validate_squash(self.squash) + _validate_access_type(self.access_type) + + if not isinstance(self.security_label, bool): + raise NFSInvalidOperation('security_label must be a boolean value') + + for p in self.protocols: + if p not in [3, 4]: + raise NFSInvalidOperation(f"Invalid protocol {p}") + + valid_transport = ["UDP", "TCP"] + for trans in self.transports: + if trans.upper() not in valid_transport: + raise NFSInvalidOperation(f'{trans} is not a valid transport protocol') + + for client in self.clients: + if client.squash: + _validate_squash(client.squash) + if client.access_type: + _validate_access_type(client.access_type) + + if self.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]: + fs = cast(CephFSFSAL, self.fsal) + if not fs.fs_name or not check_fs(mgr, fs.fs_name): + raise FSNotFound(fs.fs_name) + elif self.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]: + rgw = cast(RGWFSAL, self.fsal) # noqa + pass + else: + raise NFSInvalidOperation('FSAL {self.fsal.name} not supported') + + for st in (self.sectype or []): + _validate_sec_type(st) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Export): + return False + return self.to_dict() == other.to_dict() + + +def _format_block_body(block: RawBlock, depth: int = 0) -> str: + conf_str = "" + for blo in block.blocks: + conf_str += format_block(blo, depth) + + for key, val in block.values.items(): + if val is not None: + conf_str += _indentation(depth) + fval = _format_val(block.block_name, key, val) + conf_str += '{} = {};\n'.format(key, fval) + return conf_str + + +def format_block(block: RawBlock, depth: int = 0) -> str: + """Format a raw block object into text suitable as a ganesha configuration + block. + """ + if block.block_name == "%url": + return '%url "{}"\n\n'.format(block.values['value']) + + conf_str = "" + conf_str += _indentation(depth) + conf_str += format(block.block_name) + conf_str += " {\n" + conf_str += _format_block_body(block, depth + 1) + conf_str += _indentation(depth) + conf_str += "}\n" + return conf_str diff --git a/src/pybind/mgr/nfs/module.py b/src/pybind/mgr/nfs/module.py new file mode 100644 index 000000000..a984500ee --- /dev/null +++ b/src/pybind/mgr/nfs/module.py @@ -0,0 +1,189 @@ +import logging +import threading +from typing import Tuple, Optional, List, Dict, Any + +from mgr_module import MgrModule, CLICommand, Option, CLICheckNonemptyFileInput +import object_format +import orchestrator +from orchestrator.module import IngressType + +from .export import ExportMgr, AppliedExportResults +from .cluster import NFSCluster +from .utils import available_clusters + +log = logging.getLogger(__name__) + + +class Module(orchestrator.OrchestratorClientMixin, MgrModule): + MODULE_OPTIONS: List[Option] = [] + + def __init__(self, *args: str, **kwargs: Any) -> None: + self.inited = False + self.lock = threading.Lock() + super(Module, self).__init__(*args, **kwargs) + with self.lock: + self.export_mgr = ExportMgr(self) + self.nfs = NFSCluster(self) + self.inited = True + + @CLICommand('nfs export create cephfs', perm='rw') + @object_format.Responder() + def _cmd_nfs_export_create_cephfs( + self, + cluster_id: str, + pseudo_path: str, + fsname: str, + path: Optional[str] = '/', + readonly: Optional[bool] = False, + client_addr: Optional[List[str]] = None, + squash: str = 'none', + sectype: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """Create a CephFS export""" + return self.export_mgr.create_export( + fsal_type='cephfs', + fs_name=fsname, + cluster_id=cluster_id, + pseudo_path=pseudo_path, + read_only=readonly, + path=path, + squash=squash, + addr=client_addr, + sectype=sectype, + ) + + @CLICommand('nfs export create rgw', perm='rw') + @object_format.Responder() + def _cmd_nfs_export_create_rgw( + self, + cluster_id: str, + pseudo_path: str, + bucket: Optional[str] = None, + user_id: Optional[str] = None, + readonly: Optional[bool] = False, + client_addr: Optional[List[str]] = None, + squash: str = 'none', + sectype: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """Create an RGW export""" + return self.export_mgr.create_export( + fsal_type='rgw', + bucket=bucket, + user_id=user_id, + cluster_id=cluster_id, + pseudo_path=pseudo_path, + read_only=readonly, + squash=squash, + addr=client_addr, + sectype=sectype, + ) + + @CLICommand('nfs export rm', perm='rw') + @object_format.EmptyResponder() + def _cmd_nfs_export_rm(self, cluster_id: str, pseudo_path: str) -> None: + """Remove a cephfs export""" + return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path) + + @CLICommand('nfs export delete', perm='rw') + @object_format.EmptyResponder() + def _cmd_nfs_export_delete(self, cluster_id: str, pseudo_path: str) -> None: + """Delete a cephfs export (DEPRECATED)""" + return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path) + + @CLICommand('nfs export ls', perm='r') + @object_format.Responder() + def _cmd_nfs_export_ls(self, cluster_id: str, detailed: bool = False) -> List[Any]: + """List exports of a NFS cluster""" + return self.export_mgr.list_exports(cluster_id=cluster_id, detailed=detailed) + + @CLICommand('nfs export info', perm='r') + @object_format.Responder() + def _cmd_nfs_export_info(self, cluster_id: str, pseudo_path: str) -> Dict[str, Any]: + """Fetch a export of a NFS cluster given the pseudo path/binding""" + return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path) + + @CLICommand('nfs export get', perm='r') + @object_format.Responder() + def _cmd_nfs_export_get(self, cluster_id: str, pseudo_path: str) -> Dict[str, Any]: + """Fetch a export of a NFS cluster given the pseudo path/binding (DEPRECATED)""" + return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path) + + @CLICommand('nfs export apply', perm='rw') + @CLICheckNonemptyFileInput(desc='Export JSON or Ganesha EXPORT specification') + @object_format.Responder() + def _cmd_nfs_export_apply(self, cluster_id: str, inbuf: str) -> AppliedExportResults: + """Create or update an export by `-i `""" + return self.export_mgr.apply_export(cluster_id, export_config=inbuf) + + @CLICommand('nfs cluster create', perm='rw') + @object_format.EmptyResponder() + def _cmd_nfs_cluster_create(self, + cluster_id: str, + placement: Optional[str] = None, + ingress: Optional[bool] = None, + virtual_ip: Optional[str] = None, + ingress_mode: Optional[IngressType] = None, + port: Optional[int] = None) -> None: + """Create an NFS Cluster""" + return self.nfs.create_nfs_cluster(cluster_id=cluster_id, placement=placement, + virtual_ip=virtual_ip, ingress=ingress, + ingress_mode=ingress_mode, port=port) + + @CLICommand('nfs cluster rm', perm='rw') + @object_format.EmptyResponder() + def _cmd_nfs_cluster_rm(self, cluster_id: str) -> None: + """Removes an NFS Cluster""" + return self.nfs.delete_nfs_cluster(cluster_id=cluster_id) + + @CLICommand('nfs cluster delete', perm='rw') + @object_format.EmptyResponder() + def _cmd_nfs_cluster_delete(self, cluster_id: str) -> None: + """Removes an NFS Cluster (DEPRECATED)""" + return self.nfs.delete_nfs_cluster(cluster_id=cluster_id) + + @CLICommand('nfs cluster ls', perm='r') + @object_format.Responder() + def _cmd_nfs_cluster_ls(self) -> List[str]: + """List NFS Clusters""" + return self.nfs.list_nfs_cluster() + + @CLICommand('nfs cluster info', perm='r') + @object_format.Responder() + def _cmd_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Dict[str, Any]: + """Displays NFS Cluster info""" + return self.nfs.show_nfs_cluster_info(cluster_id=cluster_id) + + @CLICommand('nfs cluster config get', perm='r') + @object_format.ErrorResponseHandler() + def _cmd_nfs_cluster_config_get(self, cluster_id: str) -> Tuple[int, str, str]: + """Fetch NFS-Ganesha config""" + conf = self.nfs.get_nfs_cluster_config(cluster_id=cluster_id) + return 0, conf, "" + + @CLICommand('nfs cluster config set', perm='rw') + @CLICheckNonemptyFileInput(desc='NFS-Ganesha Configuration') + @object_format.EmptyResponder() + def _cmd_nfs_cluster_config_set(self, cluster_id: str, inbuf: str) -> None: + """Set NFS-Ganesha config by `-i `""" + return self.nfs.set_nfs_cluster_config(cluster_id=cluster_id, nfs_config=inbuf) + + @CLICommand('nfs cluster config reset', perm='rw') + @object_format.EmptyResponder() + def _cmd_nfs_cluster_config_reset(self, cluster_id: str) -> None: + """Reset NFS-Ganesha Config to default""" + return self.nfs.reset_nfs_cluster_config(cluster_id=cluster_id) + + def fetch_nfs_export_obj(self) -> ExportMgr: + return self.export_mgr + + def export_ls(self) -> List[Dict[Any, Any]]: + return self.export_mgr.list_all_exports() + + def export_get(self, cluster_id: str, export_id: int) -> Optional[Dict[str, Any]]: + return self.export_mgr.get_export_by_id(cluster_id, export_id) + + def export_rm(self, cluster_id: str, pseudo: str) -> None: + self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo) + + def cluster_ls(self) -> List[str]: + return available_clusters(self) diff --git a/src/pybind/mgr/nfs/tests/__init__.py b/src/pybind/mgr/nfs/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/pybind/mgr/nfs/tests/test_nfs.py b/src/pybind/mgr/nfs/tests/test_nfs.py new file mode 100644 index 000000000..5b4d5fe7e --- /dev/null +++ b/src/pybind/mgr/nfs/tests/test_nfs.py @@ -0,0 +1,1156 @@ +# flake8: noqa +import json +import pytest +from typing import Optional, Tuple, Iterator, List, Any + +from contextlib import contextmanager +from unittest import mock +from unittest.mock import MagicMock +from mgr_module import MgrModule, NFS_POOL_NAME + +from rados import ObjectNotFound + +from ceph.deployment.service_spec import NFSServiceSpec +from nfs import Module +from nfs.export import ExportMgr, normalize_path +from nfs.ganesha_conf import GaneshaConfParser, Export, RawBlock +from nfs.cluster import NFSCluster +from orchestrator import ServiceDescription, DaemonDescription, OrchResult + + +class TestNFS: + cluster_id = "foo" + export_1 = """ +EXPORT { + Export_ID=1; + Protocols = 4; + Path = /; + Pseudo = /cephfs_a/; + Access_Type = RW; + Protocols = 4; + Attr_Expiration_Time = 0; + # Squash = root; + + FSAL { + Name = CEPH; + Filesystem = "a"; + User_Id = "ganesha"; + # Secret_Access_Key = "YOUR SECRET KEY HERE"; + } + + CLIENT + { + Clients = 192.168.0.10, 192.168.1.0/8; + Squash = None; + } + + CLIENT + { + Clients = 192.168.0.0/16; + Squash = All; + Access_Type = RO; + } +} +""" + + export_2 = """ +EXPORT +{ + Export_ID=2; + Path = "/"; + Pseudo = "/rgw"; + Access_Type = RW; + squash = AllAnonymous; + Protocols = 4, 3; + Transports = TCP, UDP; + + FSAL { + Name = RGW; + User_Id = "nfs.foo.bucket"; + Access_Key_Id ="the_access_key"; + Secret_Access_Key = "the_secret_key"; + } +} +""" + export_3 = """ +EXPORT { + FSAL { + name = "CEPH"; + user_id = "nfs.foo.1"; + filesystem = "a"; + secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA=="; + } + export_id = 1; + path = "/"; + pseudo = "/a"; + access_type = "RW"; + squash = "none"; + attr_expiration_time = 0; + security_label = true; + protocols = 4; + transports = "TCP"; +} +""" + export_4 = """ +EXPORT { + FSAL { + name = "CEPH"; + user_id = "nfs.foo.1"; + filesystem = "a"; + secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA=="; + } + export_id = 1; + path = "/secure/me"; + pseudo = "/secure1"; + access_type = "RW"; + squash = "no_root_squash"; + SecType = "krb5p", "krb5i"; + attr_expiration_time = 0; + security_label = true; + protocols = 4; + transports = "TCP"; +} +""" + + conf_nfs_foo = f''' +%url "rados://{NFS_POOL_NAME}/{cluster_id}/export-1" + +%url "rados://{NFS_POOL_NAME}/{cluster_id}/export-2"''' + + class RObject(object): + def __init__(self, key: str, raw: str) -> None: + self.key = key + self.raw = raw + + def read(self, _: Optional[int]) -> bytes: + return self.raw.encode('utf-8') + + def stat(self) -> Tuple[int, None]: + return len(self.raw), None + + def _ioctx_write_full_mock(self, key: str, content: bytes) -> None: + if key not in self.temp_store[self.temp_store_namespace]: + self.temp_store[self.temp_store_namespace][key] = \ + TestNFS.RObject(key, content.decode('utf-8')) + else: + self.temp_store[self.temp_store_namespace][key].raw = content.decode('utf-8') + + def _ioctx_remove_mock(self, key: str) -> None: + del self.temp_store[self.temp_store_namespace][key] + + def _ioctx_list_objects_mock(self) -> List['TestNFS.RObject']: + r = [obj for _, obj in self.temp_store[self.temp_store_namespace].items()] + return r + + def _ioctl_stat_mock(self, key): + return self.temp_store[self.temp_store_namespace][key].stat() + + def _ioctl_read_mock(self, key: str, size: Optional[Any] = None) -> bytes: + if key not in self.temp_store[self.temp_store_namespace]: + raise ObjectNotFound + return self.temp_store[self.temp_store_namespace][key].read(size) + + def _ioctx_set_namespace_mock(self, namespace: str) -> None: + self.temp_store_namespace = namespace + + def _reset_temp_store(self) -> None: + self.temp_store_namespace = None + self.temp_store = { + 'foo': { + 'export-1': TestNFS.RObject("export-1", self.export_1), + 'export-2': TestNFS.RObject("export-2", self.export_2), + 'conf-nfs.foo': TestNFS.RObject("conf-nfs.foo", self.conf_nfs_foo) + } + } + + @contextmanager + def _mock_orchestrator(self, enable: bool) -> Iterator: + self.io_mock = MagicMock() + self.io_mock.set_namespace.side_effect = self._ioctx_set_namespace_mock + self.io_mock.read = self._ioctl_read_mock + self.io_mock.stat = self._ioctl_stat_mock + self.io_mock.list_objects.side_effect = self._ioctx_list_objects_mock + self.io_mock.write_full.side_effect = self._ioctx_write_full_mock + self.io_mock.remove_object.side_effect = self._ioctx_remove_mock + + # mock nfs services + orch_nfs_services = [ + ServiceDescription(spec=NFSServiceSpec(service_id=self.cluster_id)) + ] if enable else [] + + orch_nfs_daemons = [ + DaemonDescription('nfs', 'foo.mydaemon', 'myhostname') + ] if enable else [] + + def mock_exec(cls, args): + if args[1:3] == ['bucket', 'stats']: + bucket_info = { + "owner": "bucket_owner_user", + } + return 0, json.dumps(bucket_info), '' + u = { + "user_id": "abc", + "display_name": "foo", + "email": "", + "suspended": 0, + "max_buckets": 1000, + "subusers": [], + "keys": [ + { + "user": "abc", + "access_key": "the_access_key", + "secret_key": "the_secret_key" + } + ], + "swift_keys": [], + "caps": [], + "op_mask": "read, write, delete", + "default_placement": "", + "default_storage_class": "", + "placement_tags": [], + "bucket_quota": { + "enabled": False, + "check_on_raw": False, + "max_size": -1, + "max_size_kb": 0, + "max_objects": -1 + }, + "user_quota": { + "enabled": False, + "check_on_raw": False, + "max_size": -1, + "max_size_kb": 0, + "max_objects": -1 + }, + "temp_url_keys": [], + "type": "rgw", + "mfa_ids": [] + } + if args[2] == 'list': + return 0, json.dumps([u]), '' + return 0, json.dumps(u), '' + + def mock_describe_service(cls, *args, **kwargs): + if kwargs['service_type'] == 'nfs': + return OrchResult(orch_nfs_services) + return OrchResult([]) + + def mock_list_daemons(cls, *args, **kwargs): + if kwargs['daemon_type'] == 'nfs': + return OrchResult(orch_nfs_daemons) + return OrchResult([]) + + with mock.patch('nfs.module.Module.describe_service', mock_describe_service) as describe_service, \ + mock.patch('nfs.module.Module.list_daemons', mock_list_daemons) as list_daemons, \ + mock.patch('nfs.module.Module.rados') as rados, \ + mock.patch('nfs.export.available_clusters', + return_value=[self.cluster_id]), \ + mock.patch('nfs.export.restart_nfs_service'), \ + mock.patch('nfs.cluster.restart_nfs_service'), \ + mock.patch.object(MgrModule, 'tool_exec', mock_exec), \ + mock.patch('nfs.export.check_fs', return_value=True), \ + mock.patch('nfs.ganesha_conf.check_fs', return_value=True), \ + mock.patch('nfs.export.ExportMgr._create_user_key', + return_value='thekeyforclientabc'), \ + mock.patch('nfs.export.cephfs_path_is_dir'): + + rados.open_ioctx.return_value.__enter__.return_value = self.io_mock + rados.open_ioctx.return_value.__exit__ = mock.Mock(return_value=None) + + self._reset_temp_store() + + yield + + def test_parse_daemon_raw_config(self) -> None: + expected_daemon_config = [ + RawBlock('NFS_CORE_PARAM', values={ + "enable_nlm": False, + "enable_rquota": False, + "protocols": 4, + "nfs_port": 14000 + }), + RawBlock('MDCACHE', values={ + "dir_chunk": 0 + }), + RawBlock('NFSV4', values={ + "recoverybackend": "rados_cluster", + "minor_versions": [1, 2] + }), + RawBlock('RADOS_KV', values={ + "pool": NFS_POOL_NAME, + "namespace": "vstart", + "userid": "vstart", + "nodeid": "a" + }), + RawBlock('RADOS_URLS', values={ + "userid": "vstart", + "watch_url": f"'rados://{NFS_POOL_NAME}/vstart/conf-nfs.vstart'" + }), + RawBlock('%url', values={ + "value": f"rados://{NFS_POOL_NAME}/vstart/conf-nfs.vstart" + }) + ] + daemon_raw_config = """ +NFS_CORE_PARAM { + Enable_NLM = false; + Enable_RQUOTA = false; + Protocols = 4; + NFS_Port = 14000; + } + + MDCACHE { + Dir_Chunk = 0; + } + + NFSv4 { + RecoveryBackend = rados_cluster; + Minor_Versions = 1, 2; + } + + RADOS_KV { + pool = {}; + namespace = vstart; + UserId = vstart; + nodeid = a; + } + + RADOS_URLS { + Userid = vstart; + watch_url = 'rados://{}/vstart/conf-nfs.vstart'; + } + + %url rados://{}/vstart/conf-nfs.vstart +""".replace('{}', NFS_POOL_NAME) + daemon_config = GaneshaConfParser(daemon_raw_config).parse() + assert daemon_config == expected_daemon_config + + def _validate_export_1(self, export: Export): + assert export.export_id == 1 + assert export.path == "/" + assert export.pseudo == "/cephfs_a/" + assert export.access_type == "RW" + # assert export.squash == "root_squash" # probably correct value + assert export.squash == "no_root_squash" + assert export.protocols == [4] + # assert export.transports == {"TCP", "UDP"} + assert export.fsal.name == "CEPH" + assert export.fsal.user_id == "ganesha" + assert export.fsal.fs_name == "a" + assert export.fsal.sec_label_xattr == None + assert len(export.clients) == 2 + assert export.clients[0].addresses == \ + ["192.168.0.10", "192.168.1.0/8"] + # assert export.clients[0].squash == "no_root_squash" # probably correct value + assert export.clients[0].squash == "None" + assert export.clients[0].access_type is None + assert export.clients[1].addresses == ["192.168.0.0/16"] + # assert export.clients[1].squash == "all_squash" # probably correct value + assert export.clients[1].squash == "All" + assert export.clients[1].access_type == "RO" + assert export.cluster_id == 'foo' + assert export.attr_expiration_time == 0 + # assert export.security_label == False # probably correct value + assert export.security_label == True + + def test_export_parser_1(self) -> None: + blocks = GaneshaConfParser(self.export_1).parse() + assert isinstance(blocks, list) + assert len(blocks) == 1 + export = Export.from_export_block(blocks[0], self.cluster_id) + self._validate_export_1(export) + + def _validate_export_2(self, export: Export): + assert export.export_id == 2 + assert export.path == "/" + assert export.pseudo == "/rgw" + assert export.access_type == "RW" + # assert export.squash == "all_squash" # probably correct value + assert export.squash == "AllAnonymous" + assert export.protocols == [4, 3] + assert set(export.transports) == {"TCP", "UDP"} + assert export.fsal.name == "RGW" + assert export.fsal.user_id == "nfs.foo.bucket" + assert export.fsal.access_key_id == "the_access_key" + assert export.fsal.secret_access_key == "the_secret_key" + assert len(export.clients) == 0 + assert export.cluster_id == 'foo' + + def test_export_parser_2(self) -> None: + blocks = GaneshaConfParser(self.export_2).parse() + assert isinstance(blocks, list) + assert len(blocks) == 1 + export = Export.from_export_block(blocks[0], self.cluster_id) + self._validate_export_2(export) + + def test_daemon_conf_parser(self) -> None: + blocks = GaneshaConfParser(self.conf_nfs_foo).parse() + assert isinstance(blocks, list) + assert len(blocks) == 2 + assert blocks[0].block_name == "%url" + assert blocks[0].values['value'] == f"rados://{NFS_POOL_NAME}/{self.cluster_id}/export-1" + assert blocks[1].block_name == "%url" + assert blocks[1].values['value'] == f"rados://{NFS_POOL_NAME}/{self.cluster_id}/export-2" + + def _do_mock_test(self, func, *args) -> None: + with self._mock_orchestrator(True): + func(*args) + self._reset_temp_store() + + def test_ganesha_conf(self) -> None: + self._do_mock_test(self._do_test_ganesha_conf) + + def _do_test_ganesha_conf(self) -> None: + nfs_mod = Module('nfs', '', '') + ganesha_conf = ExportMgr(nfs_mod) + exports = ganesha_conf.exports[self.cluster_id] + + assert len(exports) == 2 + + self._validate_export_1([e for e in exports if e.export_id == 1][0]) + self._validate_export_2([e for e in exports if e.export_id == 2][0]) + + def test_config_dict(self) -> None: + self._do_mock_test(self._do_test_config_dict) + + def _do_test_config_dict(self) -> None: + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + export = [e for e in conf.exports['foo'] if e.export_id == 1][0] + ex_dict = export.to_dict() + + assert ex_dict == {'access_type': 'RW', + 'clients': [{'access_type': None, + 'addresses': ['192.168.0.10', '192.168.1.0/8'], + 'squash': 'None'}, + {'access_type': 'RO', + 'addresses': ['192.168.0.0/16'], + 'squash': 'All'}], + 'cluster_id': self.cluster_id, + 'export_id': 1, + 'fsal': {'fs_name': 'a', 'name': 'CEPH', 'user_id': 'ganesha'}, + 'path': '/', + 'protocols': [4], + 'pseudo': '/cephfs_a/', + 'security_label': True, + 'squash': 'no_root_squash', + 'transports': []} + + export = [e for e in conf.exports['foo'] if e.export_id == 2][0] + ex_dict = export.to_dict() + assert ex_dict == {'access_type': 'RW', + 'clients': [], + 'cluster_id': self.cluster_id, + 'export_id': 2, + 'fsal': {'name': 'RGW', + 'access_key_id': 'the_access_key', + 'secret_access_key': 'the_secret_key', + 'user_id': 'nfs.foo.bucket'}, + 'path': '/', + 'protocols': [3, 4], + 'pseudo': '/rgw', + 'security_label': True, + 'squash': 'AllAnonymous', + 'transports': ['TCP', 'UDP']} + + def test_config_from_dict(self) -> None: + self._do_mock_test(self._do_test_config_from_dict) + + def _do_test_config_from_dict(self) -> None: + export = Export.from_dict(1, { + 'export_id': 1, + 'path': '/', + 'cluster_id': self.cluster_id, + 'pseudo': '/cephfs_a', + 'access_type': 'RW', + 'squash': 'root_squash', + 'security_label': True, + 'protocols': [4], + 'transports': ['TCP', 'UDP'], + 'clients': [{ + 'addresses': ["192.168.0.10", "192.168.1.0/8"], + 'access_type': None, + 'squash': 'no_root_squash' + }, { + 'addresses': ["192.168.0.0/16"], + 'access_type': 'RO', + 'squash': 'all_squash' + }], + 'fsal': { + 'name': 'CEPH', + 'user_id': 'ganesha', + 'fs_name': 'a', + 'sec_label_xattr': 'security.selinux' + } + }) + + assert export.export_id == 1 + assert export.path == "/" + assert export.pseudo == "/cephfs_a" + assert export.access_type == "RW" + assert export.squash == "root_squash" + assert set(export.protocols) == {4} + assert set(export.transports) == {"TCP", "UDP"} + assert export.fsal.name == "CEPH" + assert export.fsal.user_id == "ganesha" + assert export.fsal.fs_name == "a" + assert export.fsal.sec_label_xattr == 'security.selinux' + assert len(export.clients) == 2 + assert export.clients[0].addresses == \ + ["192.168.0.10", "192.168.1.0/8"] + assert export.clients[0].squash == "no_root_squash" + assert export.clients[0].access_type is None + assert export.clients[1].addresses == ["192.168.0.0/16"] + assert export.clients[1].squash == "all_squash" + assert export.clients[1].access_type == "RO" + assert export.cluster_id == self.cluster_id + assert export.attr_expiration_time == 0 + assert export.security_label + + export = Export.from_dict(2, { + 'export_id': 2, + 'path': 'bucket', + 'pseudo': '/rgw', + 'cluster_id': self.cluster_id, + 'access_type': 'RW', + 'squash': 'all_squash', + 'security_label': False, + 'protocols': [4, 3], + 'transports': ['TCP', 'UDP'], + 'clients': [], + 'fsal': { + 'name': 'RGW', + 'user_id': 'rgw.foo.bucket', + 'access_key_id': 'the_access_key', + 'secret_access_key': 'the_secret_key' + } + }) + + assert export.export_id == 2 + assert export.path == "bucket" + assert export.pseudo == "/rgw" + assert export.access_type == "RW" + assert export.squash == "all_squash" + assert set(export.protocols) == {4, 3} + assert set(export.transports) == {"TCP", "UDP"} + assert export.fsal.name == "RGW" + assert export.fsal.user_id == "rgw.foo.bucket" + assert export.fsal.access_key_id == "the_access_key" + assert export.fsal.secret_access_key == "the_secret_key" + assert len(export.clients) == 0 + assert export.cluster_id == self.cluster_id + + @pytest.mark.parametrize( + "block", + [ + export_1, + export_2, + ] + ) + def test_export_from_to_export_block(self, block): + blocks = GaneshaConfParser(block).parse() + export = Export.from_export_block(blocks[0], self.cluster_id) + newblock = export.to_export_block() + export2 = Export.from_export_block(newblock, self.cluster_id) + newblock2 = export2.to_export_block() + assert newblock == newblock2 + + @pytest.mark.parametrize( + "block", + [ + export_1, + export_2, + ] + ) + def test_export_from_to_dict(self, block): + blocks = GaneshaConfParser(block).parse() + export = Export.from_export_block(blocks[0], self.cluster_id) + j = export.to_dict() + export2 = Export.from_dict(j['export_id'], j) + j2 = export2.to_dict() + assert j == j2 + + @pytest.mark.parametrize( + "block", + [ + export_1, + export_2, + ] + ) + def test_export_validate(self, block): + blocks = GaneshaConfParser(block).parse() + export = Export.from_export_block(blocks[0], self.cluster_id) + nfs_mod = Module('nfs', '', '') + with mock.patch('nfs.ganesha_conf.check_fs', return_value=True): + export.validate(nfs_mod) + + def test_update_export(self): + self._do_mock_test(self._do_test_update_export) + + def _do_test_update_export(self): + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + r = conf.apply_export(self.cluster_id, json.dumps({ + 'export_id': 2, + 'path': 'bucket', + 'pseudo': '/rgw/bucket', + 'cluster_id': self.cluster_id, + 'access_type': 'RW', + 'squash': 'all_squash', + 'security_label': False, + 'protocols': [4, 3], + 'transports': ['TCP', 'UDP'], + 'clients': [{ + 'addresses': ["192.168.0.0/16"], + 'access_type': None, + 'squash': None + }], + 'fsal': { + 'name': 'RGW', + 'user_id': 'nfs.foo.bucket', + 'access_key_id': 'the_access_key', + 'secret_access_key': 'the_secret_key', + } + })) + assert len(r.changes) == 1 + + export = conf._fetch_export('foo', '/rgw/bucket') + assert export.export_id == 2 + assert export.path == "bucket" + assert export.pseudo == "/rgw/bucket" + assert export.access_type == "RW" + assert export.squash == "all_squash" + assert export.protocols == [4, 3] + assert export.transports == ["TCP", "UDP"] + assert export.fsal.name == "RGW" + assert export.fsal.access_key_id == "the_access_key" + assert export.fsal.secret_access_key == "the_secret_key" + assert len(export.clients) == 1 + assert export.clients[0].squash is None + assert export.clients[0].access_type is None + assert export.cluster_id == self.cluster_id + + # do it again, with changes + r = conf.apply_export(self.cluster_id, json.dumps({ + 'export_id': 2, + 'path': 'newbucket', + 'pseudo': '/rgw/bucket', + 'cluster_id': self.cluster_id, + 'access_type': 'RO', + 'squash': 'root', + 'security_label': False, + 'protocols': [4], + 'transports': ['TCP'], + 'clients': [{ + 'addresses': ["192.168.10.0/16"], + 'access_type': None, + 'squash': None + }], + 'fsal': { + 'name': 'RGW', + 'user_id': 'nfs.foo.newbucket', + 'access_key_id': 'the_access_key', + 'secret_access_key': 'the_secret_key', + } + })) + assert len(r.changes) == 1 + + export = conf._fetch_export('foo', '/rgw/bucket') + assert export.export_id == 2 + assert export.path == "newbucket" + assert export.pseudo == "/rgw/bucket" + assert export.access_type == "RO" + assert export.squash == "root" + assert export.protocols == [4] + assert export.transports == ["TCP"] + assert export.fsal.name == "RGW" + assert export.fsal.access_key_id == "the_access_key" + assert export.fsal.secret_access_key == "the_secret_key" + assert len(export.clients) == 1 + assert export.clients[0].squash is None + assert export.clients[0].access_type is None + assert export.cluster_id == self.cluster_id + + # again, but without export_id + r = conf.apply_export(self.cluster_id, json.dumps({ + 'path': 'newestbucket', + 'pseudo': '/rgw/bucket', + 'cluster_id': self.cluster_id, + 'access_type': 'RW', + 'squash': 'root', + 'security_label': False, + 'protocols': [4], + 'transports': ['TCP'], + 'clients': [{ + 'addresses': ["192.168.10.0/16"], + 'access_type': None, + 'squash': None + }], + 'fsal': { + 'name': 'RGW', + 'user_id': 'nfs.foo.newestbucket', + 'access_key_id': 'the_access_key', + 'secret_access_key': 'the_secret_key', + } + })) + assert len(r.changes) == 1 + + export = conf._fetch_export(self.cluster_id, '/rgw/bucket') + assert export.export_id == 2 + assert export.path == "newestbucket" + assert export.pseudo == "/rgw/bucket" + assert export.access_type == "RW" + assert export.squash == "root" + assert export.protocols == [4] + assert export.transports == ["TCP"] + assert export.fsal.name == "RGW" + assert export.fsal.access_key_id == "the_access_key" + assert export.fsal.secret_access_key == "the_secret_key" + assert len(export.clients) == 1 + assert export.clients[0].squash is None + assert export.clients[0].access_type is None + assert export.cluster_id == self.cluster_id + + def test_update_export_sectype(self): + self._do_mock_test(self._test_update_export_sectype) + + def _test_update_export_sectype(self): + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + r = conf.apply_export(self.cluster_id, json.dumps({ + 'export_id': 2, + 'path': 'bucket', + 'pseudo': '/rgw/bucket', + 'cluster_id': self.cluster_id, + 'access_type': 'RW', + 'squash': 'all_squash', + 'security_label': False, + 'protocols': [4, 3], + 'transports': ['TCP', 'UDP'], + 'clients': [{ + 'addresses': ["192.168.0.0/16"], + 'access_type': None, + 'squash': None + }], + 'fsal': { + 'name': 'RGW', + 'user_id': 'nfs.foo.bucket', + 'access_key_id': 'the_access_key', + 'secret_access_key': 'the_secret_key', + } + })) + assert len(r.changes) == 1 + + # no sectype was given, key not present + info = conf._get_export_dict(self.cluster_id, "/rgw/bucket") + assert info["export_id"] == 2 + assert info["path"] == "bucket" + assert "sectype" not in info + + r = conf.apply_export(self.cluster_id, json.dumps({ + 'export_id': 2, + 'path': 'bucket', + 'pseudo': '/rgw/bucket', + 'cluster_id': self.cluster_id, + 'access_type': 'RW', + 'squash': 'all_squash', + 'security_label': False, + 'protocols': [4, 3], + 'transports': ['TCP', 'UDP'], + 'clients': [{ + 'addresses': ["192.168.0.0/16"], + 'access_type': None, + 'squash': None + }], + 'sectype': ["krb5p", "krb5i", "sys"], + 'fsal': { + 'name': 'RGW', + 'user_id': 'nfs.foo.bucket', + 'access_key_id': 'the_access_key', + 'secret_access_key': 'the_secret_key', + } + })) + assert len(r.changes) == 1 + + # assert sectype matches new value(s) + info = conf._get_export_dict(self.cluster_id, "/rgw/bucket") + assert info["export_id"] == 2 + assert info["path"] == "bucket" + assert info["sectype"] == ["krb5p", "krb5i", "sys"] + + def test_update_export_with_ganesha_conf(self): + self._do_mock_test(self._do_test_update_export_with_ganesha_conf) + + def _do_test_update_export_with_ganesha_conf(self): + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + r = conf.apply_export(self.cluster_id, self.export_3) + assert len(r.changes) == 1 + + def test_update_export_with_ganesha_conf_sectype(self): + self._do_mock_test( + self._do_test_update_export_with_ganesha_conf_sectype, + self.export_4, ["krb5p", "krb5i"]) + + def test_update_export_with_ganesha_conf_sectype_lcase(self): + export_conf = self.export_4.replace("SecType", "sectype").replace("krb5i", "sys") + self._do_mock_test( + self._do_test_update_export_with_ganesha_conf_sectype, + export_conf, ["krb5p", "sys"]) + + def _do_test_update_export_with_ganesha_conf_sectype(self, export_conf, expect_sectype): + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + r = conf.apply_export(self.cluster_id, export_conf) + assert len(r.changes) == 1 + + # assert sectype matches new value(s) + info = conf._get_export_dict(self.cluster_id, "/secure1") + assert info["export_id"] == 1 + assert info["path"] == "/secure/me" + assert info["sectype"] == expect_sectype + + def test_update_export_with_list(self): + self._do_mock_test(self._do_test_update_export_with_list) + + def _do_test_update_export_with_list(self): + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + r = conf.apply_export(self.cluster_id, json.dumps([ + { + 'path': 'bucket', + 'pseudo': '/rgw/bucket', + 'cluster_id': self.cluster_id, + 'access_type': 'RW', + 'squash': 'root', + 'security_label': False, + 'protocols': [4], + 'transports': ['TCP'], + 'clients': [{ + 'addresses': ["192.168.0.0/16"], + 'access_type': None, + 'squash': None + }], + 'fsal': { + 'name': 'RGW', + 'user_id': 'nfs.foo.bucket', + 'access_key_id': 'the_access_key', + 'secret_access_key': 'the_secret_key', + } + }, + { + 'path': 'bucket2', + 'pseudo': '/rgw/bucket2', + 'cluster_id': self.cluster_id, + 'access_type': 'RO', + 'squash': 'root', + 'security_label': False, + 'protocols': [4], + 'transports': ['TCP'], + 'clients': [{ + 'addresses': ["192.168.0.0/16"], + 'access_type': None, + 'squash': None + }], + 'fsal': { + 'name': 'RGW', + 'user_id': 'nfs.foo.bucket2', + 'access_key_id': 'the_access_key', + 'secret_access_key': 'the_secret_key', + } + }, + ])) + # The input object above contains TWO items (two different pseudo paths) + # therefore we expect the result to report that two changes have been + # applied, rather than the typical 1 change. + assert len(r.changes) == 2 + + export = conf._fetch_export('foo', '/rgw/bucket') + assert export.export_id == 3 + assert export.path == "bucket" + assert export.pseudo == "/rgw/bucket" + assert export.access_type == "RW" + assert export.squash == "root" + assert export.protocols == [4] + assert export.transports == ["TCP"] + assert export.fsal.name == "RGW" + assert export.fsal.access_key_id == "the_access_key" + assert export.fsal.secret_access_key == "the_secret_key" + assert len(export.clients) == 1 + assert export.clients[0].squash is None + assert export.clients[0].access_type is None + assert export.cluster_id == self.cluster_id + + export = conf._fetch_export('foo', '/rgw/bucket2') + assert export.export_id == 4 + assert export.path == "bucket2" + assert export.pseudo == "/rgw/bucket2" + assert export.access_type == "RO" + assert export.squash == "root" + assert export.protocols == [4] + assert export.transports == ["TCP"] + assert export.fsal.name == "RGW" + assert export.fsal.access_key_id == "the_access_key" + assert export.fsal.secret_access_key == "the_secret_key" + assert len(export.clients) == 1 + assert export.clients[0].squash is None + assert export.clients[0].access_type is None + assert export.cluster_id == self.cluster_id + + def test_remove_export(self) -> None: + self._do_mock_test(self._do_test_remove_export) + + def _do_test_remove_export(self) -> None: + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + assert len(conf.exports[self.cluster_id]) == 2 + conf.delete_export(cluster_id=self.cluster_id, + pseudo_path="/rgw") + exports = conf.exports[self.cluster_id] + assert len(exports) == 1 + assert exports[0].export_id == 1 + + def test_create_export_rgw_bucket(self): + self._do_mock_test(self._do_test_create_export_rgw_bucket) + + def _do_test_create_export_rgw_bucket(self): + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + + ls = conf.list_exports(cluster_id=self.cluster_id) + assert len(ls) == 2 + + r = conf.create_export( + fsal_type='rgw', + cluster_id=self.cluster_id, + bucket='bucket', + pseudo_path='/mybucket', + read_only=False, + squash='root', + addr=["192.168.0.0/16"] + ) + assert r["bind"] == "/mybucket" + + ls = conf.list_exports(cluster_id=self.cluster_id) + assert len(ls) == 3 + + export = conf._fetch_export('foo', '/mybucket') + assert export.export_id + assert export.path == "bucket" + assert export.pseudo == "/mybucket" + assert export.access_type == "none" + assert export.squash == "none" + assert export.protocols == [4] + assert export.transports == ["TCP"] + assert export.fsal.name == "RGW" + assert export.fsal.user_id == "bucket_owner_user" + assert export.fsal.access_key_id == "the_access_key" + assert export.fsal.secret_access_key == "the_secret_key" + assert len(export.clients) == 1 + assert export.clients[0].squash == 'root' + assert export.clients[0].access_type == 'rw' + assert export.clients[0].addresses == ["192.168.0.0/16"] + assert export.cluster_id == self.cluster_id + + def test_create_export_rgw_bucket_user(self): + self._do_mock_test(self._do_test_create_export_rgw_bucket_user) + + def _do_test_create_export_rgw_bucket_user(self): + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + + ls = conf.list_exports(cluster_id=self.cluster_id) + assert len(ls) == 2 + + r = conf.create_export( + fsal_type='rgw', + cluster_id=self.cluster_id, + bucket='bucket', + user_id='other_user', + pseudo_path='/mybucket', + read_only=False, + squash='root', + addr=["192.168.0.0/16"] + ) + assert r["bind"] == "/mybucket" + + ls = conf.list_exports(cluster_id=self.cluster_id) + assert len(ls) == 3 + + export = conf._fetch_export('foo', '/mybucket') + assert export.export_id + assert export.path == "bucket" + assert export.pseudo == "/mybucket" + assert export.access_type == "none" + assert export.squash == "none" + assert export.protocols == [4] + assert export.transports == ["TCP"] + assert export.fsal.name == "RGW" + assert export.fsal.access_key_id == "the_access_key" + assert export.fsal.secret_access_key == "the_secret_key" + assert len(export.clients) == 1 + assert export.clients[0].squash == 'root' + assert export.fsal.user_id == "other_user" + assert export.clients[0].access_type == 'rw' + assert export.clients[0].addresses == ["192.168.0.0/16"] + assert export.cluster_id == self.cluster_id + + def test_create_export_rgw_user(self): + self._do_mock_test(self._do_test_create_export_rgw_user) + + def _do_test_create_export_rgw_user(self): + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + + ls = conf.list_exports(cluster_id=self.cluster_id) + assert len(ls) == 2 + + r = conf.create_export( + fsal_type='rgw', + cluster_id=self.cluster_id, + user_id='some_user', + pseudo_path='/mybucket', + read_only=False, + squash='root', + addr=["192.168.0.0/16"] + ) + assert r["bind"] == "/mybucket" + + ls = conf.list_exports(cluster_id=self.cluster_id) + assert len(ls) == 3 + + export = conf._fetch_export('foo', '/mybucket') + assert export.export_id + assert export.path == "/" + assert export.pseudo == "/mybucket" + assert export.access_type == "none" + assert export.squash == "none" + assert export.protocols == [4] + assert export.transports == ["TCP"] + assert export.fsal.name == "RGW" + assert export.fsal.access_key_id == "the_access_key" + assert export.fsal.secret_access_key == "the_secret_key" + assert len(export.clients) == 1 + assert export.clients[0].squash == 'root' + assert export.fsal.user_id == "some_user" + assert export.clients[0].access_type == 'rw' + assert export.clients[0].addresses == ["192.168.0.0/16"] + assert export.cluster_id == self.cluster_id + + def test_create_export_cephfs(self): + self._do_mock_test(self._do_test_create_export_cephfs) + + def _do_test_create_export_cephfs(self): + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + + ls = conf.list_exports(cluster_id=self.cluster_id) + assert len(ls) == 2 + + r = conf.create_export( + fsal_type='cephfs', + cluster_id=self.cluster_id, + fs_name='myfs', + path='/', + pseudo_path='/cephfs2', + read_only=False, + squash='root', + addr=["192.168.1.0/8"], + ) + assert r["bind"] == "/cephfs2" + + ls = conf.list_exports(cluster_id=self.cluster_id) + assert len(ls) == 3 + + export = conf._fetch_export('foo', '/cephfs2') + assert export.export_id + assert export.path == "/" + assert export.pseudo == "/cephfs2" + assert export.access_type == "none" + assert export.squash == "none" + assert export.protocols == [4] + assert export.transports == ["TCP"] + assert export.fsal.name == "CEPH" + assert export.fsal.user_id == "nfs.foo.3" + assert export.fsal.cephx_key == "thekeyforclientabc" + assert len(export.clients) == 1 + assert export.clients[0].squash == 'root' + assert export.clients[0].access_type == 'rw' + assert export.clients[0].addresses == ["192.168.1.0/8"] + assert export.cluster_id == self.cluster_id + + def _do_test_cluster_ls(self): + nfs_mod = Module('nfs', '', '') + cluster = NFSCluster(nfs_mod) + + out = cluster.list_nfs_cluster() + assert out[0] == self.cluster_id + + def test_cluster_ls(self): + self._do_mock_test(self._do_test_cluster_ls) + + def _do_test_cluster_info(self): + nfs_mod = Module('nfs', '', '') + cluster = NFSCluster(nfs_mod) + + out = cluster.show_nfs_cluster_info(self.cluster_id) + assert out == {"foo": {"virtual_ip": None, "backend": []}} + + def test_cluster_info(self): + self._do_mock_test(self._do_test_cluster_info) + + def _do_test_cluster_config(self): + nfs_mod = Module('nfs', '', '') + cluster = NFSCluster(nfs_mod) + + out = cluster.get_nfs_cluster_config(self.cluster_id) + assert out == "" + + cluster.set_nfs_cluster_config(self.cluster_id, '# foo\n') + + out = cluster.get_nfs_cluster_config(self.cluster_id) + assert out == "# foo\n" + + cluster.reset_nfs_cluster_config(self.cluster_id) + + out = cluster.get_nfs_cluster_config(self.cluster_id) + assert out == "" + + def test_cluster_config(self): + self._do_mock_test(self._do_test_cluster_config) + + +@pytest.mark.parametrize( + "path,expected", + [ + ("/foo/bar/baz", "/foo/bar/baz"), + ("/foo/bar/baz/", "/foo/bar/baz"), + ("/foo/bar/baz ", "/foo/bar/baz"), + ("/foo/./bar/baz", "/foo/bar/baz"), + ("/foo/bar/baz/..", "/foo/bar"), + ("//foo/bar/baz", "/foo/bar/baz"), + ("", ""), + ] +) +def test_normalize_path(path, expected): + assert normalize_path(path) == expected + + +def test_ganesha_validate_squash(): + """Check error handling of internal validation function for squash value.""" + from nfs.ganesha_conf import _validate_squash + from nfs.exception import NFSInvalidOperation + + _validate_squash("root") + with pytest.raises(NFSInvalidOperation): + _validate_squash("toot") + + +def test_ganesha_validate_access_type(): + """Check error handling of internal validation function for access type value.""" + from nfs.ganesha_conf import _validate_access_type + from nfs.exception import NFSInvalidOperation + + for ok in ("rw", "ro", "none"): + _validate_access_type(ok) + with pytest.raises(NFSInvalidOperation): + _validate_access_type("any") diff --git a/src/pybind/mgr/nfs/utils.py b/src/pybind/mgr/nfs/utils.py new file mode 100644 index 000000000..ba3190a96 --- /dev/null +++ b/src/pybind/mgr/nfs/utils.py @@ -0,0 +1,104 @@ +import functools +import logging +import stat +from typing import List, Tuple, TYPE_CHECKING + +from object_format import ErrorResponseBase +import orchestrator +import cephfs +from mgr_util import CephfsClient, open_filesystem + +if TYPE_CHECKING: + from nfs.module import Module + +EXPORT_PREFIX: str = "export-" +CONF_PREFIX: str = "conf-nfs." +USER_CONF_PREFIX: str = "userconf-nfs." + +log = logging.getLogger(__name__) + + +class NonFatalError(ErrorResponseBase): + """Raise this exception when you want to interrupt the flow of a function + and return an informative message to the user. In certain situations the + NFS MGR module wants to indicate an action was or was not taken but still + return a success code so that non-interactive scripts continue as if the + overall action was completed. + """ + def __init__(self, msg: str) -> None: + super().__init__(msg) + self.msg = msg + + def format_response(self) -> Tuple[int, str, str]: + return 0, "", self.msg + + +class ManualRestartRequired(NonFatalError): + """Raise this exception type if all other changes were successful but + user needs to manually restart nfs services. + """ + + def __init__(self, msg: str) -> None: + super().__init__(" ".join((msg, "(Manual Restart of NFS Pods required)"))) + + +def export_obj_name(export_id: int) -> str: + """Return a rados object name for the export.""" + return f"{EXPORT_PREFIX}{export_id}" + + +def conf_obj_name(cluster_id: str) -> str: + """Return a rados object name for the config.""" + return f"{CONF_PREFIX}{cluster_id}" + + +def user_conf_obj_name(cluster_id: str) -> str: + """Returna a rados object name for the user config.""" + return f"{USER_CONF_PREFIX}{cluster_id}" + + +def available_clusters(mgr: 'Module') -> List[str]: + ''' + This method returns list of available cluster ids. + Service name is service_type.service_id + Example: + completion.result value: + > + return value: ['vstart'] + ''' + # TODO check cephadm cluster list with rados pool conf objects + completion = mgr.describe_service(service_type='nfs') + orchestrator.raise_if_exception(completion) + assert completion.result is not None + return [cluster.spec.service_id for cluster in completion.result + if cluster.spec.service_id] + + +def restart_nfs_service(mgr: 'Module', cluster_id: str) -> None: + ''' + This methods restarts the nfs daemons + ''' + completion = mgr.service_action(action='restart', + service_name='nfs.' + cluster_id) + orchestrator.raise_if_exception(completion) + + +def check_fs(mgr: 'Module', fs_name: str) -> bool: + ''' + This method checks if given fs is valid + ''' + fs_map = mgr.get('fs_map') + return fs_name in [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']] + + +def cephfs_path_is_dir(mgr: 'Module', fs: str, path: str) -> None: + @functools.lru_cache(maxsize=1) + def _get_cephfs_client() -> CephfsClient: + return CephfsClient(mgr) + cephfs_client = _get_cephfs_client() + + with open_filesystem(cephfs_client, fs) as fs_handle: + stx = fs_handle.statx(path.encode('utf-8'), cephfs.CEPH_STATX_MODE, + cephfs.AT_SYMLINK_NOFOLLOW) + if not stat.S_ISDIR(stx.get('mode')): + raise NotADirectoryError() -- cgit v1.2.3