summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/nfs
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/nfs')
-rw-r--r--src/pybind/mgr/nfs/__init__.py7
-rw-r--r--src/pybind/mgr/nfs/cluster.py309
-rw-r--r--src/pybind/mgr/nfs/exception.py32
-rw-r--r--src/pybind/mgr/nfs/export.py856
-rw-r--r--src/pybind/mgr/nfs/ganesha_conf.py548
-rw-r--r--src/pybind/mgr/nfs/module.py189
-rw-r--r--src/pybind/mgr/nfs/tests/__init__.py0
-rw-r--r--src/pybind/mgr/nfs/tests/test_nfs.py1156
-rw-r--r--src/pybind/mgr/nfs/utils.py104
9 files changed, 3201 insertions, 0 deletions
diff --git a/src/pybind/mgr/nfs/__init__.py b/src/pybind/mgr/nfs/__init__.py
new file mode 100644
index 000000000..4e2257788
--- /dev/null
+++ b/src/pybind/mgr/nfs/__init__.py
@@ -0,0 +1,7 @@
+# flake8: noqa
+
+import os
+if 'UNITTEST' in os.environ:
+ import tests
+
+from .module import Module
diff --git a/src/pybind/mgr/nfs/cluster.py b/src/pybind/mgr/nfs/cluster.py
new file mode 100644
index 000000000..d558a3a37
--- /dev/null
+++ b/src/pybind/mgr/nfs/cluster.py
@@ -0,0 +1,309 @@
+import ipaddress
+import logging
+import re
+import socket
+from typing import cast, Dict, List, Any, Union, Optional, TYPE_CHECKING
+
+from mgr_module import NFS_POOL_NAME as POOL_NAME
+from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec
+from object_format import ErrorResponse
+
+import orchestrator
+from orchestrator.module import IngressType
+
+from .exception import NFSInvalidOperation, ClusterNotFound
+from .utils import (
+ ManualRestartRequired,
+ NonFatalError,
+ available_clusters,
+ conf_obj_name,
+ restart_nfs_service,
+ user_conf_obj_name)
+from .export import NFSRados
+
+if TYPE_CHECKING:
+ from nfs.module import Module
+ from mgr_module import MgrModule
+
+
+log = logging.getLogger(__name__)
+
+
+def resolve_ip(hostname: str) -> str:
+ try:
+ r = socket.getaddrinfo(hostname, None, flags=socket.AI_CANONNAME,
+ type=socket.SOCK_STREAM)
+ # pick first v4 IP, if present
+ for a in r:
+ if a[0] == socket.AF_INET:
+ return a[4][0]
+ return r[0][4][0]
+ except socket.gaierror as e:
+ raise NFSInvalidOperation(f"Cannot resolve IP for host {hostname}: {e}")
+
+
+def create_ganesha_pool(mgr: 'MgrModule') -> None:
+ pool_list = [p['pool_name'] for p in mgr.get_osdmap().dump().get('pools', [])]
+ if POOL_NAME not in pool_list:
+ mgr.check_mon_command({'prefix': 'osd pool create',
+ 'pool': POOL_NAME,
+ 'yes_i_really_mean_it': True})
+ mgr.check_mon_command({'prefix': 'osd pool application enable',
+ 'pool': POOL_NAME,
+ 'app': 'nfs'})
+ log.debug("Successfully created nfs-ganesha pool %s", POOL_NAME)
+
+
+class NFSCluster:
+ def __init__(self, mgr: 'Module') -> None:
+ self.mgr = mgr
+
+ def _call_orch_apply_nfs(
+ self,
+ cluster_id: str,
+ placement: Optional[str] = None,
+ virtual_ip: Optional[str] = None,
+ ingress_mode: Optional[IngressType] = None,
+ port: Optional[int] = None,
+ ) -> None:
+ if not port:
+ port = 2049 # default nfs port
+ if virtual_ip:
+ # nfs + ingress
+ # run NFS on non-standard port
+ if not ingress_mode:
+ ingress_mode = IngressType.default
+ ingress_mode = ingress_mode.canonicalize()
+ pspec = PlacementSpec.from_string(placement)
+ if ingress_mode == IngressType.keepalive_only:
+ # enforce count=1 for nfs over keepalive only
+ pspec.count = 1
+
+ ganesha_port = 10000 + port # semi-arbitrary, fix me someday
+ frontend_port: Optional[int] = port
+ virtual_ip_for_ganesha: Optional[str] = None
+ keepalive_only: bool = False
+ enable_haproxy_protocol: bool = False
+ if ingress_mode == IngressType.haproxy_protocol:
+ enable_haproxy_protocol = True
+ elif ingress_mode == IngressType.keepalive_only:
+ keepalive_only = True
+ virtual_ip_for_ganesha = virtual_ip.split('/')[0]
+ ganesha_port = port
+ frontend_port = None
+
+ spec = NFSServiceSpec(service_type='nfs', service_id=cluster_id,
+ placement=pspec,
+ # use non-default port so we don't conflict with ingress
+ port=ganesha_port,
+ virtual_ip=virtual_ip_for_ganesha,
+ enable_haproxy_protocol=enable_haproxy_protocol)
+ completion = self.mgr.apply_nfs(spec)
+ orchestrator.raise_if_exception(completion)
+ ispec = IngressSpec(service_type='ingress',
+ service_id='nfs.' + cluster_id,
+ backend_service='nfs.' + cluster_id,
+ placement=pspec,
+ frontend_port=frontend_port,
+ monitor_port=7000 + port, # semi-arbitrary, fix me someday
+ virtual_ip=virtual_ip,
+ keepalive_only=keepalive_only,
+ enable_haproxy_protocol=enable_haproxy_protocol)
+ completion = self.mgr.apply_ingress(ispec)
+ orchestrator.raise_if_exception(completion)
+ else:
+ # standalone nfs
+ spec = NFSServiceSpec(service_type='nfs', service_id=cluster_id,
+ placement=PlacementSpec.from_string(placement),
+ port=port)
+ completion = self.mgr.apply_nfs(spec)
+ orchestrator.raise_if_exception(completion)
+ log.debug("Successfully deployed nfs daemons with cluster id %s and placement %s",
+ cluster_id, placement)
+
+ def create_empty_rados_obj(self, cluster_id: str) -> None:
+ common_conf = conf_obj_name(cluster_id)
+ self._rados(cluster_id).write_obj('', conf_obj_name(cluster_id))
+ log.info("Created empty object:%s", common_conf)
+
+ def delete_config_obj(self, cluster_id: str) -> None:
+ self._rados(cluster_id).remove_all_obj()
+ log.info("Deleted %s object and all objects in %s",
+ conf_obj_name(cluster_id), cluster_id)
+
+ def create_nfs_cluster(
+ self,
+ cluster_id: str,
+ placement: Optional[str],
+ virtual_ip: Optional[str],
+ ingress: Optional[bool] = None,
+ ingress_mode: Optional[IngressType] = None,
+ port: Optional[int] = None,
+ ) -> None:
+ try:
+ if virtual_ip:
+ # validate virtual_ip value: ip_address throws a ValueError
+ # exception in case it's not a valid ipv4 or ipv6 address
+ ip = virtual_ip.split('/')[0]
+ ipaddress.ip_address(ip)
+ if virtual_ip and not ingress:
+ raise NFSInvalidOperation('virtual_ip can only be provided with ingress enabled')
+ if not virtual_ip and ingress:
+ raise NFSInvalidOperation('ingress currently requires a virtual_ip')
+ if ingress_mode and not ingress:
+ raise NFSInvalidOperation('--ingress-mode must be passed along with --ingress')
+ invalid_str = re.search('[^A-Za-z0-9-_.]', cluster_id)
+ if invalid_str:
+ raise NFSInvalidOperation(f"cluster id {cluster_id} is invalid. "
+ f"{invalid_str.group()} is char not permitted")
+
+ create_ganesha_pool(self.mgr)
+
+ self.create_empty_rados_obj(cluster_id)
+
+ if cluster_id not in available_clusters(self.mgr):
+ self._call_orch_apply_nfs(cluster_id, placement, virtual_ip, ingress_mode, port)
+ return
+ raise NonFatalError(f"{cluster_id} cluster already exists")
+ except Exception as e:
+ log.exception(f"NFS Cluster {cluster_id} could not be created")
+ raise ErrorResponse.wrap(e)
+
+ def delete_nfs_cluster(self, cluster_id: str) -> None:
+ try:
+ cluster_list = available_clusters(self.mgr)
+ if cluster_id in cluster_list:
+ self.mgr.export_mgr.delete_all_exports(cluster_id)
+ completion = self.mgr.remove_service('ingress.nfs.' + cluster_id)
+ orchestrator.raise_if_exception(completion)
+ completion = self.mgr.remove_service('nfs.' + cluster_id)
+ orchestrator.raise_if_exception(completion)
+ self.delete_config_obj(cluster_id)
+ return
+ raise NonFatalError("Cluster does not exist")
+ except Exception as e:
+ log.exception(f"Failed to delete NFS Cluster {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def list_nfs_cluster(self) -> List[str]:
+ try:
+ return available_clusters(self.mgr)
+ except Exception as e:
+ log.exception("Failed to list NFS Cluster")
+ raise ErrorResponse.wrap(e)
+
+ def _show_nfs_cluster_info(self, cluster_id: str) -> Dict[str, Any]:
+ completion = self.mgr.list_daemons(daemon_type='nfs')
+ # Here completion.result is a list DaemonDescription objects
+ clusters = orchestrator.raise_if_exception(completion)
+ backends: List[Dict[str, Union[Any]]] = []
+
+ for cluster in clusters:
+ if cluster_id == cluster.service_id():
+ assert cluster.hostname
+ try:
+ if cluster.ip:
+ ip = cluster.ip
+ else:
+ c = self.mgr.get_hosts()
+ orchestrator.raise_if_exception(c)
+ hosts = [h for h in c.result or []
+ if h.hostname == cluster.hostname]
+ if hosts:
+ ip = resolve_ip(hosts[0].addr)
+ else:
+ # sigh
+ ip = resolve_ip(cluster.hostname)
+ backends.append({
+ "hostname": cluster.hostname,
+ "ip": ip,
+ "port": cluster.ports[0] if cluster.ports else None
+ })
+ except orchestrator.OrchestratorError:
+ continue
+
+ r: Dict[str, Any] = {
+ 'virtual_ip': None,
+ 'backend': backends,
+ }
+ sc = self.mgr.describe_service(service_type='ingress')
+ services = orchestrator.raise_if_exception(sc)
+ for i in services:
+ spec = cast(IngressSpec, i.spec)
+ if spec.backend_service == f'nfs.{cluster_id}':
+ r['virtual_ip'] = i.virtual_ip.split('/')[0] if i.virtual_ip else None
+ if i.ports:
+ r['port'] = i.ports[0]
+ if len(i.ports) > 1:
+ r['monitor_port'] = i.ports[1]
+ log.debug("Successfully fetched %s info: %s", cluster_id, r)
+ return r
+
+ def show_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Dict[str, Any]:
+ try:
+ if cluster_id and cluster_id not in available_clusters(self.mgr):
+ raise ClusterNotFound()
+ info_res = {}
+ if cluster_id:
+ cluster_ls = [cluster_id]
+ else:
+ cluster_ls = available_clusters(self.mgr)
+
+ for cluster_id in cluster_ls:
+ res = self._show_nfs_cluster_info(cluster_id)
+ if res:
+ info_res[cluster_id] = res
+ return info_res
+ except Exception as e:
+ log.exception("Failed to show info for cluster")
+ raise ErrorResponse.wrap(e)
+
+ def get_nfs_cluster_config(self, cluster_id: str) -> str:
+ try:
+ if cluster_id in available_clusters(self.mgr):
+ rados_obj = self._rados(cluster_id)
+ conf = rados_obj.read_obj(user_conf_obj_name(cluster_id))
+ return conf or ""
+ raise ClusterNotFound()
+ except Exception as e:
+ log.exception(f"Fetching NFS-Ganesha Config failed for {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def set_nfs_cluster_config(self, cluster_id: str, nfs_config: str) -> None:
+ try:
+ if cluster_id in available_clusters(self.mgr):
+ rados_obj = self._rados(cluster_id)
+ if rados_obj.check_user_config():
+ raise NonFatalError("NFS-Ganesha User Config already exists")
+ rados_obj.write_obj(nfs_config, user_conf_obj_name(cluster_id),
+ conf_obj_name(cluster_id))
+ log.debug("Successfully saved %s's user config: \n %s", cluster_id, nfs_config)
+ restart_nfs_service(self.mgr, cluster_id)
+ return
+ raise ClusterNotFound()
+ except NotImplementedError:
+ raise ManualRestartRequired("NFS-Ganesha Config Added Successfully")
+ except Exception as e:
+ log.exception(f"Setting NFS-Ganesha Config failed for {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def reset_nfs_cluster_config(self, cluster_id: str) -> None:
+ try:
+ if cluster_id in available_clusters(self.mgr):
+ rados_obj = self._rados(cluster_id)
+ if not rados_obj.check_user_config():
+ raise NonFatalError("NFS-Ganesha User Config does not exist")
+ rados_obj.remove_obj(user_conf_obj_name(cluster_id),
+ conf_obj_name(cluster_id))
+ restart_nfs_service(self.mgr, cluster_id)
+ return
+ raise ClusterNotFound()
+ except NotImplementedError:
+ raise ManualRestartRequired("NFS-Ganesha Config Removed Successfully")
+ except Exception as e:
+ log.exception(f"Resetting NFS-Ganesha Config failed for {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def _rados(self, cluster_id: str) -> NFSRados:
+ """Return a new NFSRados object for the given cluster id."""
+ return NFSRados(self.mgr.rados, cluster_id)
diff --git a/src/pybind/mgr/nfs/exception.py b/src/pybind/mgr/nfs/exception.py
new file mode 100644
index 000000000..6c6e3d9f3
--- /dev/null
+++ b/src/pybind/mgr/nfs/exception.py
@@ -0,0 +1,32 @@
+import errno
+from typing import Optional
+
+
+class NFSException(Exception):
+ def __init__(self, err_msg: str, errno: int = -1) -> None:
+ super(NFSException, self).__init__(errno, err_msg)
+ self.errno = errno
+ self.err_msg = err_msg
+
+ def __str__(self) -> str:
+ return self.err_msg
+
+
+class NFSInvalidOperation(NFSException):
+ def __init__(self, err_msg: str) -> None:
+ super(NFSInvalidOperation, self).__init__(err_msg, -errno.EINVAL)
+
+
+class NFSObjectNotFound(NFSException):
+ def __init__(self, err_msg: str) -> None:
+ super(NFSObjectNotFound, self).__init__(err_msg, -errno.ENOENT)
+
+
+class FSNotFound(NFSObjectNotFound):
+ def __init__(self, fs_name: Optional[str]) -> None:
+ super(FSNotFound, self).__init__(f'filesystem {fs_name} not found')
+
+
+class ClusterNotFound(NFSObjectNotFound):
+ def __init__(self) -> None:
+ super(ClusterNotFound, self).__init__('cluster does not exist')
diff --git a/src/pybind/mgr/nfs/export.py b/src/pybind/mgr/nfs/export.py
new file mode 100644
index 000000000..5887c898f
--- /dev/null
+++ b/src/pybind/mgr/nfs/export.py
@@ -0,0 +1,856 @@
+import errno
+import json
+import logging
+from typing import (
+ List,
+ Any,
+ Dict,
+ Optional,
+ TYPE_CHECKING,
+ TypeVar,
+ Callable,
+ Set,
+ cast)
+from os.path import normpath
+import cephfs
+
+from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES
+
+from object_format import ErrorResponse
+from orchestrator import NoOrchestrator
+from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS
+
+from .ganesha_conf import (
+ CephFSFSAL,
+ Export,
+ GaneshaConfParser,
+ RGWFSAL,
+ RawBlock,
+ format_block)
+from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound
+from .utils import (
+ CONF_PREFIX,
+ EXPORT_PREFIX,
+ NonFatalError,
+ USER_CONF_PREFIX,
+ export_obj_name,
+ conf_obj_name,
+ available_clusters,
+ check_fs,
+ restart_nfs_service, cephfs_path_is_dir)
+
+if TYPE_CHECKING:
+ from nfs.module import Module
+
+FuncT = TypeVar('FuncT', bound=Callable)
+
+log = logging.getLogger(__name__)
+
+
+def known_cluster_ids(mgr: 'Module') -> Set[str]:
+ """Return the set of known cluster IDs."""
+ try:
+ clusters = set(available_clusters(mgr))
+ except NoOrchestrator:
+ clusters = nfs_rados_configs(mgr.rados)
+ return clusters
+
+
+def _check_rados_notify(ioctx: Any, obj: str) -> None:
+ try:
+ ioctx.notify(obj)
+ except TimedOut:
+ log.exception("Ganesha timed out")
+
+
+def normalize_path(path: str) -> str:
+ if path:
+ path = normpath(path.strip())
+ if path[:2] == "//":
+ path = path[1:]
+ return path
+
+
+class NFSRados:
+ def __init__(self, rados: 'Rados', namespace: str) -> None:
+ self.rados = rados
+ self.pool = POOL_NAME
+ self.namespace = namespace
+
+ def _make_rados_url(self, obj: str) -> str:
+ return "rados://{}/{}/{}".format(self.pool, self.namespace, obj)
+
+ def _create_url_block(self, obj_name: str) -> RawBlock:
+ return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})
+
+ def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ ioctx.write_full(obj, conf_block.encode('utf-8'))
+ if not config_obj:
+ # Return after creating empty common config object
+ return
+ log.debug("write configuration into rados object %s/%s/%s",
+ self.pool, self.namespace, obj)
+
+ # Add created obj url to common config obj
+ ioctx.append(config_obj, format_block(
+ self._create_url_block(obj)).encode('utf-8'))
+ _check_rados_notify(ioctx, config_obj)
+ log.debug("Added %s url to %s", obj, config_obj)
+
+ def read_obj(self, obj: str) -> Optional[str]:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ try:
+ return ioctx.read(obj, 1048576).decode()
+ except ObjectNotFound:
+ return None
+
+ def update_obj(self, conf_block: str, obj: str, config_obj: str,
+ should_notify: Optional[bool] = True) -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ ioctx.write_full(obj, conf_block.encode('utf-8'))
+ log.debug("write configuration into rados object %s/%s/%s",
+ self.pool, self.namespace, obj)
+ if should_notify:
+ _check_rados_notify(ioctx, config_obj)
+ log.debug("Update export %s in %s", obj, config_obj)
+
+ def remove_obj(self, obj: str, config_obj: str) -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ export_urls = ioctx.read(config_obj)
+ url = '%url "{}"\n\n'.format(self._make_rados_url(obj))
+ export_urls = export_urls.replace(url.encode('utf-8'), b'')
+ ioctx.remove_object(obj)
+ ioctx.write_full(config_obj, export_urls)
+ _check_rados_notify(ioctx, config_obj)
+ log.debug("Object deleted: %s", url)
+
+ def remove_all_obj(self) -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ for obj in ioctx.list_objects():
+ obj.remove()
+
+ def check_user_config(self) -> bool:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ for obj in ioctx.list_objects():
+ if obj.key.startswith(USER_CONF_PREFIX):
+ return True
+ return False
+
+
+def nfs_rados_configs(rados: 'Rados', nfs_pool: str = POOL_NAME) -> Set[str]:
+ """Return a set of all the namespaces in the nfs_pool where nfs
+ configuration objects are found. The namespaces also correspond
+ to the cluster ids.
+ """
+ ns: Set[str] = set()
+ prefixes = (EXPORT_PREFIX, CONF_PREFIX, USER_CONF_PREFIX)
+ with rados.open_ioctx(nfs_pool) as ioctx:
+ ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
+ for obj in ioctx.list_objects():
+ if obj.key.startswith(prefixes):
+ ns.add(obj.nspace)
+ return ns
+
+
+class AppliedExportResults:
+ """Gathers the results of multiple changed exports.
+ Returned by apply_export.
+ """
+
+ def __init__(self) -> None:
+ self.changes: List[Dict[str, str]] = []
+ self.has_error = False
+
+ def append(self, value: Dict[str, str]) -> None:
+ if value.get("state", "") == "error":
+ self.has_error = True
+ self.changes.append(value)
+
+ def to_simplified(self) -> List[Dict[str, str]]:
+ return self.changes
+
+ def mgr_return_value(self) -> int:
+ return -errno.EIO if self.has_error else 0
+
+
+class ExportMgr:
+ def __init__(
+ self,
+ mgr: 'Module',
+ export_ls: Optional[Dict[str, List[Export]]] = None
+ ) -> None:
+ self.mgr = mgr
+ self.rados_pool = POOL_NAME
+ self._exports: Optional[Dict[str, List[Export]]] = export_ls
+
+ @property
+ def exports(self) -> Dict[str, List[Export]]:
+ if self._exports is None:
+ self._exports = {}
+ log.info("Begin export parsing")
+ for cluster_id in known_cluster_ids(self.mgr):
+ self.export_conf_objs = [] # type: List[Export]
+ self._read_raw_config(cluster_id)
+ self._exports[cluster_id] = self.export_conf_objs
+ log.info("Exports parsed successfully %s", self.exports.items())
+ return self._exports
+
+ def _fetch_export(
+ self,
+ cluster_id: str,
+ pseudo_path: str
+ ) -> Optional[Export]:
+ try:
+ for ex in self.exports[cluster_id]:
+ if ex.pseudo == pseudo_path:
+ return ex
+ return None
+ except KeyError:
+ log.info('no exports for cluster %s', cluster_id)
+ return None
+
+ def _fetch_export_id(
+ self,
+ cluster_id: str,
+ export_id: int
+ ) -> Optional[Export]:
+ try:
+ for ex in self.exports[cluster_id]:
+ if ex.export_id == export_id:
+ return ex
+ return None
+ except KeyError:
+ log.info(f'no exports for cluster {cluster_id}')
+ return None
+
+ def _delete_export_user(self, export: Export) -> None:
+ if isinstance(export.fsal, CephFSFSAL):
+ assert export.fsal.user_id
+ self.mgr.check_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': 'client.{}'.format(export.fsal.user_id),
+ })
+ log.info("Deleted export user %s", export.fsal.user_id)
+ elif isinstance(export.fsal, RGWFSAL):
+ # do nothing; we're using the bucket owner creds.
+ pass
+
+ def _create_export_user(self, export: Export) -> None:
+ if isinstance(export.fsal, CephFSFSAL):
+ fsal = cast(CephFSFSAL, export.fsal)
+ assert fsal.fs_name
+ fsal.user_id = f"nfs.{export.cluster_id}.{export.export_id}"
+ fsal.cephx_key = self._create_user_key(
+ export.cluster_id, fsal.user_id, export.path, fsal.fs_name
+ )
+ log.debug("Successfully created user %s for cephfs path %s", fsal.user_id, export.path)
+
+ elif isinstance(export.fsal, RGWFSAL):
+ rgwfsal = cast(RGWFSAL, export.fsal)
+ if not rgwfsal.user_id:
+ assert export.path
+ ret, out, err = self.mgr.tool_exec(
+ ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path]
+ )
+ if ret:
+ raise NFSException(f'Failed to fetch owner for bucket {export.path}')
+ j = json.loads(out)
+ owner = j.get('owner', '')
+ rgwfsal.user_id = owner
+ assert rgwfsal.user_id
+ ret, out, err = self.mgr.tool_exec([
+ 'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id
+ ])
+ if ret:
+ raise NFSException(
+ f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}'
+ )
+ j = json.loads(out)
+
+ # FIXME: make this more tolerate of unexpected output?
+ rgwfsal.access_key_id = j['keys'][0]['access_key']
+ rgwfsal.secret_access_key = j['keys'][0]['secret_key']
+ log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path)
+
+ def _gen_export_id(self, cluster_id: str) -> int:
+ exports = sorted([ex.export_id for ex in self.exports[cluster_id]])
+ nid = 1
+ for e_id in exports:
+ if e_id == nid:
+ nid += 1
+ else:
+ break
+ return nid
+
+ def _read_raw_config(self, rados_namespace: str) -> None:
+ with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
+ ioctx.set_namespace(rados_namespace)
+ for obj in ioctx.list_objects():
+ if obj.key.startswith(EXPORT_PREFIX):
+ size, _ = obj.stat()
+ raw_config = obj.read(size)
+ raw_config = raw_config.decode("utf-8")
+ log.debug("read export configuration from rados "
+ "object %s/%s/%s", self.rados_pool,
+ rados_namespace, obj.key)
+ self.export_conf_objs.append(Export.from_export_block(
+ GaneshaConfParser(raw_config).parse()[0], rados_namespace))
+
+ def _save_export(self, cluster_id: str, export: Export) -> None:
+ self.exports[cluster_id].append(export)
+ self._rados(cluster_id).write_obj(
+ format_block(export.to_export_block()),
+ export_obj_name(export.export_id),
+ conf_obj_name(export.cluster_id)
+ )
+
+ def _delete_export(
+ self,
+ cluster_id: str,
+ pseudo_path: Optional[str],
+ export_obj: Optional[Export] = None
+ ) -> None:
+ try:
+ if export_obj:
+ export: Optional[Export] = export_obj
+ else:
+ assert pseudo_path
+ export = self._fetch_export(cluster_id, pseudo_path)
+
+ if export:
+ if pseudo_path:
+ self._rados(cluster_id).remove_obj(
+ export_obj_name(export.export_id), conf_obj_name(cluster_id))
+ self.exports[cluster_id].remove(export)
+ self._delete_export_user(export)
+ if not self.exports[cluster_id]:
+ del self.exports[cluster_id]
+ log.debug("Deleted all exports for cluster %s", cluster_id)
+ return None
+ raise NonFatalError("Export does not exist")
+ except Exception as e:
+ log.exception(f"Failed to delete {pseudo_path} export for {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def _fetch_export_obj(self, cluster_id: str, ex_id: int) -> Optional[Export]:
+ try:
+ with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
+ ioctx.set_namespace(cluster_id)
+ export = Export.from_export_block(
+ GaneshaConfParser(
+ ioctx.read(export_obj_name(ex_id)).decode("utf-8")
+ ).parse()[0],
+ cluster_id
+ )
+ return export
+ except ObjectNotFound:
+ log.exception("Export ID: %s not found", ex_id)
+ return None
+
+ def _update_export(self, cluster_id: str, export: Export,
+ need_nfs_service_restart: bool) -> None:
+ self.exports[cluster_id].append(export)
+ self._rados(cluster_id).update_obj(
+ format_block(export.to_export_block()),
+ export_obj_name(export.export_id), conf_obj_name(export.cluster_id),
+ should_notify=not need_nfs_service_restart)
+ if need_nfs_service_restart:
+ restart_nfs_service(self.mgr, export.cluster_id)
+
+ def _validate_cluster_id(self, cluster_id: str) -> None:
+ """Raise an exception if cluster_id is not valid."""
+ clusters = known_cluster_ids(self.mgr)
+ log.debug("checking for %r in known nfs clusters: %r",
+ cluster_id, clusters)
+ if cluster_id not in clusters:
+ raise ErrorResponse(f"Cluster {cluster_id!r} does not exist",
+ return_value=-errno.ENOENT)
+
+ def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Dict[str, Any]:
+ self._validate_cluster_id(kwargs['cluster_id'])
+ # if addr(s) are provided, construct client list and adjust outer block
+ clients = []
+ if addr:
+ clients = [{
+ 'addresses': addr,
+ 'access_type': 'ro' if kwargs['read_only'] else 'rw',
+ 'squash': kwargs['squash'],
+ }]
+ kwargs['squash'] = 'none'
+ kwargs['clients'] = clients
+
+ if clients:
+ kwargs['access_type'] = "none"
+ elif kwargs['read_only']:
+ kwargs['access_type'] = "RO"
+ else:
+ kwargs['access_type'] = "RW"
+
+ if kwargs['cluster_id'] not in self.exports:
+ self.exports[kwargs['cluster_id']] = []
+
+ try:
+ fsal_type = kwargs.pop('fsal_type')
+ if fsal_type == 'cephfs':
+ return self.create_cephfs_export(**kwargs)
+ if fsal_type == 'rgw':
+ return self.create_rgw_export(**kwargs)
+ raise NotImplementedError()
+ except Exception as e:
+ log.exception(
+ f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")
+ raise ErrorResponse.wrap(e)
+
+ def delete_export(self,
+ cluster_id: str,
+ pseudo_path: str) -> None:
+ self._validate_cluster_id(cluster_id)
+ return self._delete_export(cluster_id, pseudo_path)
+
+ def delete_all_exports(self, cluster_id: str) -> None:
+ try:
+ export_list = list(self.exports[cluster_id])
+ except KeyError:
+ log.info("No exports to delete")
+ return
+ for export in export_list:
+ try:
+ self._delete_export(cluster_id=cluster_id, pseudo_path=None,
+ export_obj=export)
+ except Exception as e:
+ raise NFSException(f"Failed to delete export {export.export_id}: {e}")
+ log.info("All exports successfully deleted for cluster id: %s", cluster_id)
+
+ def list_all_exports(self) -> List[Dict[str, Any]]:
+ r = []
+ for cluster_id, ls in self.exports.items():
+ r.extend([e.to_dict() for e in ls])
+ return r
+
+ def list_exports(self,
+ cluster_id: str,
+ detailed: bool = False) -> List[Any]:
+ self._validate_cluster_id(cluster_id)
+ try:
+ if detailed:
+ result_d = [export.to_dict() for export in self.exports[cluster_id]]
+ return result_d
+ else:
+ result_ps = [export.pseudo for export in self.exports[cluster_id]]
+ return result_ps
+
+ except KeyError:
+ log.warning("No exports to list for %s", cluster_id)
+ return []
+ except Exception as e:
+ log.exception(f"Failed to list exports for {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]:
+ export = self._fetch_export(cluster_id, pseudo_path)
+ if export:
+ return export.to_dict()
+ log.warning(f"No {pseudo_path} export to show for {cluster_id}")
+ return None
+
+ def get_export(
+ self,
+ cluster_id: str,
+ pseudo_path: str,
+ ) -> Dict[str, Any]:
+ self._validate_cluster_id(cluster_id)
+ try:
+ export_dict = self._get_export_dict(cluster_id, pseudo_path)
+ log.info(f"Fetched {export_dict!r} for {cluster_id!r}, {pseudo_path!r}")
+ return export_dict if export_dict else {}
+ except Exception as e:
+ log.exception(f"Failed to get {pseudo_path} export for {cluster_id}")
+ raise ErrorResponse.wrap(e)
+
+ def get_export_by_id(
+ self,
+ cluster_id: str,
+ export_id: int
+ ) -> Optional[Dict[str, Any]]:
+ export = self._fetch_export_id(cluster_id, export_id)
+ return export.to_dict() if export else None
+
+ def get_export_by_pseudo(
+ self,
+ cluster_id: str,
+ pseudo_path: str
+ ) -> Optional[Dict[str, Any]]:
+ export = self._fetch_export(cluster_id, pseudo_path)
+ return export.to_dict() if export else None
+
+ # This method is used by the dashboard module (../dashboard/controllers/nfs.py)
+ # Do not change interface without updating the Dashboard code
+ def apply_export(self, cluster_id: str, export_config: str) -> AppliedExportResults:
+ try:
+ exports = self._read_export_config(cluster_id, export_config)
+ except Exception as e:
+ log.exception(f'Failed to update export: {e}')
+ raise ErrorResponse.wrap(e)
+
+ aeresults = AppliedExportResults()
+ for export in exports:
+ aeresults.append(self._change_export(cluster_id, export))
+ return aeresults
+
+ def _read_export_config(self, cluster_id: str, export_config: str) -> List[Dict]:
+ if not export_config:
+ raise NFSInvalidOperation("Empty Config!!")
+ try:
+ j = json.loads(export_config)
+ except ValueError:
+ # okay, not JSON. is it an EXPORT block?
+ try:
+ blocks = GaneshaConfParser(export_config).parse()
+ exports = [
+ Export.from_export_block(block, cluster_id)
+ for block in blocks
+ ]
+ j = [export.to_dict() for export in exports]
+ except Exception as ex:
+ raise NFSInvalidOperation(f"Input must be JSON or a ganesha EXPORT block: {ex}")
+ # check export type - always return a list
+ if isinstance(j, list):
+ return j # j is already a list object
+ return [j] # return a single object list, with j as the only item
+
+ def _change_export(self, cluster_id: str, export: Dict) -> Dict[str, str]:
+ try:
+ return self._apply_export(cluster_id, export)
+ except NotImplementedError:
+ # in theory, the NotImplementedError here may be raised by a hook back to
+ # an orchestration module. If the orchestration module supports it the NFS
+ # servers may be restarted. If not supported the expectation is that an
+ # (unfortunately generic) NotImplementedError will be raised. We then
+ # indicate to the user that manual intervention may be needed now that the
+ # configuration changes have been applied.
+ return {
+ "pseudo": export['pseudo'],
+ "state": "warning",
+ "msg": "changes applied (Manual restart of NFS Pods required)",
+ }
+ except Exception as ex:
+ msg = f'Failed to apply export: {ex}'
+ log.exception(msg)
+ return {"state": "error", "msg": msg}
+
+ def _update_user_id(
+ self,
+ cluster_id: str,
+ path: str,
+ fs_name: str,
+ user_id: str
+ ) -> None:
+ osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
+ self.rados_pool, cluster_id, fs_name)
+ # NFS-Ganesha can dynamically enforce an export's access type changes, but Ceph server
+ # daemons can't dynamically enforce changes in Ceph user caps of the Ceph clients. To
+ # allow dynamic updates of CephFS NFS exports, always set FSAL Ceph user's MDS caps with
+ # path restricted read-write access. Rely on the ganesha servers to enforce the export
+ # access type requested for the NFS clients.
+ self.mgr.check_mon_command({
+ 'prefix': 'auth caps',
+ 'entity': f'client.{user_id}',
+ 'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow rw path={}'.format(path)],
+ })
+
+ log.info("Export user updated %s", user_id)
+
+ def _create_user_key(
+ self,
+ cluster_id: str,
+ entity: str,
+ path: str,
+ fs_name: str,
+ ) -> str:
+ osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
+ self.rados_pool, cluster_id, fs_name)
+ nfs_caps = [
+ 'mon', 'allow r',
+ 'osd', osd_cap,
+ 'mds', 'allow rw path={}'.format(path)
+ ]
+
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth get-or-create',
+ 'entity': 'client.{}'.format(entity),
+ 'caps': nfs_caps,
+ 'format': 'json',
+ })
+ if ret == -errno.EINVAL and 'does not match' in err:
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth caps',
+ 'entity': 'client.{}'.format(entity),
+ 'caps': nfs_caps,
+ 'format': 'json',
+ })
+ if err:
+ raise NFSException(f'Failed to update caps for {entity}: {err}')
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth get',
+ 'entity': 'client.{}'.format(entity),
+ 'format': 'json',
+ })
+ if err:
+ raise NFSException(f'Failed to fetch caps for {entity}: {err}')
+
+ json_res = json.loads(out)
+ log.info("Export user created is %s", json_res[0]['entity'])
+ return json_res[0]['key']
+
+ def create_export_from_dict(self,
+ cluster_id: str,
+ ex_id: int,
+ ex_dict: Dict[str, Any]) -> Export:
+ pseudo_path = ex_dict.get("pseudo")
+ if not pseudo_path:
+ raise NFSInvalidOperation("export must specify pseudo path")
+
+ path = ex_dict.get("path")
+ if path is None:
+ raise NFSInvalidOperation("export must specify path")
+ path = normalize_path(path)
+
+ fsal = ex_dict.get("fsal", {})
+ fsal_type = fsal.get("name")
+ if fsal_type == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ if '/' in path and path != '/':
+ raise NFSInvalidOperation('"/" is not allowed in path with bucket name')
+ elif fsal_type == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ fs_name = fsal.get("fs_name")
+ if not fs_name:
+ raise NFSInvalidOperation("export FSAL must specify fs_name")
+ if not check_fs(self.mgr, fs_name):
+ raise FSNotFound(fs_name)
+
+ user_id = f"nfs.{cluster_id}.{ex_id}"
+ if "user_id" in fsal and fsal["user_id"] != user_id:
+ raise NFSInvalidOperation(f"export FSAL user_id must be '{user_id}'")
+ else:
+ raise NFSInvalidOperation(f"NFS Ganesha supported FSALs are {NFS_GANESHA_SUPPORTED_FSALS}."
+ "Export must specify any one of it.")
+
+ ex_dict["fsal"] = fsal
+ ex_dict["cluster_id"] = cluster_id
+ export = Export.from_dict(ex_id, ex_dict)
+ export.validate(self.mgr)
+ log.debug("Successfully created %s export-%s from dict for cluster %s",
+ fsal_type, ex_id, cluster_id)
+ return export
+
+ def create_cephfs_export(self,
+ fs_name: str,
+ cluster_id: str,
+ pseudo_path: str,
+ read_only: bool,
+ path: str,
+ squash: str,
+ access_type: str,
+ clients: list = [],
+ sectype: Optional[List[str]] = None) -> Dict[str, Any]:
+
+ try:
+ cephfs_path_is_dir(self.mgr, fs_name, path)
+ except NotADirectoryError:
+ raise NFSException(f"path {path} is not a dir", -errno.ENOTDIR)
+ except cephfs.ObjectNotFound:
+ raise NFSObjectNotFound(f"path {path} does not exist")
+ except cephfs.Error as e:
+ raise NFSException(e.args[1], -e.args[0])
+
+ pseudo_path = normalize_path(pseudo_path)
+
+ if not self._fetch_export(cluster_id, pseudo_path):
+ export = self.create_export_from_dict(
+ cluster_id,
+ self._gen_export_id(cluster_id),
+ {
+ "pseudo": pseudo_path,
+ "path": path,
+ "access_type": access_type,
+ "squash": squash,
+ "fsal": {
+ "name": NFS_GANESHA_SUPPORTED_FSALS[0],
+ "fs_name": fs_name,
+ },
+ "clients": clients,
+ "sectype": sectype,
+ }
+ )
+ log.debug("creating cephfs export %s", export)
+ self._create_export_user(export)
+ self._save_export(cluster_id, export)
+ result = {
+ "bind": export.pseudo,
+ "fs": fs_name,
+ "path": export.path,
+ "cluster": cluster_id,
+ "mode": export.access_type,
+ }
+ return result
+ raise NonFatalError("Export already exists")
+
+ def create_rgw_export(self,
+ cluster_id: str,
+ pseudo_path: str,
+ access_type: str,
+ read_only: bool,
+ squash: str,
+ bucket: Optional[str] = None,
+ user_id: Optional[str] = None,
+ clients: list = [],
+ sectype: Optional[List[str]] = None) -> Dict[str, Any]:
+ pseudo_path = normalize_path(pseudo_path)
+
+ if not bucket and not user_id:
+ raise ErrorResponse("Must specify either bucket or user_id")
+
+ if not self._fetch_export(cluster_id, pseudo_path):
+ export = self.create_export_from_dict(
+ cluster_id,
+ self._gen_export_id(cluster_id),
+ {
+ "pseudo": pseudo_path,
+ "path": bucket or '/',
+ "access_type": access_type,
+ "squash": squash,
+ "fsal": {
+ "name": NFS_GANESHA_SUPPORTED_FSALS[1],
+ "user_id": user_id,
+ },
+ "clients": clients,
+ "sectype": sectype,
+ }
+ )
+ log.debug("creating rgw export %s", export)
+ self._create_export_user(export)
+ self._save_export(cluster_id, export)
+ result = {
+ "bind": export.pseudo,
+ "path": export.path,
+ "cluster": cluster_id,
+ "mode": export.access_type,
+ "squash": export.squash,
+ }
+ return result
+ raise NonFatalError("Export already exists")
+
+ def _apply_export(
+ self,
+ cluster_id: str,
+ new_export_dict: Dict,
+ ) -> Dict[str, str]:
+ for k in ['path', 'pseudo']:
+ if k not in new_export_dict:
+ raise NFSInvalidOperation(f'Export missing required field {k}')
+ if cluster_id not in self.exports:
+ self.exports[cluster_id] = []
+
+ new_export_dict['path'] = normalize_path(new_export_dict['path'])
+ new_export_dict['pseudo'] = normalize_path(new_export_dict['pseudo'])
+
+ old_export = self._fetch_export(cluster_id, new_export_dict['pseudo'])
+ if old_export:
+ # Check if export id matches
+ if new_export_dict.get('export_id'):
+ if old_export.export_id != new_export_dict.get('export_id'):
+ raise NFSInvalidOperation('Export ID changed, Cannot update export')
+ else:
+ new_export_dict['export_id'] = old_export.export_id
+ elif new_export_dict.get('export_id'):
+ old_export = self._fetch_export_obj(cluster_id, new_export_dict['export_id'])
+ if old_export:
+ # re-fetch via old pseudo
+ old_export = self._fetch_export(cluster_id, old_export.pseudo)
+ assert old_export
+ log.debug("export %s pseudo %s -> %s",
+ old_export.export_id, old_export.pseudo, new_export_dict['pseudo'])
+
+ new_export = self.create_export_from_dict(
+ cluster_id,
+ new_export_dict.get('export_id', self._gen_export_id(cluster_id)),
+ new_export_dict
+ )
+
+ if not old_export:
+ self._create_export_user(new_export)
+ self._save_export(cluster_id, new_export)
+ return {"pseudo": new_export.pseudo, "state": "added"}
+
+ need_nfs_service_restart = True
+ if old_export.fsal.name != new_export.fsal.name:
+ raise NFSInvalidOperation('FSAL change not allowed')
+ if old_export.pseudo != new_export.pseudo:
+ log.debug('export %s pseudo %s -> %s',
+ new_export.export_id, old_export.pseudo, new_export.pseudo)
+
+ if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ old_fsal = cast(CephFSFSAL, old_export.fsal)
+ new_fsal = cast(CephFSFSAL, new_export.fsal)
+ if old_fsal.user_id != new_fsal.user_id:
+ self._delete_export_user(old_export)
+ self._create_export_user(new_export)
+ elif (
+ old_export.path != new_export.path
+ or old_fsal.fs_name != new_fsal.fs_name
+ ):
+ self._update_user_id(
+ cluster_id,
+ new_export.path,
+ cast(str, new_fsal.fs_name),
+ cast(str, new_fsal.user_id)
+ )
+ new_fsal.cephx_key = old_fsal.cephx_key
+ else:
+ expected_mds_caps = 'allow rw path={}'.format(new_export.path)
+ entity = new_fsal.user_id
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth get',
+ 'entity': 'client.{}'.format(entity),
+ 'format': 'json',
+ })
+ if ret:
+ raise NFSException(f'Failed to fetch caps for {entity}: {err}')
+ actual_mds_caps = json.loads(out)[0]['caps'].get('mds')
+ if actual_mds_caps != expected_mds_caps:
+ self._update_user_id(
+ cluster_id,
+ new_export.path,
+ cast(str, new_fsal.fs_name),
+ cast(str, new_fsal.user_id)
+ )
+ elif old_export.pseudo == new_export.pseudo:
+ need_nfs_service_restart = False
+ new_fsal.cephx_key = old_fsal.cephx_key
+
+ if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ old_rgw_fsal = cast(RGWFSAL, old_export.fsal)
+ new_rgw_fsal = cast(RGWFSAL, new_export.fsal)
+ if old_rgw_fsal.user_id != new_rgw_fsal.user_id:
+ self._delete_export_user(old_export)
+ self._create_export_user(new_export)
+ elif old_rgw_fsal.access_key_id != new_rgw_fsal.access_key_id:
+ raise NFSInvalidOperation('access_key_id change is not allowed')
+ elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key:
+ raise NFSInvalidOperation('secret_access_key change is not allowed')
+
+ self.exports[cluster_id].remove(old_export)
+
+ self._update_export(cluster_id, new_export, need_nfs_service_restart)
+
+ return {"pseudo": new_export.pseudo, "state": "updated"}
+
+ def _rados(self, cluster_id: str) -> NFSRados:
+ """Return a new NFSRados object for the given cluster id."""
+ return NFSRados(self.mgr.rados, cluster_id)
diff --git a/src/pybind/mgr/nfs/ganesha_conf.py b/src/pybind/mgr/nfs/ganesha_conf.py
new file mode 100644
index 000000000..31aaa4ea1
--- /dev/null
+++ b/src/pybind/mgr/nfs/ganesha_conf.py
@@ -0,0 +1,548 @@
+from typing import cast, List, Dict, Any, Optional, TYPE_CHECKING
+from os.path import isabs
+
+from mgr_module import NFS_GANESHA_SUPPORTED_FSALS
+
+from .exception import NFSInvalidOperation, FSNotFound
+from .utils import check_fs
+
+if TYPE_CHECKING:
+ from nfs.module import Module
+
+
+def _indentation(depth: int, size: int = 4) -> str:
+ return " " * (depth * size)
+
+
+def _format_val(block_name: str, key: str, val: str) -> str:
+ if isinstance(val, list):
+ return ', '.join([_format_val(block_name, key, v) for v in val])
+ if isinstance(val, bool):
+ return str(val).lower()
+ if isinstance(val, int) or (block_name == 'CLIENT'
+ and key == 'clients'):
+ return '{}'.format(val)
+ return '"{}"'.format(val)
+
+
+def _validate_squash(squash: str) -> None:
+ valid_squash_ls = [
+ "root", "root_squash", "rootsquash", "rootid", "root_id_squash",
+ "rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous",
+ "allanonymous", "no_root_squash", "none", "noidsquash",
+ ]
+ if squash.lower() not in valid_squash_ls:
+ raise NFSInvalidOperation(
+ f"squash {squash} not in valid list {valid_squash_ls}"
+ )
+
+
+def _validate_access_type(access_type: str) -> None:
+ valid_access_types = ['rw', 'ro', 'none']
+ if not isinstance(access_type, str) or access_type.lower() not in valid_access_types:
+ raise NFSInvalidOperation(
+ f'{access_type} is invalid, valid access type are'
+ f'{valid_access_types}'
+ )
+
+
+def _validate_sec_type(sec_type: str) -> None:
+ valid_sec_types = ["none", "sys", "krb5", "krb5i", "krb5p"]
+ if not isinstance(sec_type, str) or sec_type not in valid_sec_types:
+ raise NFSInvalidOperation(
+ f"SecType {sec_type} invalid, valid types are {valid_sec_types}")
+
+
+class RawBlock():
+ def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}):
+ if not values: # workaround mutable default argument
+ values = {}
+ if not blocks: # workaround mutable default argument
+ blocks = []
+ self.block_name = block_name
+ self.blocks = blocks
+ self.values = values
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, RawBlock):
+ return False
+ return self.block_name == other.block_name and \
+ self.blocks == other.blocks and \
+ self.values == other.values
+
+ def __repr__(self) -> str:
+ return f'RawBlock({self.block_name!r}, {self.blocks!r}, {self.values!r})'
+
+
+class GaneshaConfParser:
+ def __init__(self, raw_config: str):
+ self.pos = 0
+ self.text = ""
+ for line in raw_config.split("\n"):
+ line = line.lstrip()
+
+ if line.startswith("%"):
+ self.text += line.replace('"', "")
+ self.text += "\n"
+ else:
+ self.text += "".join(line.split())
+
+ def stream(self) -> str:
+ return self.text[self.pos:]
+
+ def last_context(self) -> str:
+ return f'"...{self.text[max(0, self.pos - 30):self.pos]}<here>{self.stream()[:30]}"'
+
+ def parse_block_name(self) -> str:
+ idx = self.stream().find('{')
+ if idx == -1:
+ raise Exception(f"Cannot find block name at {self.last_context()}")
+ block_name = self.stream()[:idx]
+ self.pos += idx + 1
+ return block_name
+
+ def parse_block_or_section(self) -> RawBlock:
+ if self.stream().startswith("%url "):
+ # section line
+ self.pos += 5
+ idx = self.stream().find('\n')
+ if idx == -1:
+ value = self.stream()
+ self.pos += len(value)
+ else:
+ value = self.stream()[:idx]
+ self.pos += idx + 1
+ block_dict = RawBlock('%url', values={'value': value})
+ return block_dict
+
+ block_dict = RawBlock(self.parse_block_name().upper())
+ self.parse_block_body(block_dict)
+ if self.stream()[0] != '}':
+ raise Exception("No closing bracket '}' found at the end of block")
+ self.pos += 1
+ return block_dict
+
+ def parse_parameter_value(self, raw_value: str) -> Any:
+ if raw_value.find(',') != -1:
+ return [self.parse_parameter_value(v.strip())
+ for v in raw_value.split(',')]
+ try:
+ return int(raw_value)
+ except ValueError:
+ if raw_value == "true":
+ return True
+ if raw_value == "false":
+ return False
+ if raw_value.find('"') == 0:
+ return raw_value[1:-1]
+ return raw_value
+
+ def parse_stanza(self, block_dict: RawBlock) -> None:
+ equal_idx = self.stream().find('=')
+ if equal_idx == -1:
+ raise Exception("Malformed stanza: no equal symbol found.")
+ semicolon_idx = self.stream().find(';')
+ parameter_name = self.stream()[:equal_idx].lower()
+ parameter_value = self.stream()[equal_idx + 1:semicolon_idx]
+ block_dict.values[parameter_name] = self.parse_parameter_value(parameter_value)
+ self.pos += semicolon_idx + 1
+
+ def parse_block_body(self, block_dict: RawBlock) -> None:
+ while True:
+ if self.stream().find('}') == 0:
+ # block end
+ return
+
+ last_pos = self.pos
+ semicolon_idx = self.stream().find(';')
+ lbracket_idx = self.stream().find('{')
+ is_semicolon = (semicolon_idx != -1)
+ is_lbracket = (lbracket_idx != -1)
+ is_semicolon_lt_lbracket = (semicolon_idx < lbracket_idx)
+
+ if is_semicolon and ((is_lbracket and is_semicolon_lt_lbracket) or not is_lbracket):
+ self.parse_stanza(block_dict)
+ elif is_lbracket and ((is_semicolon and not is_semicolon_lt_lbracket)
+ or (not is_semicolon)):
+ block_dict.blocks.append(self.parse_block_or_section())
+ else:
+ raise Exception("Malformed stanza: no semicolon found.")
+
+ if last_pos == self.pos:
+ raise Exception("Infinite loop while parsing block content")
+
+ def parse(self) -> List[RawBlock]:
+ blocks = []
+ while self.stream():
+ blocks.append(self.parse_block_or_section())
+ return blocks
+
+
+class FSAL(object):
+ def __init__(self, name: str) -> None:
+ self.name = name
+
+ @classmethod
+ def from_dict(cls, fsal_dict: Dict[str, Any]) -> 'FSAL':
+ if fsal_dict.get('name') == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ return CephFSFSAL.from_dict(fsal_dict)
+ if fsal_dict.get('name') == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ return RGWFSAL.from_dict(fsal_dict)
+ raise NFSInvalidOperation(f'Unknown FSAL {fsal_dict.get("name")}')
+
+ @classmethod
+ def from_fsal_block(cls, fsal_block: RawBlock) -> 'FSAL':
+ if fsal_block.values.get('name') == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ return CephFSFSAL.from_fsal_block(fsal_block)
+ if fsal_block.values.get('name') == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ return RGWFSAL.from_fsal_block(fsal_block)
+ raise NFSInvalidOperation(f'Unknown FSAL {fsal_block.values.get("name")}')
+
+ def to_fsal_block(self) -> RawBlock:
+ raise NotImplementedError
+
+ def to_dict(self) -> Dict[str, Any]:
+ raise NotImplementedError
+
+
+class CephFSFSAL(FSAL):
+ def __init__(self,
+ name: str,
+ user_id: Optional[str] = None,
+ fs_name: Optional[str] = None,
+ sec_label_xattr: Optional[str] = None,
+ cephx_key: Optional[str] = None) -> None:
+ super().__init__(name)
+ assert name == 'CEPH'
+ self.fs_name = fs_name
+ self.user_id = user_id
+ self.sec_label_xattr = sec_label_xattr
+ self.cephx_key = cephx_key
+
+ @classmethod
+ def from_fsal_block(cls, fsal_block: RawBlock) -> 'CephFSFSAL':
+ return cls(fsal_block.values['name'],
+ fsal_block.values.get('user_id'),
+ fsal_block.values.get('filesystem'),
+ fsal_block.values.get('sec_label_xattr'),
+ fsal_block.values.get('secret_access_key'))
+
+ def to_fsal_block(self) -> RawBlock:
+ result = RawBlock('FSAL', values={'name': self.name})
+
+ if self.user_id:
+ result.values['user_id'] = self.user_id
+ if self.fs_name:
+ result.values['filesystem'] = self.fs_name
+ if self.sec_label_xattr:
+ result.values['sec_label_xattr'] = self.sec_label_xattr
+ if self.cephx_key:
+ result.values['secret_access_key'] = self.cephx_key
+ return result
+
+ @classmethod
+ def from_dict(cls, fsal_dict: Dict[str, Any]) -> 'CephFSFSAL':
+ return cls(fsal_dict['name'],
+ fsal_dict.get('user_id'),
+ fsal_dict.get('fs_name'),
+ fsal_dict.get('sec_label_xattr'),
+ fsal_dict.get('cephx_key'))
+
+ def to_dict(self) -> Dict[str, str]:
+ r = {'name': self.name}
+ if self.user_id:
+ r['user_id'] = self.user_id
+ if self.fs_name:
+ r['fs_name'] = self.fs_name
+ if self.sec_label_xattr:
+ r['sec_label_xattr'] = self.sec_label_xattr
+ return r
+
+
+class RGWFSAL(FSAL):
+ def __init__(self,
+ name: str,
+ user_id: Optional[str] = None,
+ access_key_id: Optional[str] = None,
+ secret_access_key: Optional[str] = None
+ ) -> None:
+ super().__init__(name)
+ assert name == 'RGW'
+ # RGW user uid
+ self.user_id = user_id
+ # S3 credentials
+ self.access_key_id = access_key_id
+ self.secret_access_key = secret_access_key
+
+ @classmethod
+ def from_fsal_block(cls, fsal_block: RawBlock) -> 'RGWFSAL':
+ return cls(fsal_block.values['name'],
+ fsal_block.values.get('user_id'),
+ fsal_block.values.get('access_key_id'),
+ fsal_block.values.get('secret_access_key'))
+
+ def to_fsal_block(self) -> RawBlock:
+ result = RawBlock('FSAL', values={'name': self.name})
+
+ if self.user_id:
+ result.values['user_id'] = self.user_id
+ if self.access_key_id:
+ result.values['access_key_id'] = self.access_key_id
+ if self.secret_access_key:
+ result.values['secret_access_key'] = self.secret_access_key
+ return result
+
+ @classmethod
+ def from_dict(cls, fsal_dict: Dict[str, str]) -> 'RGWFSAL':
+ return cls(fsal_dict['name'],
+ fsal_dict.get('user_id'),
+ fsal_dict.get('access_key_id'),
+ fsal_dict.get('secret_access_key'))
+
+ def to_dict(self) -> Dict[str, str]:
+ r = {'name': self.name}
+ if self.user_id:
+ r['user_id'] = self.user_id
+ if self.access_key_id:
+ r['access_key_id'] = self.access_key_id
+ if self.secret_access_key:
+ r['secret_access_key'] = self.secret_access_key
+ return r
+
+
+class Client:
+ def __init__(self,
+ addresses: List[str],
+ access_type: str,
+ squash: str):
+ self.addresses = addresses
+ self.access_type = access_type
+ self.squash = squash
+
+ @classmethod
+ def from_client_block(cls, client_block: RawBlock) -> 'Client':
+ addresses = client_block.values.get('clients', [])
+ if isinstance(addresses, str):
+ addresses = [addresses]
+ return cls(addresses,
+ client_block.values.get('access_type', None),
+ client_block.values.get('squash', None))
+
+ def to_client_block(self) -> RawBlock:
+ result = RawBlock('CLIENT', values={'clients': self.addresses})
+ if self.access_type:
+ result.values['access_type'] = self.access_type
+ if self.squash:
+ result.values['squash'] = self.squash
+ return result
+
+ @classmethod
+ def from_dict(cls, client_dict: Dict[str, Any]) -> 'Client':
+ return cls(client_dict['addresses'], client_dict['access_type'],
+ client_dict['squash'])
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ 'addresses': self.addresses,
+ 'access_type': self.access_type,
+ 'squash': self.squash
+ }
+
+
+class Export:
+ def __init__(
+ self,
+ export_id: int,
+ path: str,
+ cluster_id: str,
+ pseudo: str,
+ access_type: str,
+ squash: str,
+ security_label: bool,
+ protocols: List[int],
+ transports: List[str],
+ fsal: FSAL,
+ clients: Optional[List[Client]] = None,
+ sectype: Optional[List[str]] = None) -> None:
+ self.export_id = export_id
+ self.path = path
+ self.fsal = fsal
+ self.cluster_id = cluster_id
+ self.pseudo = pseudo
+ self.access_type = access_type
+ self.squash = squash
+ self.attr_expiration_time = 0
+ self.security_label = security_label
+ self.protocols = protocols
+ self.transports = transports
+ self.clients: List[Client] = clients or []
+ self.sectype = sectype
+
+ @classmethod
+ def from_export_block(cls, export_block: RawBlock, cluster_id: str) -> 'Export':
+ fsal_blocks = [b for b in export_block.blocks
+ if b.block_name == "FSAL"]
+
+ client_blocks = [b for b in export_block.blocks
+ if b.block_name == "CLIENT"]
+
+ protocols = export_block.values.get('protocols')
+ if not isinstance(protocols, list):
+ protocols = [protocols]
+
+ transports = export_block.values.get('transports')
+ if isinstance(transports, str):
+ transports = [transports]
+ elif not transports:
+ transports = []
+
+ # if this module wrote the ganesha conf the param is camelcase
+ # "SecType". but for compatiblity with manually edited ganesha confs,
+ # accept "sectype" too.
+ sectype = (export_block.values.get("SecType")
+ or export_block.values.get("sectype") or None)
+ return cls(export_block.values['export_id'],
+ export_block.values['path'],
+ cluster_id,
+ export_block.values['pseudo'],
+ export_block.values.get('access_type', 'none'),
+ export_block.values.get('squash', 'no_root_squash'),
+ export_block.values.get('security_label', True),
+ protocols,
+ transports,
+ FSAL.from_fsal_block(fsal_blocks[0]),
+ [Client.from_client_block(client)
+ for client in client_blocks],
+ sectype=sectype)
+
+ def to_export_block(self) -> RawBlock:
+ values = {
+ 'export_id': self.export_id,
+ 'path': self.path,
+ 'pseudo': self.pseudo,
+ 'access_type': self.access_type,
+ 'squash': self.squash,
+ 'attr_expiration_time': self.attr_expiration_time,
+ 'security_label': self.security_label,
+ 'protocols': self.protocols,
+ 'transports': self.transports,
+ }
+ if self.sectype:
+ values['SecType'] = self.sectype
+ result = RawBlock("EXPORT", values=values)
+ result.blocks = [
+ self.fsal.to_fsal_block()
+ ] + [
+ client.to_client_block()
+ for client in self.clients
+ ]
+ return result
+
+ @classmethod
+ def from_dict(cls, export_id: int, ex_dict: Dict[str, Any]) -> 'Export':
+ return cls(export_id,
+ ex_dict.get('path', '/'),
+ ex_dict['cluster_id'],
+ ex_dict['pseudo'],
+ ex_dict.get('access_type', 'RO'),
+ ex_dict.get('squash', 'no_root_squash'),
+ ex_dict.get('security_label', True),
+ ex_dict.get('protocols', [4]),
+ ex_dict.get('transports', ['TCP']),
+ FSAL.from_dict(ex_dict.get('fsal', {})),
+ [Client.from_dict(client) for client in ex_dict.get('clients', [])],
+ sectype=ex_dict.get("sectype"))
+
+ def to_dict(self) -> Dict[str, Any]:
+ values = {
+ 'export_id': self.export_id,
+ 'path': self.path,
+ 'cluster_id': self.cluster_id,
+ 'pseudo': self.pseudo,
+ 'access_type': self.access_type,
+ 'squash': self.squash,
+ 'security_label': self.security_label,
+ 'protocols': sorted([p for p in self.protocols]),
+ 'transports': sorted([t for t in self.transports]),
+ 'fsal': self.fsal.to_dict(),
+ 'clients': [client.to_dict() for client in self.clients]
+ }
+ if self.sectype:
+ values['sectype'] = self.sectype
+ return values
+
+ def validate(self, mgr: 'Module') -> None:
+ if not isabs(self.pseudo) or self.pseudo == "/":
+ raise NFSInvalidOperation(
+ f"pseudo path {self.pseudo} is invalid. It should be an absolute "
+ "path and it cannot be just '/'."
+ )
+
+ _validate_squash(self.squash)
+ _validate_access_type(self.access_type)
+
+ if not isinstance(self.security_label, bool):
+ raise NFSInvalidOperation('security_label must be a boolean value')
+
+ for p in self.protocols:
+ if p not in [3, 4]:
+ raise NFSInvalidOperation(f"Invalid protocol {p}")
+
+ valid_transport = ["UDP", "TCP"]
+ for trans in self.transports:
+ if trans.upper() not in valid_transport:
+ raise NFSInvalidOperation(f'{trans} is not a valid transport protocol')
+
+ for client in self.clients:
+ if client.squash:
+ _validate_squash(client.squash)
+ if client.access_type:
+ _validate_access_type(client.access_type)
+
+ if self.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ fs = cast(CephFSFSAL, self.fsal)
+ if not fs.fs_name or not check_fs(mgr, fs.fs_name):
+ raise FSNotFound(fs.fs_name)
+ elif self.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ rgw = cast(RGWFSAL, self.fsal) # noqa
+ pass
+ else:
+ raise NFSInvalidOperation('FSAL {self.fsal.name} not supported')
+
+ for st in (self.sectype or []):
+ _validate_sec_type(st)
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, Export):
+ return False
+ return self.to_dict() == other.to_dict()
+
+
+def _format_block_body(block: RawBlock, depth: int = 0) -> str:
+ conf_str = ""
+ for blo in block.blocks:
+ conf_str += format_block(blo, depth)
+
+ for key, val in block.values.items():
+ if val is not None:
+ conf_str += _indentation(depth)
+ fval = _format_val(block.block_name, key, val)
+ conf_str += '{} = {};\n'.format(key, fval)
+ return conf_str
+
+
+def format_block(block: RawBlock, depth: int = 0) -> str:
+ """Format a raw block object into text suitable as a ganesha configuration
+ block.
+ """
+ if block.block_name == "%url":
+ return '%url "{}"\n\n'.format(block.values['value'])
+
+ conf_str = ""
+ conf_str += _indentation(depth)
+ conf_str += format(block.block_name)
+ conf_str += " {\n"
+ conf_str += _format_block_body(block, depth + 1)
+ conf_str += _indentation(depth)
+ conf_str += "}\n"
+ return conf_str
diff --git a/src/pybind/mgr/nfs/module.py b/src/pybind/mgr/nfs/module.py
new file mode 100644
index 000000000..a984500ee
--- /dev/null
+++ b/src/pybind/mgr/nfs/module.py
@@ -0,0 +1,189 @@
+import logging
+import threading
+from typing import Tuple, Optional, List, Dict, Any
+
+from mgr_module import MgrModule, CLICommand, Option, CLICheckNonemptyFileInput
+import object_format
+import orchestrator
+from orchestrator.module import IngressType
+
+from .export import ExportMgr, AppliedExportResults
+from .cluster import NFSCluster
+from .utils import available_clusters
+
+log = logging.getLogger(__name__)
+
+
+class Module(orchestrator.OrchestratorClientMixin, MgrModule):
+ MODULE_OPTIONS: List[Option] = []
+
+ def __init__(self, *args: str, **kwargs: Any) -> None:
+ self.inited = False
+ self.lock = threading.Lock()
+ super(Module, self).__init__(*args, **kwargs)
+ with self.lock:
+ self.export_mgr = ExportMgr(self)
+ self.nfs = NFSCluster(self)
+ self.inited = True
+
+ @CLICommand('nfs export create cephfs', perm='rw')
+ @object_format.Responder()
+ def _cmd_nfs_export_create_cephfs(
+ self,
+ cluster_id: str,
+ pseudo_path: str,
+ fsname: str,
+ path: Optional[str] = '/',
+ readonly: Optional[bool] = False,
+ client_addr: Optional[List[str]] = None,
+ squash: str = 'none',
+ sectype: Optional[List[str]] = None,
+ ) -> Dict[str, Any]:
+ """Create a CephFS export"""
+ return self.export_mgr.create_export(
+ fsal_type='cephfs',
+ fs_name=fsname,
+ cluster_id=cluster_id,
+ pseudo_path=pseudo_path,
+ read_only=readonly,
+ path=path,
+ squash=squash,
+ addr=client_addr,
+ sectype=sectype,
+ )
+
+ @CLICommand('nfs export create rgw', perm='rw')
+ @object_format.Responder()
+ def _cmd_nfs_export_create_rgw(
+ self,
+ cluster_id: str,
+ pseudo_path: str,
+ bucket: Optional[str] = None,
+ user_id: Optional[str] = None,
+ readonly: Optional[bool] = False,
+ client_addr: Optional[List[str]] = None,
+ squash: str = 'none',
+ sectype: Optional[List[str]] = None,
+ ) -> Dict[str, Any]:
+ """Create an RGW export"""
+ return self.export_mgr.create_export(
+ fsal_type='rgw',
+ bucket=bucket,
+ user_id=user_id,
+ cluster_id=cluster_id,
+ pseudo_path=pseudo_path,
+ read_only=readonly,
+ squash=squash,
+ addr=client_addr,
+ sectype=sectype,
+ )
+
+ @CLICommand('nfs export rm', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_nfs_export_rm(self, cluster_id: str, pseudo_path: str) -> None:
+ """Remove a cephfs export"""
+ return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
+
+ @CLICommand('nfs export delete', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_nfs_export_delete(self, cluster_id: str, pseudo_path: str) -> None:
+ """Delete a cephfs export (DEPRECATED)"""
+ return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
+
+ @CLICommand('nfs export ls', perm='r')
+ @object_format.Responder()
+ def _cmd_nfs_export_ls(self, cluster_id: str, detailed: bool = False) -> List[Any]:
+ """List exports of a NFS cluster"""
+ return self.export_mgr.list_exports(cluster_id=cluster_id, detailed=detailed)
+
+ @CLICommand('nfs export info', perm='r')
+ @object_format.Responder()
+ def _cmd_nfs_export_info(self, cluster_id: str, pseudo_path: str) -> Dict[str, Any]:
+ """Fetch a export of a NFS cluster given the pseudo path/binding"""
+ return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
+
+ @CLICommand('nfs export get', perm='r')
+ @object_format.Responder()
+ def _cmd_nfs_export_get(self, cluster_id: str, pseudo_path: str) -> Dict[str, Any]:
+ """Fetch a export of a NFS cluster given the pseudo path/binding (DEPRECATED)"""
+ return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
+
+ @CLICommand('nfs export apply', perm='rw')
+ @CLICheckNonemptyFileInput(desc='Export JSON or Ganesha EXPORT specification')
+ @object_format.Responder()
+ def _cmd_nfs_export_apply(self, cluster_id: str, inbuf: str) -> AppliedExportResults:
+ """Create or update an export by `-i <json_or_ganesha_export_file>`"""
+ return self.export_mgr.apply_export(cluster_id, export_config=inbuf)
+
+ @CLICommand('nfs cluster create', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_nfs_cluster_create(self,
+ cluster_id: str,
+ placement: Optional[str] = None,
+ ingress: Optional[bool] = None,
+ virtual_ip: Optional[str] = None,
+ ingress_mode: Optional[IngressType] = None,
+ port: Optional[int] = None) -> None:
+ """Create an NFS Cluster"""
+ return self.nfs.create_nfs_cluster(cluster_id=cluster_id, placement=placement,
+ virtual_ip=virtual_ip, ingress=ingress,
+ ingress_mode=ingress_mode, port=port)
+
+ @CLICommand('nfs cluster rm', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_nfs_cluster_rm(self, cluster_id: str) -> None:
+ """Removes an NFS Cluster"""
+ return self.nfs.delete_nfs_cluster(cluster_id=cluster_id)
+
+ @CLICommand('nfs cluster delete', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_nfs_cluster_delete(self, cluster_id: str) -> None:
+ """Removes an NFS Cluster (DEPRECATED)"""
+ return self.nfs.delete_nfs_cluster(cluster_id=cluster_id)
+
+ @CLICommand('nfs cluster ls', perm='r')
+ @object_format.Responder()
+ def _cmd_nfs_cluster_ls(self) -> List[str]:
+ """List NFS Clusters"""
+ return self.nfs.list_nfs_cluster()
+
+ @CLICommand('nfs cluster info', perm='r')
+ @object_format.Responder()
+ def _cmd_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Dict[str, Any]:
+ """Displays NFS Cluster info"""
+ return self.nfs.show_nfs_cluster_info(cluster_id=cluster_id)
+
+ @CLICommand('nfs cluster config get', perm='r')
+ @object_format.ErrorResponseHandler()
+ def _cmd_nfs_cluster_config_get(self, cluster_id: str) -> Tuple[int, str, str]:
+ """Fetch NFS-Ganesha config"""
+ conf = self.nfs.get_nfs_cluster_config(cluster_id=cluster_id)
+ return 0, conf, ""
+
+ @CLICommand('nfs cluster config set', perm='rw')
+ @CLICheckNonemptyFileInput(desc='NFS-Ganesha Configuration')
+ @object_format.EmptyResponder()
+ def _cmd_nfs_cluster_config_set(self, cluster_id: str, inbuf: str) -> None:
+ """Set NFS-Ganesha config by `-i <config_file>`"""
+ return self.nfs.set_nfs_cluster_config(cluster_id=cluster_id, nfs_config=inbuf)
+
+ @CLICommand('nfs cluster config reset', perm='rw')
+ @object_format.EmptyResponder()
+ def _cmd_nfs_cluster_config_reset(self, cluster_id: str) -> None:
+ """Reset NFS-Ganesha Config to default"""
+ return self.nfs.reset_nfs_cluster_config(cluster_id=cluster_id)
+
+ def fetch_nfs_export_obj(self) -> ExportMgr:
+ return self.export_mgr
+
+ def export_ls(self) -> List[Dict[Any, Any]]:
+ return self.export_mgr.list_all_exports()
+
+ def export_get(self, cluster_id: str, export_id: int) -> Optional[Dict[str, Any]]:
+ return self.export_mgr.get_export_by_id(cluster_id, export_id)
+
+ def export_rm(self, cluster_id: str, pseudo: str) -> None:
+ self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo)
+
+ def cluster_ls(self) -> List[str]:
+ return available_clusters(self)
diff --git a/src/pybind/mgr/nfs/tests/__init__.py b/src/pybind/mgr/nfs/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/nfs/tests/__init__.py
diff --git a/src/pybind/mgr/nfs/tests/test_nfs.py b/src/pybind/mgr/nfs/tests/test_nfs.py
new file mode 100644
index 000000000..5b4d5fe7e
--- /dev/null
+++ b/src/pybind/mgr/nfs/tests/test_nfs.py
@@ -0,0 +1,1156 @@
+# flake8: noqa
+import json
+import pytest
+from typing import Optional, Tuple, Iterator, List, Any
+
+from contextlib import contextmanager
+from unittest import mock
+from unittest.mock import MagicMock
+from mgr_module import MgrModule, NFS_POOL_NAME
+
+from rados import ObjectNotFound
+
+from ceph.deployment.service_spec import NFSServiceSpec
+from nfs import Module
+from nfs.export import ExportMgr, normalize_path
+from nfs.ganesha_conf import GaneshaConfParser, Export, RawBlock
+from nfs.cluster import NFSCluster
+from orchestrator import ServiceDescription, DaemonDescription, OrchResult
+
+
+class TestNFS:
+ cluster_id = "foo"
+ export_1 = """
+EXPORT {
+ Export_ID=1;
+ Protocols = 4;
+ Path = /;
+ Pseudo = /cephfs_a/;
+ Access_Type = RW;
+ Protocols = 4;
+ Attr_Expiration_Time = 0;
+ # Squash = root;
+
+ FSAL {
+ Name = CEPH;
+ Filesystem = "a";
+ User_Id = "ganesha";
+ # Secret_Access_Key = "YOUR SECRET KEY HERE";
+ }
+
+ CLIENT
+ {
+ Clients = 192.168.0.10, 192.168.1.0/8;
+ Squash = None;
+ }
+
+ CLIENT
+ {
+ Clients = 192.168.0.0/16;
+ Squash = All;
+ Access_Type = RO;
+ }
+}
+"""
+
+ export_2 = """
+EXPORT
+{
+ Export_ID=2;
+ Path = "/";
+ Pseudo = "/rgw";
+ Access_Type = RW;
+ squash = AllAnonymous;
+ Protocols = 4, 3;
+ Transports = TCP, UDP;
+
+ FSAL {
+ Name = RGW;
+ User_Id = "nfs.foo.bucket";
+ Access_Key_Id ="the_access_key";
+ Secret_Access_Key = "the_secret_key";
+ }
+}
+"""
+ export_3 = """
+EXPORT {
+ FSAL {
+ name = "CEPH";
+ user_id = "nfs.foo.1";
+ filesystem = "a";
+ secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA==";
+ }
+ export_id = 1;
+ path = "/";
+ pseudo = "/a";
+ access_type = "RW";
+ squash = "none";
+ attr_expiration_time = 0;
+ security_label = true;
+ protocols = 4;
+ transports = "TCP";
+}
+"""
+ export_4 = """
+EXPORT {
+ FSAL {
+ name = "CEPH";
+ user_id = "nfs.foo.1";
+ filesystem = "a";
+ secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA==";
+ }
+ export_id = 1;
+ path = "/secure/me";
+ pseudo = "/secure1";
+ access_type = "RW";
+ squash = "no_root_squash";
+ SecType = "krb5p", "krb5i";
+ attr_expiration_time = 0;
+ security_label = true;
+ protocols = 4;
+ transports = "TCP";
+}
+"""
+
+ conf_nfs_foo = f'''
+%url "rados://{NFS_POOL_NAME}/{cluster_id}/export-1"
+
+%url "rados://{NFS_POOL_NAME}/{cluster_id}/export-2"'''
+
+ class RObject(object):
+ def __init__(self, key: str, raw: str) -> None:
+ self.key = key
+ self.raw = raw
+
+ def read(self, _: Optional[int]) -> bytes:
+ return self.raw.encode('utf-8')
+
+ def stat(self) -> Tuple[int, None]:
+ return len(self.raw), None
+
+ def _ioctx_write_full_mock(self, key: str, content: bytes) -> None:
+ if key not in self.temp_store[self.temp_store_namespace]:
+ self.temp_store[self.temp_store_namespace][key] = \
+ TestNFS.RObject(key, content.decode('utf-8'))
+ else:
+ self.temp_store[self.temp_store_namespace][key].raw = content.decode('utf-8')
+
+ def _ioctx_remove_mock(self, key: str) -> None:
+ del self.temp_store[self.temp_store_namespace][key]
+
+ def _ioctx_list_objects_mock(self) -> List['TestNFS.RObject']:
+ r = [obj for _, obj in self.temp_store[self.temp_store_namespace].items()]
+ return r
+
+ def _ioctl_stat_mock(self, key):
+ return self.temp_store[self.temp_store_namespace][key].stat()
+
+ def _ioctl_read_mock(self, key: str, size: Optional[Any] = None) -> bytes:
+ if key not in self.temp_store[self.temp_store_namespace]:
+ raise ObjectNotFound
+ return self.temp_store[self.temp_store_namespace][key].read(size)
+
+ def _ioctx_set_namespace_mock(self, namespace: str) -> None:
+ self.temp_store_namespace = namespace
+
+ def _reset_temp_store(self) -> None:
+ self.temp_store_namespace = None
+ self.temp_store = {
+ 'foo': {
+ 'export-1': TestNFS.RObject("export-1", self.export_1),
+ 'export-2': TestNFS.RObject("export-2", self.export_2),
+ 'conf-nfs.foo': TestNFS.RObject("conf-nfs.foo", self.conf_nfs_foo)
+ }
+ }
+
+ @contextmanager
+ def _mock_orchestrator(self, enable: bool) -> Iterator:
+ self.io_mock = MagicMock()
+ self.io_mock.set_namespace.side_effect = self._ioctx_set_namespace_mock
+ self.io_mock.read = self._ioctl_read_mock
+ self.io_mock.stat = self._ioctl_stat_mock
+ self.io_mock.list_objects.side_effect = self._ioctx_list_objects_mock
+ self.io_mock.write_full.side_effect = self._ioctx_write_full_mock
+ self.io_mock.remove_object.side_effect = self._ioctx_remove_mock
+
+ # mock nfs services
+ orch_nfs_services = [
+ ServiceDescription(spec=NFSServiceSpec(service_id=self.cluster_id))
+ ] if enable else []
+
+ orch_nfs_daemons = [
+ DaemonDescription('nfs', 'foo.mydaemon', 'myhostname')
+ ] if enable else []
+
+ def mock_exec(cls, args):
+ if args[1:3] == ['bucket', 'stats']:
+ bucket_info = {
+ "owner": "bucket_owner_user",
+ }
+ return 0, json.dumps(bucket_info), ''
+ u = {
+ "user_id": "abc",
+ "display_name": "foo",
+ "email": "",
+ "suspended": 0,
+ "max_buckets": 1000,
+ "subusers": [],
+ "keys": [
+ {
+ "user": "abc",
+ "access_key": "the_access_key",
+ "secret_key": "the_secret_key"
+ }
+ ],
+ "swift_keys": [],
+ "caps": [],
+ "op_mask": "read, write, delete",
+ "default_placement": "",
+ "default_storage_class": "",
+ "placement_tags": [],
+ "bucket_quota": {
+ "enabled": False,
+ "check_on_raw": False,
+ "max_size": -1,
+ "max_size_kb": 0,
+ "max_objects": -1
+ },
+ "user_quota": {
+ "enabled": False,
+ "check_on_raw": False,
+ "max_size": -1,
+ "max_size_kb": 0,
+ "max_objects": -1
+ },
+ "temp_url_keys": [],
+ "type": "rgw",
+ "mfa_ids": []
+ }
+ if args[2] == 'list':
+ return 0, json.dumps([u]), ''
+ return 0, json.dumps(u), ''
+
+ def mock_describe_service(cls, *args, **kwargs):
+ if kwargs['service_type'] == 'nfs':
+ return OrchResult(orch_nfs_services)
+ return OrchResult([])
+
+ def mock_list_daemons(cls, *args, **kwargs):
+ if kwargs['daemon_type'] == 'nfs':
+ return OrchResult(orch_nfs_daemons)
+ return OrchResult([])
+
+ with mock.patch('nfs.module.Module.describe_service', mock_describe_service) as describe_service, \
+ mock.patch('nfs.module.Module.list_daemons', mock_list_daemons) as list_daemons, \
+ mock.patch('nfs.module.Module.rados') as rados, \
+ mock.patch('nfs.export.available_clusters',
+ return_value=[self.cluster_id]), \
+ mock.patch('nfs.export.restart_nfs_service'), \
+ mock.patch('nfs.cluster.restart_nfs_service'), \
+ mock.patch.object(MgrModule, 'tool_exec', mock_exec), \
+ mock.patch('nfs.export.check_fs', return_value=True), \
+ mock.patch('nfs.ganesha_conf.check_fs', return_value=True), \
+ mock.patch('nfs.export.ExportMgr._create_user_key',
+ return_value='thekeyforclientabc'), \
+ mock.patch('nfs.export.cephfs_path_is_dir'):
+
+ rados.open_ioctx.return_value.__enter__.return_value = self.io_mock
+ rados.open_ioctx.return_value.__exit__ = mock.Mock(return_value=None)
+
+ self._reset_temp_store()
+
+ yield
+
+ def test_parse_daemon_raw_config(self) -> None:
+ expected_daemon_config = [
+ RawBlock('NFS_CORE_PARAM', values={
+ "enable_nlm": False,
+ "enable_rquota": False,
+ "protocols": 4,
+ "nfs_port": 14000
+ }),
+ RawBlock('MDCACHE', values={
+ "dir_chunk": 0
+ }),
+ RawBlock('NFSV4', values={
+ "recoverybackend": "rados_cluster",
+ "minor_versions": [1, 2]
+ }),
+ RawBlock('RADOS_KV', values={
+ "pool": NFS_POOL_NAME,
+ "namespace": "vstart",
+ "userid": "vstart",
+ "nodeid": "a"
+ }),
+ RawBlock('RADOS_URLS', values={
+ "userid": "vstart",
+ "watch_url": f"'rados://{NFS_POOL_NAME}/vstart/conf-nfs.vstart'"
+ }),
+ RawBlock('%url', values={
+ "value": f"rados://{NFS_POOL_NAME}/vstart/conf-nfs.vstart"
+ })
+ ]
+ daemon_raw_config = """
+NFS_CORE_PARAM {
+ Enable_NLM = false;
+ Enable_RQUOTA = false;
+ Protocols = 4;
+ NFS_Port = 14000;
+ }
+
+ MDCACHE {
+ Dir_Chunk = 0;
+ }
+
+ NFSv4 {
+ RecoveryBackend = rados_cluster;
+ Minor_Versions = 1, 2;
+ }
+
+ RADOS_KV {
+ pool = {};
+ namespace = vstart;
+ UserId = vstart;
+ nodeid = a;
+ }
+
+ RADOS_URLS {
+ Userid = vstart;
+ watch_url = 'rados://{}/vstart/conf-nfs.vstart';
+ }
+
+ %url rados://{}/vstart/conf-nfs.vstart
+""".replace('{}', NFS_POOL_NAME)
+ daemon_config = GaneshaConfParser(daemon_raw_config).parse()
+ assert daemon_config == expected_daemon_config
+
+ def _validate_export_1(self, export: Export):
+ assert export.export_id == 1
+ assert export.path == "/"
+ assert export.pseudo == "/cephfs_a/"
+ assert export.access_type == "RW"
+ # assert export.squash == "root_squash" # probably correct value
+ assert export.squash == "no_root_squash"
+ assert export.protocols == [4]
+ # assert export.transports == {"TCP", "UDP"}
+ assert export.fsal.name == "CEPH"
+ assert export.fsal.user_id == "ganesha"
+ assert export.fsal.fs_name == "a"
+ assert export.fsal.sec_label_xattr == None
+ assert len(export.clients) == 2
+ assert export.clients[0].addresses == \
+ ["192.168.0.10", "192.168.1.0/8"]
+ # assert export.clients[0].squash == "no_root_squash" # probably correct value
+ assert export.clients[0].squash == "None"
+ assert export.clients[0].access_type is None
+ assert export.clients[1].addresses == ["192.168.0.0/16"]
+ # assert export.clients[1].squash == "all_squash" # probably correct value
+ assert export.clients[1].squash == "All"
+ assert export.clients[1].access_type == "RO"
+ assert export.cluster_id == 'foo'
+ assert export.attr_expiration_time == 0
+ # assert export.security_label == False # probably correct value
+ assert export.security_label == True
+
+ def test_export_parser_1(self) -> None:
+ blocks = GaneshaConfParser(self.export_1).parse()
+ assert isinstance(blocks, list)
+ assert len(blocks) == 1
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ self._validate_export_1(export)
+
+ def _validate_export_2(self, export: Export):
+ assert export.export_id == 2
+ assert export.path == "/"
+ assert export.pseudo == "/rgw"
+ assert export.access_type == "RW"
+ # assert export.squash == "all_squash" # probably correct value
+ assert export.squash == "AllAnonymous"
+ assert export.protocols == [4, 3]
+ assert set(export.transports) == {"TCP", "UDP"}
+ assert export.fsal.name == "RGW"
+ assert export.fsal.user_id == "nfs.foo.bucket"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 0
+ assert export.cluster_id == 'foo'
+
+ def test_export_parser_2(self) -> None:
+ blocks = GaneshaConfParser(self.export_2).parse()
+ assert isinstance(blocks, list)
+ assert len(blocks) == 1
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ self._validate_export_2(export)
+
+ def test_daemon_conf_parser(self) -> None:
+ blocks = GaneshaConfParser(self.conf_nfs_foo).parse()
+ assert isinstance(blocks, list)
+ assert len(blocks) == 2
+ assert blocks[0].block_name == "%url"
+ assert blocks[0].values['value'] == f"rados://{NFS_POOL_NAME}/{self.cluster_id}/export-1"
+ assert blocks[1].block_name == "%url"
+ assert blocks[1].values['value'] == f"rados://{NFS_POOL_NAME}/{self.cluster_id}/export-2"
+
+ def _do_mock_test(self, func, *args) -> None:
+ with self._mock_orchestrator(True):
+ func(*args)
+ self._reset_temp_store()
+
+ def test_ganesha_conf(self) -> None:
+ self._do_mock_test(self._do_test_ganesha_conf)
+
+ def _do_test_ganesha_conf(self) -> None:
+ nfs_mod = Module('nfs', '', '')
+ ganesha_conf = ExportMgr(nfs_mod)
+ exports = ganesha_conf.exports[self.cluster_id]
+
+ assert len(exports) == 2
+
+ self._validate_export_1([e for e in exports if e.export_id == 1][0])
+ self._validate_export_2([e for e in exports if e.export_id == 2][0])
+
+ def test_config_dict(self) -> None:
+ self._do_mock_test(self._do_test_config_dict)
+
+ def _do_test_config_dict(self) -> None:
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ export = [e for e in conf.exports['foo'] if e.export_id == 1][0]
+ ex_dict = export.to_dict()
+
+ assert ex_dict == {'access_type': 'RW',
+ 'clients': [{'access_type': None,
+ 'addresses': ['192.168.0.10', '192.168.1.0/8'],
+ 'squash': 'None'},
+ {'access_type': 'RO',
+ 'addresses': ['192.168.0.0/16'],
+ 'squash': 'All'}],
+ 'cluster_id': self.cluster_id,
+ 'export_id': 1,
+ 'fsal': {'fs_name': 'a', 'name': 'CEPH', 'user_id': 'ganesha'},
+ 'path': '/',
+ 'protocols': [4],
+ 'pseudo': '/cephfs_a/',
+ 'security_label': True,
+ 'squash': 'no_root_squash',
+ 'transports': []}
+
+ export = [e for e in conf.exports['foo'] if e.export_id == 2][0]
+ ex_dict = export.to_dict()
+ assert ex_dict == {'access_type': 'RW',
+ 'clients': [],
+ 'cluster_id': self.cluster_id,
+ 'export_id': 2,
+ 'fsal': {'name': 'RGW',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ 'user_id': 'nfs.foo.bucket'},
+ 'path': '/',
+ 'protocols': [3, 4],
+ 'pseudo': '/rgw',
+ 'security_label': True,
+ 'squash': 'AllAnonymous',
+ 'transports': ['TCP', 'UDP']}
+
+ def test_config_from_dict(self) -> None:
+ self._do_mock_test(self._do_test_config_from_dict)
+
+ def _do_test_config_from_dict(self) -> None:
+ export = Export.from_dict(1, {
+ 'export_id': 1,
+ 'path': '/',
+ 'cluster_id': self.cluster_id,
+ 'pseudo': '/cephfs_a',
+ 'access_type': 'RW',
+ 'squash': 'root_squash',
+ 'security_label': True,
+ 'protocols': [4],
+ 'transports': ['TCP', 'UDP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.10", "192.168.1.0/8"],
+ 'access_type': None,
+ 'squash': 'no_root_squash'
+ }, {
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': 'RO',
+ 'squash': 'all_squash'
+ }],
+ 'fsal': {
+ 'name': 'CEPH',
+ 'user_id': 'ganesha',
+ 'fs_name': 'a',
+ 'sec_label_xattr': 'security.selinux'
+ }
+ })
+
+ assert export.export_id == 1
+ assert export.path == "/"
+ assert export.pseudo == "/cephfs_a"
+ assert export.access_type == "RW"
+ assert export.squash == "root_squash"
+ assert set(export.protocols) == {4}
+ assert set(export.transports) == {"TCP", "UDP"}
+ assert export.fsal.name == "CEPH"
+ assert export.fsal.user_id == "ganesha"
+ assert export.fsal.fs_name == "a"
+ assert export.fsal.sec_label_xattr == 'security.selinux'
+ assert len(export.clients) == 2
+ assert export.clients[0].addresses == \
+ ["192.168.0.10", "192.168.1.0/8"]
+ assert export.clients[0].squash == "no_root_squash"
+ assert export.clients[0].access_type is None
+ assert export.clients[1].addresses == ["192.168.0.0/16"]
+ assert export.clients[1].squash == "all_squash"
+ assert export.clients[1].access_type == "RO"
+ assert export.cluster_id == self.cluster_id
+ assert export.attr_expiration_time == 0
+ assert export.security_label
+
+ export = Export.from_dict(2, {
+ 'export_id': 2,
+ 'path': 'bucket',
+ 'pseudo': '/rgw',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'all_squash',
+ 'security_label': False,
+ 'protocols': [4, 3],
+ 'transports': ['TCP', 'UDP'],
+ 'clients': [],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'rgw.foo.bucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key'
+ }
+ })
+
+ assert export.export_id == 2
+ assert export.path == "bucket"
+ assert export.pseudo == "/rgw"
+ assert export.access_type == "RW"
+ assert export.squash == "all_squash"
+ assert set(export.protocols) == {4, 3}
+ assert set(export.transports) == {"TCP", "UDP"}
+ assert export.fsal.name == "RGW"
+ assert export.fsal.user_id == "rgw.foo.bucket"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 0
+ assert export.cluster_id == self.cluster_id
+
+ @pytest.mark.parametrize(
+ "block",
+ [
+ export_1,
+ export_2,
+ ]
+ )
+ def test_export_from_to_export_block(self, block):
+ blocks = GaneshaConfParser(block).parse()
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ newblock = export.to_export_block()
+ export2 = Export.from_export_block(newblock, self.cluster_id)
+ newblock2 = export2.to_export_block()
+ assert newblock == newblock2
+
+ @pytest.mark.parametrize(
+ "block",
+ [
+ export_1,
+ export_2,
+ ]
+ )
+ def test_export_from_to_dict(self, block):
+ blocks = GaneshaConfParser(block).parse()
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ j = export.to_dict()
+ export2 = Export.from_dict(j['export_id'], j)
+ j2 = export2.to_dict()
+ assert j == j2
+
+ @pytest.mark.parametrize(
+ "block",
+ [
+ export_1,
+ export_2,
+ ]
+ )
+ def test_export_validate(self, block):
+ blocks = GaneshaConfParser(block).parse()
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ nfs_mod = Module('nfs', '', '')
+ with mock.patch('nfs.ganesha_conf.check_fs', return_value=True):
+ export.validate(nfs_mod)
+
+ def test_update_export(self):
+ self._do_mock_test(self._do_test_update_export)
+
+ def _do_test_update_export(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'export_id': 2,
+ 'path': 'bucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'all_squash',
+ 'security_label': False,
+ 'protocols': [4, 3],
+ 'transports': ['TCP', 'UDP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.bucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ }))
+ assert len(r.changes) == 1
+
+ export = conf._fetch_export('foo', '/rgw/bucket')
+ assert export.export_id == 2
+ assert export.path == "bucket"
+ assert export.pseudo == "/rgw/bucket"
+ assert export.access_type == "RW"
+ assert export.squash == "all_squash"
+ assert export.protocols == [4, 3]
+ assert export.transports == ["TCP", "UDP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
+ assert export.clients[0].access_type is None
+ assert export.cluster_id == self.cluster_id
+
+ # do it again, with changes
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'export_id': 2,
+ 'path': 'newbucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RO',
+ 'squash': 'root',
+ 'security_label': False,
+ 'protocols': [4],
+ 'transports': ['TCP'],
+ 'clients': [{
+ 'addresses': ["192.168.10.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.newbucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ }))
+ assert len(r.changes) == 1
+
+ export = conf._fetch_export('foo', '/rgw/bucket')
+ assert export.export_id == 2
+ assert export.path == "newbucket"
+ assert export.pseudo == "/rgw/bucket"
+ assert export.access_type == "RO"
+ assert export.squash == "root"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
+ assert export.clients[0].access_type is None
+ assert export.cluster_id == self.cluster_id
+
+ # again, but without export_id
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'path': 'newestbucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'root',
+ 'security_label': False,
+ 'protocols': [4],
+ 'transports': ['TCP'],
+ 'clients': [{
+ 'addresses': ["192.168.10.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.newestbucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ }))
+ assert len(r.changes) == 1
+
+ export = conf._fetch_export(self.cluster_id, '/rgw/bucket')
+ assert export.export_id == 2
+ assert export.path == "newestbucket"
+ assert export.pseudo == "/rgw/bucket"
+ assert export.access_type == "RW"
+ assert export.squash == "root"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
+ assert export.clients[0].access_type is None
+ assert export.cluster_id == self.cluster_id
+
+ def test_update_export_sectype(self):
+ self._do_mock_test(self._test_update_export_sectype)
+
+ def _test_update_export_sectype(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'export_id': 2,
+ 'path': 'bucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'all_squash',
+ 'security_label': False,
+ 'protocols': [4, 3],
+ 'transports': ['TCP', 'UDP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.bucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ }))
+ assert len(r.changes) == 1
+
+ # no sectype was given, key not present
+ info = conf._get_export_dict(self.cluster_id, "/rgw/bucket")
+ assert info["export_id"] == 2
+ assert info["path"] == "bucket"
+ assert "sectype" not in info
+
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'export_id': 2,
+ 'path': 'bucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'all_squash',
+ 'security_label': False,
+ 'protocols': [4, 3],
+ 'transports': ['TCP', 'UDP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'sectype': ["krb5p", "krb5i", "sys"],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.bucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ }))
+ assert len(r.changes) == 1
+
+ # assert sectype matches new value(s)
+ info = conf._get_export_dict(self.cluster_id, "/rgw/bucket")
+ assert info["export_id"] == 2
+ assert info["path"] == "bucket"
+ assert info["sectype"] == ["krb5p", "krb5i", "sys"]
+
+ def test_update_export_with_ganesha_conf(self):
+ self._do_mock_test(self._do_test_update_export_with_ganesha_conf)
+
+ def _do_test_update_export_with_ganesha_conf(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, self.export_3)
+ assert len(r.changes) == 1
+
+ def test_update_export_with_ganesha_conf_sectype(self):
+ self._do_mock_test(
+ self._do_test_update_export_with_ganesha_conf_sectype,
+ self.export_4, ["krb5p", "krb5i"])
+
+ def test_update_export_with_ganesha_conf_sectype_lcase(self):
+ export_conf = self.export_4.replace("SecType", "sectype").replace("krb5i", "sys")
+ self._do_mock_test(
+ self._do_test_update_export_with_ganesha_conf_sectype,
+ export_conf, ["krb5p", "sys"])
+
+ def _do_test_update_export_with_ganesha_conf_sectype(self, export_conf, expect_sectype):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, export_conf)
+ assert len(r.changes) == 1
+
+ # assert sectype matches new value(s)
+ info = conf._get_export_dict(self.cluster_id, "/secure1")
+ assert info["export_id"] == 1
+ assert info["path"] == "/secure/me"
+ assert info["sectype"] == expect_sectype
+
+ def test_update_export_with_list(self):
+ self._do_mock_test(self._do_test_update_export_with_list)
+
+ def _do_test_update_export_with_list(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, json.dumps([
+ {
+ 'path': 'bucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'root',
+ 'security_label': False,
+ 'protocols': [4],
+ 'transports': ['TCP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.bucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ },
+ {
+ 'path': 'bucket2',
+ 'pseudo': '/rgw/bucket2',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RO',
+ 'squash': 'root',
+ 'security_label': False,
+ 'protocols': [4],
+ 'transports': ['TCP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.bucket2',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ },
+ ]))
+ # The input object above contains TWO items (two different pseudo paths)
+ # therefore we expect the result to report that two changes have been
+ # applied, rather than the typical 1 change.
+ assert len(r.changes) == 2
+
+ export = conf._fetch_export('foo', '/rgw/bucket')
+ assert export.export_id == 3
+ assert export.path == "bucket"
+ assert export.pseudo == "/rgw/bucket"
+ assert export.access_type == "RW"
+ assert export.squash == "root"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
+ assert export.clients[0].access_type is None
+ assert export.cluster_id == self.cluster_id
+
+ export = conf._fetch_export('foo', '/rgw/bucket2')
+ assert export.export_id == 4
+ assert export.path == "bucket2"
+ assert export.pseudo == "/rgw/bucket2"
+ assert export.access_type == "RO"
+ assert export.squash == "root"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
+ assert export.clients[0].access_type is None
+ assert export.cluster_id == self.cluster_id
+
+ def test_remove_export(self) -> None:
+ self._do_mock_test(self._do_test_remove_export)
+
+ def _do_test_remove_export(self) -> None:
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ assert len(conf.exports[self.cluster_id]) == 2
+ conf.delete_export(cluster_id=self.cluster_id,
+ pseudo_path="/rgw")
+ exports = conf.exports[self.cluster_id]
+ assert len(exports) == 1
+ assert exports[0].export_id == 1
+
+ def test_create_export_rgw_bucket(self):
+ self._do_mock_test(self._do_test_create_export_rgw_bucket)
+
+ def _do_test_create_export_rgw_bucket(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ ls = conf.list_exports(cluster_id=self.cluster_id)
+ assert len(ls) == 2
+
+ r = conf.create_export(
+ fsal_type='rgw',
+ cluster_id=self.cluster_id,
+ bucket='bucket',
+ pseudo_path='/mybucket',
+ read_only=False,
+ squash='root',
+ addr=["192.168.0.0/16"]
+ )
+ assert r["bind"] == "/mybucket"
+
+ ls = conf.list_exports(cluster_id=self.cluster_id)
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/mybucket')
+ assert export.export_id
+ assert export.path == "bucket"
+ assert export.pseudo == "/mybucket"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.user_id == "bucket_owner_user"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.0.0/16"]
+ assert export.cluster_id == self.cluster_id
+
+ def test_create_export_rgw_bucket_user(self):
+ self._do_mock_test(self._do_test_create_export_rgw_bucket_user)
+
+ def _do_test_create_export_rgw_bucket_user(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ ls = conf.list_exports(cluster_id=self.cluster_id)
+ assert len(ls) == 2
+
+ r = conf.create_export(
+ fsal_type='rgw',
+ cluster_id=self.cluster_id,
+ bucket='bucket',
+ user_id='other_user',
+ pseudo_path='/mybucket',
+ read_only=False,
+ squash='root',
+ addr=["192.168.0.0/16"]
+ )
+ assert r["bind"] == "/mybucket"
+
+ ls = conf.list_exports(cluster_id=self.cluster_id)
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/mybucket')
+ assert export.export_id
+ assert export.path == "bucket"
+ assert export.pseudo == "/mybucket"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.fsal.user_id == "other_user"
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.0.0/16"]
+ assert export.cluster_id == self.cluster_id
+
+ def test_create_export_rgw_user(self):
+ self._do_mock_test(self._do_test_create_export_rgw_user)
+
+ def _do_test_create_export_rgw_user(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ ls = conf.list_exports(cluster_id=self.cluster_id)
+ assert len(ls) == 2
+
+ r = conf.create_export(
+ fsal_type='rgw',
+ cluster_id=self.cluster_id,
+ user_id='some_user',
+ pseudo_path='/mybucket',
+ read_only=False,
+ squash='root',
+ addr=["192.168.0.0/16"]
+ )
+ assert r["bind"] == "/mybucket"
+
+ ls = conf.list_exports(cluster_id=self.cluster_id)
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/mybucket')
+ assert export.export_id
+ assert export.path == "/"
+ assert export.pseudo == "/mybucket"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.fsal.user_id == "some_user"
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.0.0/16"]
+ assert export.cluster_id == self.cluster_id
+
+ def test_create_export_cephfs(self):
+ self._do_mock_test(self._do_test_create_export_cephfs)
+
+ def _do_test_create_export_cephfs(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ ls = conf.list_exports(cluster_id=self.cluster_id)
+ assert len(ls) == 2
+
+ r = conf.create_export(
+ fsal_type='cephfs',
+ cluster_id=self.cluster_id,
+ fs_name='myfs',
+ path='/',
+ pseudo_path='/cephfs2',
+ read_only=False,
+ squash='root',
+ addr=["192.168.1.0/8"],
+ )
+ assert r["bind"] == "/cephfs2"
+
+ ls = conf.list_exports(cluster_id=self.cluster_id)
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/cephfs2')
+ assert export.export_id
+ assert export.path == "/"
+ assert export.pseudo == "/cephfs2"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "CEPH"
+ assert export.fsal.user_id == "nfs.foo.3"
+ assert export.fsal.cephx_key == "thekeyforclientabc"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.1.0/8"]
+ assert export.cluster_id == self.cluster_id
+
+ def _do_test_cluster_ls(self):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+
+ out = cluster.list_nfs_cluster()
+ assert out[0] == self.cluster_id
+
+ def test_cluster_ls(self):
+ self._do_mock_test(self._do_test_cluster_ls)
+
+ def _do_test_cluster_info(self):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+
+ out = cluster.show_nfs_cluster_info(self.cluster_id)
+ assert out == {"foo": {"virtual_ip": None, "backend": []}}
+
+ def test_cluster_info(self):
+ self._do_mock_test(self._do_test_cluster_info)
+
+ def _do_test_cluster_config(self):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+
+ out = cluster.get_nfs_cluster_config(self.cluster_id)
+ assert out == ""
+
+ cluster.set_nfs_cluster_config(self.cluster_id, '# foo\n')
+
+ out = cluster.get_nfs_cluster_config(self.cluster_id)
+ assert out == "# foo\n"
+
+ cluster.reset_nfs_cluster_config(self.cluster_id)
+
+ out = cluster.get_nfs_cluster_config(self.cluster_id)
+ assert out == ""
+
+ def test_cluster_config(self):
+ self._do_mock_test(self._do_test_cluster_config)
+
+
+@pytest.mark.parametrize(
+ "path,expected",
+ [
+ ("/foo/bar/baz", "/foo/bar/baz"),
+ ("/foo/bar/baz/", "/foo/bar/baz"),
+ ("/foo/bar/baz ", "/foo/bar/baz"),
+ ("/foo/./bar/baz", "/foo/bar/baz"),
+ ("/foo/bar/baz/..", "/foo/bar"),
+ ("//foo/bar/baz", "/foo/bar/baz"),
+ ("", ""),
+ ]
+)
+def test_normalize_path(path, expected):
+ assert normalize_path(path) == expected
+
+
+def test_ganesha_validate_squash():
+ """Check error handling of internal validation function for squash value."""
+ from nfs.ganesha_conf import _validate_squash
+ from nfs.exception import NFSInvalidOperation
+
+ _validate_squash("root")
+ with pytest.raises(NFSInvalidOperation):
+ _validate_squash("toot")
+
+
+def test_ganesha_validate_access_type():
+ """Check error handling of internal validation function for access type value."""
+ from nfs.ganesha_conf import _validate_access_type
+ from nfs.exception import NFSInvalidOperation
+
+ for ok in ("rw", "ro", "none"):
+ _validate_access_type(ok)
+ with pytest.raises(NFSInvalidOperation):
+ _validate_access_type("any")
diff --git a/src/pybind/mgr/nfs/utils.py b/src/pybind/mgr/nfs/utils.py
new file mode 100644
index 000000000..ba3190a96
--- /dev/null
+++ b/src/pybind/mgr/nfs/utils.py
@@ -0,0 +1,104 @@
+import functools
+import logging
+import stat
+from typing import List, Tuple, TYPE_CHECKING
+
+from object_format import ErrorResponseBase
+import orchestrator
+import cephfs
+from mgr_util import CephfsClient, open_filesystem
+
+if TYPE_CHECKING:
+ from nfs.module import Module
+
+EXPORT_PREFIX: str = "export-"
+CONF_PREFIX: str = "conf-nfs."
+USER_CONF_PREFIX: str = "userconf-nfs."
+
+log = logging.getLogger(__name__)
+
+
+class NonFatalError(ErrorResponseBase):
+ """Raise this exception when you want to interrupt the flow of a function
+ and return an informative message to the user. In certain situations the
+ NFS MGR module wants to indicate an action was or was not taken but still
+ return a success code so that non-interactive scripts continue as if the
+ overall action was completed.
+ """
+ def __init__(self, msg: str) -> None:
+ super().__init__(msg)
+ self.msg = msg
+
+ def format_response(self) -> Tuple[int, str, str]:
+ return 0, "", self.msg
+
+
+class ManualRestartRequired(NonFatalError):
+ """Raise this exception type if all other changes were successful but
+ user needs to manually restart nfs services.
+ """
+
+ def __init__(self, msg: str) -> None:
+ super().__init__(" ".join((msg, "(Manual Restart of NFS Pods required)")))
+
+
+def export_obj_name(export_id: int) -> str:
+ """Return a rados object name for the export."""
+ return f"{EXPORT_PREFIX}{export_id}"
+
+
+def conf_obj_name(cluster_id: str) -> str:
+ """Return a rados object name for the config."""
+ return f"{CONF_PREFIX}{cluster_id}"
+
+
+def user_conf_obj_name(cluster_id: str) -> str:
+ """Returna a rados object name for the user config."""
+ return f"{USER_CONF_PREFIX}{cluster_id}"
+
+
+def available_clusters(mgr: 'Module') -> List[str]:
+ '''
+ This method returns list of available cluster ids.
+ Service name is service_type.service_id
+ Example:
+ completion.result value:
+ <ServiceDescription of <NFSServiceSpec for service_name=nfs.vstart>>
+ return value: ['vstart']
+ '''
+ # TODO check cephadm cluster list with rados pool conf objects
+ completion = mgr.describe_service(service_type='nfs')
+ orchestrator.raise_if_exception(completion)
+ assert completion.result is not None
+ return [cluster.spec.service_id for cluster in completion.result
+ if cluster.spec.service_id]
+
+
+def restart_nfs_service(mgr: 'Module', cluster_id: str) -> None:
+ '''
+ This methods restarts the nfs daemons
+ '''
+ completion = mgr.service_action(action='restart',
+ service_name='nfs.' + cluster_id)
+ orchestrator.raise_if_exception(completion)
+
+
+def check_fs(mgr: 'Module', fs_name: str) -> bool:
+ '''
+ This method checks if given fs is valid
+ '''
+ fs_map = mgr.get('fs_map')
+ return fs_name in [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']]
+
+
+def cephfs_path_is_dir(mgr: 'Module', fs: str, path: str) -> None:
+ @functools.lru_cache(maxsize=1)
+ def _get_cephfs_client() -> CephfsClient:
+ return CephfsClient(mgr)
+ cephfs_client = _get_cephfs_client()
+
+ with open_filesystem(cephfs_client, fs) as fs_handle:
+ stx = fs_handle.statx(path.encode('utf-8'), cephfs.CEPH_STATX_MODE,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+ if not stat.S_ISDIR(stx.get('mode')):
+ raise NotADirectoryError()