summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/nfs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/nfs
parentInitial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/nfs/__init__.py7
-rw-r--r--src/pybind/mgr/nfs/cluster.py258
-rw-r--r--src/pybind/mgr/nfs/exception.py32
-rw-r--r--src/pybind/mgr/nfs/export.py814
-rw-r--r--src/pybind/mgr/nfs/export_utils.py521
-rw-r--r--src/pybind/mgr/nfs/module.py154
-rw-r--r--src/pybind/mgr/nfs/tests/__init__.py0
-rw-r--r--src/pybind/mgr/nfs/tests/test_nfs.py1036
-rw-r--r--src/pybind/mgr/nfs/utils.py59
9 files changed, 2881 insertions, 0 deletions
diff --git a/src/pybind/mgr/nfs/__init__.py b/src/pybind/mgr/nfs/__init__.py
new file mode 100644
index 000000000..4e2257788
--- /dev/null
+++ b/src/pybind/mgr/nfs/__init__.py
@@ -0,0 +1,7 @@
+# flake8: noqa
+
+import os
+if 'UNITTEST' in os.environ:
+ import tests
+
+from .module import Module
diff --git a/src/pybind/mgr/nfs/cluster.py b/src/pybind/mgr/nfs/cluster.py
new file mode 100644
index 000000000..1d8054411
--- /dev/null
+++ b/src/pybind/mgr/nfs/cluster.py
@@ -0,0 +1,258 @@
+import logging
+import json
+import re
+import socket
+from typing import cast, Dict, List, Any, Union, Optional, TYPE_CHECKING, Tuple
+
+from mgr_module import NFS_POOL_NAME as POOL_NAME
+from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec
+
+import orchestrator
+
+from .exception import NFSInvalidOperation, ClusterNotFound
+from .utils import (available_clusters, restart_nfs_service, conf_obj_name,
+ user_conf_obj_name)
+from .export import NFSRados, exception_handler
+
+if TYPE_CHECKING:
+ from nfs.module import Module
+ from mgr_module import MgrModule
+
+
+log = logging.getLogger(__name__)
+
+
+def resolve_ip(hostname: str) -> str:
+ try:
+ r = socket.getaddrinfo(hostname, None, flags=socket.AI_CANONNAME,
+ type=socket.SOCK_STREAM)
+ # pick first v4 IP, if present
+ for a in r:
+ if a[0] == socket.AF_INET:
+ return a[4][0]
+ return r[0][4][0]
+ except socket.gaierror as e:
+ raise NFSInvalidOperation(f"Cannot resolve IP for host {hostname}: {e}")
+
+
+def create_ganesha_pool(mgr: 'MgrModule') -> None:
+ pool_list = [p['pool_name'] for p in mgr.get_osdmap().dump().get('pools', [])]
+ if POOL_NAME not in pool_list:
+ mgr.check_mon_command({'prefix': 'osd pool create', 'pool': POOL_NAME})
+ mgr.check_mon_command({'prefix': 'osd pool application enable',
+ 'pool': POOL_NAME,
+ 'app': 'nfs'})
+ log.debug("Successfully created nfs-ganesha pool %s", POOL_NAME)
+
+
+class NFSCluster:
+ def __init__(self, mgr: 'Module') -> None:
+ self.mgr = mgr
+
+ def _call_orch_apply_nfs(
+ self,
+ cluster_id: str,
+ placement: Optional[str],
+ virtual_ip: Optional[str] = None,
+ port: Optional[int] = None,
+ ) -> None:
+ if not port:
+ port = 2049 # default nfs port
+ if virtual_ip:
+ # nfs + ingress
+ # run NFS on non-standard port
+ spec = NFSServiceSpec(service_type='nfs', service_id=cluster_id,
+ placement=PlacementSpec.from_string(placement),
+ # use non-default port so we don't conflict with ingress
+ port=10000 + port) # semi-arbitrary, fix me someday
+ completion = self.mgr.apply_nfs(spec)
+ orchestrator.raise_if_exception(completion)
+ ispec = IngressSpec(service_type='ingress',
+ service_id='nfs.' + cluster_id,
+ backend_service='nfs.' + cluster_id,
+ frontend_port=port,
+ monitor_port=7000 + port, # semi-arbitrary, fix me someday
+ virtual_ip=virtual_ip)
+ completion = self.mgr.apply_ingress(ispec)
+ orchestrator.raise_if_exception(completion)
+ else:
+ # standalone nfs
+ spec = NFSServiceSpec(service_type='nfs', service_id=cluster_id,
+ placement=PlacementSpec.from_string(placement),
+ port=port)
+ completion = self.mgr.apply_nfs(spec)
+ orchestrator.raise_if_exception(completion)
+ log.debug("Successfully deployed nfs daemons with cluster id %s and placement %s",
+ cluster_id, placement)
+
+ def create_empty_rados_obj(self, cluster_id: str) -> None:
+ common_conf = conf_obj_name(cluster_id)
+ self._rados(cluster_id).write_obj('', conf_obj_name(cluster_id))
+ log.info("Created empty object:%s", common_conf)
+
+ def delete_config_obj(self, cluster_id: str) -> None:
+ self._rados(cluster_id).remove_all_obj()
+ log.info("Deleted %s object and all objects in %s",
+ conf_obj_name(cluster_id), cluster_id)
+
+ def create_nfs_cluster(
+ self,
+ cluster_id: str,
+ placement: Optional[str],
+ virtual_ip: Optional[str],
+ ingress: Optional[bool] = None,
+ port: Optional[int] = None,
+ ) -> Tuple[int, str, str]:
+ try:
+ if virtual_ip and not ingress:
+ raise NFSInvalidOperation('virtual_ip can only be provided with ingress enabled')
+ if not virtual_ip and ingress:
+ raise NFSInvalidOperation('ingress currently requires a virtual_ip')
+ invalid_str = re.search('[^A-Za-z0-9-_.]', cluster_id)
+ if invalid_str:
+ raise NFSInvalidOperation(f"cluster id {cluster_id} is invalid. "
+ f"{invalid_str.group()} is char not permitted")
+
+ create_ganesha_pool(self.mgr)
+
+ self.create_empty_rados_obj(cluster_id)
+
+ if cluster_id not in available_clusters(self.mgr):
+ self._call_orch_apply_nfs(cluster_id, placement, virtual_ip, port)
+ return 0, "NFS Cluster Created Successfully", ""
+ return 0, "", f"{cluster_id} cluster already exists"
+ except Exception as e:
+ return exception_handler(e, f"NFS Cluster {cluster_id} could not be created")
+
+ def delete_nfs_cluster(self, cluster_id: str) -> Tuple[int, str, str]:
+ try:
+ cluster_list = available_clusters(self.mgr)
+ if cluster_id in cluster_list:
+ self.mgr.export_mgr.delete_all_exports(cluster_id)
+ completion = self.mgr.remove_service('ingress.nfs.' + cluster_id)
+ orchestrator.raise_if_exception(completion)
+ completion = self.mgr.remove_service('nfs.' + cluster_id)
+ orchestrator.raise_if_exception(completion)
+ self.delete_config_obj(cluster_id)
+ return 0, "NFS Cluster Deleted Successfully", ""
+ return 0, "", "Cluster does not exist"
+ except Exception as e:
+ return exception_handler(e, f"Failed to delete NFS Cluster {cluster_id}")
+
+ def list_nfs_cluster(self) -> Tuple[int, str, str]:
+ try:
+ return 0, '\n'.join(available_clusters(self.mgr)), ""
+ except Exception as e:
+ return exception_handler(e, "Failed to list NFS Cluster")
+
+ def _show_nfs_cluster_info(self, cluster_id: str) -> Dict[str, Any]:
+ completion = self.mgr.list_daemons(daemon_type='nfs')
+ # Here completion.result is a list DaemonDescription objects
+ clusters = orchestrator.raise_if_exception(completion)
+ backends: List[Dict[str, Union[Any]]] = []
+
+ for cluster in clusters:
+ if cluster_id == cluster.service_id():
+ assert cluster.hostname
+ try:
+ if cluster.ip:
+ ip = cluster.ip
+ else:
+ c = self.mgr.get_hosts()
+ orchestrator.raise_if_exception(c)
+ hosts = [h for h in c.result or []
+ if h.hostname == cluster.hostname]
+ if hosts:
+ ip = resolve_ip(hosts[0].addr)
+ else:
+ # sigh
+ ip = resolve_ip(cluster.hostname)
+ backends.append({
+ "hostname": cluster.hostname,
+ "ip": ip,
+ "port": cluster.ports[0] if cluster.ports else None
+ })
+ except orchestrator.OrchestratorError:
+ continue
+
+ r: Dict[str, Any] = {
+ 'virtual_ip': None,
+ 'backend': backends,
+ }
+ sc = self.mgr.describe_service(service_type='ingress')
+ services = orchestrator.raise_if_exception(sc)
+ for i in services:
+ spec = cast(IngressSpec, i.spec)
+ if spec.backend_service == f'nfs.{cluster_id}':
+ r['virtual_ip'] = i.virtual_ip.split('/')[0] if i.virtual_ip else None
+ if i.ports:
+ r['port'] = i.ports[0]
+ if len(i.ports) > 1:
+ r['monitor_port'] = i.ports[1]
+ log.debug("Successfully fetched %s info: %s", cluster_id, r)
+ return r
+
+ def show_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Tuple[int, str, str]:
+ try:
+ info_res = {}
+ if cluster_id:
+ cluster_ls = [cluster_id]
+ else:
+ cluster_ls = available_clusters(self.mgr)
+
+ for cluster_id in cluster_ls:
+ res = self._show_nfs_cluster_info(cluster_id)
+ if res:
+ info_res[cluster_id] = res
+ return (0, json.dumps(info_res, indent=4), '')
+ except Exception as e:
+ return exception_handler(e, "Failed to show info for cluster")
+
+ def get_nfs_cluster_config(self, cluster_id: str) -> Tuple[int, str, str]:
+ try:
+ if cluster_id in available_clusters(self.mgr):
+ rados_obj = self._rados(cluster_id)
+ conf = rados_obj.read_obj(user_conf_obj_name(cluster_id))
+ return 0, conf or "", ""
+ raise ClusterNotFound()
+ except Exception as e:
+ return exception_handler(e, f"Fetching NFS-Ganesha Config failed for {cluster_id}")
+
+ def set_nfs_cluster_config(self, cluster_id: str, nfs_config: str) -> Tuple[int, str, str]:
+ try:
+ if cluster_id in available_clusters(self.mgr):
+ rados_obj = self._rados(cluster_id)
+ if rados_obj.check_user_config():
+ return 0, "", "NFS-Ganesha User Config already exists"
+ rados_obj.write_obj(nfs_config, user_conf_obj_name(cluster_id),
+ conf_obj_name(cluster_id))
+ log.debug("Successfully saved %s's user config: \n %s", cluster_id, nfs_config)
+ restart_nfs_service(self.mgr, cluster_id)
+ return 0, "NFS-Ganesha Config Set Successfully", ""
+ raise ClusterNotFound()
+ except NotImplementedError:
+ return 0, "NFS-Ganesha Config Added Successfully "\
+ "(Manual Restart of NFS PODS required)", ""
+ except Exception as e:
+ return exception_handler(e, f"Setting NFS-Ganesha Config failed for {cluster_id}")
+
+ def reset_nfs_cluster_config(self, cluster_id: str) -> Tuple[int, str, str]:
+ try:
+ if cluster_id in available_clusters(self.mgr):
+ rados_obj = self._rados(cluster_id)
+ if not rados_obj.check_user_config():
+ return 0, "", "NFS-Ganesha User Config does not exist"
+ rados_obj.remove_obj(user_conf_obj_name(cluster_id),
+ conf_obj_name(cluster_id))
+ restart_nfs_service(self.mgr, cluster_id)
+ return 0, "NFS-Ganesha Config Reset Successfully", ""
+ raise ClusterNotFound()
+ except NotImplementedError:
+ return 0, "NFS-Ganesha Config Removed Successfully "\
+ "(Manual Restart of NFS PODS required)", ""
+ except Exception as e:
+ return exception_handler(e, f"Resetting NFS-Ganesha Config failed for {cluster_id}")
+
+ def _rados(self, cluster_id: str) -> NFSRados:
+ """Return a new NFSRados object for the given cluster id."""
+ return NFSRados(self.mgr.rados, cluster_id)
diff --git a/src/pybind/mgr/nfs/exception.py b/src/pybind/mgr/nfs/exception.py
new file mode 100644
index 000000000..6c6e3d9f3
--- /dev/null
+++ b/src/pybind/mgr/nfs/exception.py
@@ -0,0 +1,32 @@
+import errno
+from typing import Optional
+
+
+class NFSException(Exception):
+ def __init__(self, err_msg: str, errno: int = -1) -> None:
+ super(NFSException, self).__init__(errno, err_msg)
+ self.errno = errno
+ self.err_msg = err_msg
+
+ def __str__(self) -> str:
+ return self.err_msg
+
+
+class NFSInvalidOperation(NFSException):
+ def __init__(self, err_msg: str) -> None:
+ super(NFSInvalidOperation, self).__init__(err_msg, -errno.EINVAL)
+
+
+class NFSObjectNotFound(NFSException):
+ def __init__(self, err_msg: str) -> None:
+ super(NFSObjectNotFound, self).__init__(err_msg, -errno.ENOENT)
+
+
+class FSNotFound(NFSObjectNotFound):
+ def __init__(self, fs_name: Optional[str]) -> None:
+ super(FSNotFound, self).__init__(f'filesystem {fs_name} not found')
+
+
+class ClusterNotFound(NFSObjectNotFound):
+ def __init__(self) -> None:
+ super(ClusterNotFound, self).__init__('cluster does not exist')
diff --git a/src/pybind/mgr/nfs/export.py b/src/pybind/mgr/nfs/export.py
new file mode 100644
index 000000000..3c4ce8625
--- /dev/null
+++ b/src/pybind/mgr/nfs/export.py
@@ -0,0 +1,814 @@
+import errno
+import json
+import logging
+from typing import (
+ List,
+ Any,
+ Dict,
+ Tuple,
+ Optional,
+ TYPE_CHECKING,
+ TypeVar,
+ Callable,
+ Set,
+ cast)
+from os.path import normpath
+
+from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES
+
+from orchestrator import NoOrchestrator
+from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS
+
+from .export_utils import GaneshaConfParser, Export, RawBlock, CephFSFSAL, RGWFSAL
+from .exception import NFSException, NFSInvalidOperation, FSNotFound
+from .utils import (
+ CONF_PREFIX,
+ EXPORT_PREFIX,
+ USER_CONF_PREFIX,
+ export_obj_name,
+ conf_obj_name,
+ available_clusters,
+ check_fs,
+ restart_nfs_service)
+
+if TYPE_CHECKING:
+ from nfs.module import Module
+
+FuncT = TypeVar('FuncT', bound=Callable)
+
+log = logging.getLogger(__name__)
+
+
+def known_cluster_ids(mgr: 'Module') -> Set[str]:
+ """Return the set of known cluster IDs."""
+ try:
+ clusters = set(available_clusters(mgr))
+ except NoOrchestrator:
+ clusters = nfs_rados_configs(mgr.rados)
+ return clusters
+
+
+def export_cluster_checker(func: FuncT) -> FuncT:
+ def cluster_check(
+ export: 'ExportMgr',
+ *args: Any,
+ **kwargs: Any
+ ) -> Tuple[int, str, str]:
+ """
+ This method checks if cluster exists
+ """
+ clusters = known_cluster_ids(export.mgr)
+ cluster_id: str = kwargs['cluster_id']
+ log.debug("checking for %r in known nfs clusters: %r",
+ cluster_id, clusters)
+ if cluster_id not in clusters:
+ return -errno.ENOENT, "", "Cluster does not exist"
+ return func(export, *args, **kwargs)
+ return cast(FuncT, cluster_check)
+
+
+def exception_handler(
+ exception_obj: Exception,
+ log_msg: str = ""
+) -> Tuple[int, str, str]:
+ if log_msg:
+ log.exception(log_msg)
+ return getattr(exception_obj, 'errno', -1), "", str(exception_obj)
+
+
+def _check_rados_notify(ioctx: Any, obj: str) -> None:
+ try:
+ ioctx.notify(obj)
+ except TimedOut:
+ log.exception("Ganesha timed out")
+
+
+def normalize_path(path: str) -> str:
+ if path:
+ path = normpath(path.strip())
+ if path[:2] == "//":
+ path = path[1:]
+ return path
+
+
+class NFSRados:
+ def __init__(self, rados: 'Rados', namespace: str) -> None:
+ self.rados = rados
+ self.pool = POOL_NAME
+ self.namespace = namespace
+
+ def _make_rados_url(self, obj: str) -> str:
+ return "rados://{}/{}/{}".format(self.pool, self.namespace, obj)
+
+ def _create_url_block(self, obj_name: str) -> RawBlock:
+ return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})
+
+ def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ ioctx.write_full(obj, conf_block.encode('utf-8'))
+ if not config_obj:
+ # Return after creating empty common config object
+ return
+ log.debug("write configuration into rados object %s/%s/%s",
+ self.pool, self.namespace, obj)
+
+ # Add created obj url to common config obj
+ ioctx.append(config_obj, GaneshaConfParser.write_block(
+ self._create_url_block(obj)).encode('utf-8'))
+ _check_rados_notify(ioctx, config_obj)
+ log.debug("Added %s url to %s", obj, config_obj)
+
+ def read_obj(self, obj: str) -> Optional[str]:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ try:
+ return ioctx.read(obj, 1048576).decode()
+ except ObjectNotFound:
+ return None
+
+ def update_obj(self, conf_block: str, obj: str, config_obj: str,
+ should_notify: Optional[bool] = True) -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ ioctx.write_full(obj, conf_block.encode('utf-8'))
+ log.debug("write configuration into rados object %s/%s/%s",
+ self.pool, self.namespace, obj)
+ if should_notify:
+ _check_rados_notify(ioctx, config_obj)
+ log.debug("Update export %s in %s", obj, config_obj)
+
+ def remove_obj(self, obj: str, config_obj: str) -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ export_urls = ioctx.read(config_obj)
+ url = '%url "{}"\n\n'.format(self._make_rados_url(obj))
+ export_urls = export_urls.replace(url.encode('utf-8'), b'')
+ ioctx.remove_object(obj)
+ ioctx.write_full(config_obj, export_urls)
+ _check_rados_notify(ioctx, config_obj)
+ log.debug("Object deleted: %s", url)
+
+ def remove_all_obj(self) -> None:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ for obj in ioctx.list_objects():
+ obj.remove()
+
+ def check_user_config(self) -> bool:
+ with self.rados.open_ioctx(self.pool) as ioctx:
+ ioctx.set_namespace(self.namespace)
+ for obj in ioctx.list_objects():
+ if obj.key.startswith(USER_CONF_PREFIX):
+ return True
+ return False
+
+
+def nfs_rados_configs(rados: 'Rados', nfs_pool: str = POOL_NAME) -> Set[str]:
+ """Return a set of all the namespaces in the nfs_pool where nfs
+ configuration objects are found. The namespaces also correspond
+ to the cluster ids.
+ """
+ ns: Set[str] = set()
+ prefixes = (EXPORT_PREFIX, CONF_PREFIX, USER_CONF_PREFIX)
+ with rados.open_ioctx(nfs_pool) as ioctx:
+ ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
+ for obj in ioctx.list_objects():
+ if obj.key.startswith(prefixes):
+ ns.add(obj.nspace)
+ return ns
+
+
+class ExportMgr:
+ def __init__(
+ self,
+ mgr: 'Module',
+ export_ls: Optional[Dict[str, List[Export]]] = None
+ ) -> None:
+ self.mgr = mgr
+ self.rados_pool = POOL_NAME
+ self._exports: Optional[Dict[str, List[Export]]] = export_ls
+
+ @property
+ def exports(self) -> Dict[str, List[Export]]:
+ if self._exports is None:
+ self._exports = {}
+ log.info("Begin export parsing")
+ for cluster_id in known_cluster_ids(self.mgr):
+ self.export_conf_objs = [] # type: List[Export]
+ self._read_raw_config(cluster_id)
+ self._exports[cluster_id] = self.export_conf_objs
+ log.info("Exports parsed successfully %s", self.exports.items())
+ return self._exports
+
+ def _fetch_export(
+ self,
+ cluster_id: str,
+ pseudo_path: str
+ ) -> Optional[Export]:
+ try:
+ for ex in self.exports[cluster_id]:
+ if ex.pseudo == pseudo_path:
+ return ex
+ return None
+ except KeyError:
+ log.info('no exports for cluster %s', cluster_id)
+ return None
+
+ def _fetch_export_id(
+ self,
+ cluster_id: str,
+ export_id: int
+ ) -> Optional[Export]:
+ try:
+ for ex in self.exports[cluster_id]:
+ if ex.export_id == export_id:
+ return ex
+ return None
+ except KeyError:
+ log.info(f'no exports for cluster {cluster_id}')
+ return None
+
+ def _delete_export_user(self, export: Export) -> None:
+ if isinstance(export.fsal, CephFSFSAL):
+ assert export.fsal.user_id
+ self.mgr.check_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': 'client.{}'.format(export.fsal.user_id),
+ })
+ log.info("Deleted export user %s", export.fsal.user_id)
+ elif isinstance(export.fsal, RGWFSAL):
+ # do nothing; we're using the bucket owner creds.
+ pass
+
+ def _create_export_user(self, export: Export) -> None:
+ if isinstance(export.fsal, CephFSFSAL):
+ fsal = cast(CephFSFSAL, export.fsal)
+ assert fsal.fs_name
+ fsal.user_id = f"nfs.{export.cluster_id}.{export.export_id}"
+ fsal.cephx_key = self._create_user_key(
+ export.cluster_id, fsal.user_id, export.path, fsal.fs_name
+ )
+ log.debug("Successfully created user %s for cephfs path %s", fsal.user_id, export.path)
+
+ elif isinstance(export.fsal, RGWFSAL):
+ rgwfsal = cast(RGWFSAL, export.fsal)
+ if not rgwfsal.user_id:
+ assert export.path
+ ret, out, err = self.mgr.tool_exec(
+ ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path]
+ )
+ if ret:
+ raise NFSException(f'Failed to fetch owner for bucket {export.path}')
+ j = json.loads(out)
+ owner = j.get('owner', '')
+ rgwfsal.user_id = owner
+ assert rgwfsal.user_id
+ ret, out, err = self.mgr.tool_exec([
+ 'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id
+ ])
+ if ret:
+ raise NFSException(
+ f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}'
+ )
+ j = json.loads(out)
+
+ # FIXME: make this more tolerate of unexpected output?
+ rgwfsal.access_key_id = j['keys'][0]['access_key']
+ rgwfsal.secret_access_key = j['keys'][0]['secret_key']
+ log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path)
+
+ def _gen_export_id(self, cluster_id: str) -> int:
+ exports = sorted([ex.export_id for ex in self.exports[cluster_id]])
+ nid = 1
+ for e_id in exports:
+ if e_id == nid:
+ nid += 1
+ else:
+ break
+ return nid
+
+ def _read_raw_config(self, rados_namespace: str) -> None:
+ with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
+ ioctx.set_namespace(rados_namespace)
+ for obj in ioctx.list_objects():
+ if obj.key.startswith(EXPORT_PREFIX):
+ size, _ = obj.stat()
+ raw_config = obj.read(size)
+ raw_config = raw_config.decode("utf-8")
+ log.debug("read export configuration from rados "
+ "object %s/%s/%s", self.rados_pool,
+ rados_namespace, obj.key)
+ self.export_conf_objs.append(Export.from_export_block(
+ GaneshaConfParser(raw_config).parse()[0], rados_namespace))
+
+ def _save_export(self, cluster_id: str, export: Export) -> None:
+ self.exports[cluster_id].append(export)
+ self._rados(cluster_id).write_obj(
+ GaneshaConfParser.write_block(export.to_export_block()),
+ export_obj_name(export.export_id),
+ conf_obj_name(export.cluster_id)
+ )
+
+ def _delete_export(
+ self,
+ cluster_id: str,
+ pseudo_path: Optional[str],
+ export_obj: Optional[Export] = None
+ ) -> Tuple[int, str, str]:
+ try:
+ if export_obj:
+ export: Optional[Export] = export_obj
+ else:
+ assert pseudo_path
+ export = self._fetch_export(cluster_id, pseudo_path)
+
+ if export:
+ if pseudo_path:
+ self._rados(cluster_id).remove_obj(
+ export_obj_name(export.export_id), conf_obj_name(cluster_id))
+ self.exports[cluster_id].remove(export)
+ self._delete_export_user(export)
+ if not self.exports[cluster_id]:
+ del self.exports[cluster_id]
+ log.debug("Deleted all exports for cluster %s", cluster_id)
+ return 0, "Successfully deleted export", ""
+ return 0, "", "Export does not exist"
+ except Exception as e:
+ return exception_handler(e, f"Failed to delete {pseudo_path} export for {cluster_id}")
+
+ def _fetch_export_obj(self, cluster_id: str, ex_id: int) -> Optional[Export]:
+ try:
+ with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
+ ioctx.set_namespace(cluster_id)
+ export = Export.from_export_block(
+ GaneshaConfParser(
+ ioctx.read(export_obj_name(ex_id)).decode("utf-8")
+ ).parse()[0],
+ cluster_id
+ )
+ return export
+ except ObjectNotFound:
+ log.exception("Export ID: %s not found", ex_id)
+ return None
+
+ def _update_export(self, cluster_id: str, export: Export,
+ need_nfs_service_restart: bool) -> None:
+ self.exports[cluster_id].append(export)
+ self._rados(cluster_id).update_obj(
+ GaneshaConfParser.write_block(export.to_export_block()),
+ export_obj_name(export.export_id), conf_obj_name(export.cluster_id),
+ should_notify=not need_nfs_service_restart)
+ if need_nfs_service_restart:
+ restart_nfs_service(self.mgr, export.cluster_id)
+
+ @export_cluster_checker
+ def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Tuple[int, str, str]:
+ # if addr(s) are provided, construct client list and adjust outer block
+ clients = []
+ if addr:
+ clients = [{
+ 'addresses': addr,
+ 'access_type': 'ro' if kwargs['read_only'] else 'rw',
+ 'squash': kwargs['squash'],
+ }]
+ kwargs['squash'] = 'none'
+ kwargs['clients'] = clients
+
+ if clients:
+ kwargs['access_type'] = "none"
+ elif kwargs['read_only']:
+ kwargs['access_type'] = "RO"
+ else:
+ kwargs['access_type'] = "RW"
+
+ if kwargs['cluster_id'] not in self.exports:
+ self.exports[kwargs['cluster_id']] = []
+
+ try:
+ fsal_type = kwargs.pop('fsal_type')
+ if fsal_type == 'cephfs':
+ return self.create_cephfs_export(**kwargs)
+ if fsal_type == 'rgw':
+ return self.create_rgw_export(**kwargs)
+ raise NotImplementedError()
+ except Exception as e:
+ return exception_handler(e, f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")
+
+ @export_cluster_checker
+ def delete_export(self,
+ cluster_id: str,
+ pseudo_path: str) -> Tuple[int, str, str]:
+ return self._delete_export(cluster_id, pseudo_path)
+
+ def delete_all_exports(self, cluster_id: str) -> None:
+ try:
+ export_list = list(self.exports[cluster_id])
+ except KeyError:
+ log.info("No exports to delete")
+ return
+ for export in export_list:
+ ret, out, err = self._delete_export(cluster_id=cluster_id, pseudo_path=None,
+ export_obj=export)
+ if ret != 0:
+ raise NFSException(f"Failed to delete exports: {err} and {ret}")
+ log.info("All exports successfully deleted for cluster id: %s", cluster_id)
+
+ def list_all_exports(self) -> List[Dict[str, Any]]:
+ r = []
+ for cluster_id, ls in self.exports.items():
+ r.extend([e.to_dict() for e in ls])
+ return r
+
+ @export_cluster_checker
+ def list_exports(self,
+ cluster_id: str,
+ detailed: bool = False) -> Tuple[int, str, str]:
+ try:
+ if detailed:
+ result_d = [export.to_dict() for export in self.exports[cluster_id]]
+ return 0, json.dumps(result_d, indent=2), ''
+ else:
+ result_ps = [export.pseudo for export in self.exports[cluster_id]]
+ return 0, json.dumps(result_ps, indent=2), ''
+
+ except KeyError:
+ log.warning("No exports to list for %s", cluster_id)
+ return 0, '', ''
+ except Exception as e:
+ return exception_handler(e, f"Failed to list exports for {cluster_id}")
+
+ def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]:
+ export = self._fetch_export(cluster_id, pseudo_path)
+ if export:
+ return export.to_dict()
+ log.warning(f"No {pseudo_path} export to show for {cluster_id}")
+ return None
+
+ @export_cluster_checker
+ def get_export(
+ self,
+ cluster_id: str,
+ pseudo_path: str,
+ ) -> Tuple[int, str, str]:
+ try:
+ export_dict = self._get_export_dict(cluster_id, pseudo_path)
+ if export_dict:
+ return 0, json.dumps(export_dict, indent=2), ''
+ log.warning("No %s export to show for %s", pseudo_path, cluster_id)
+ return 0, '', ''
+ except Exception as e:
+ return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}")
+
+ def get_export_by_id(
+ self,
+ cluster_id: str,
+ export_id: int
+ ) -> Optional[Dict[str, Any]]:
+ export = self._fetch_export_id(cluster_id, export_id)
+ return export.to_dict() if export else None
+
+ def get_export_by_pseudo(
+ self,
+ cluster_id: str,
+ pseudo_path: str
+ ) -> Optional[Dict[str, Any]]:
+ export = self._fetch_export(cluster_id, pseudo_path)
+ return export.to_dict() if export else None
+
+ def apply_export(self, cluster_id: str, export_config: str) -> Tuple[int, str, str]:
+ try:
+ if not export_config:
+ raise NFSInvalidOperation("Empty Config!!")
+ try:
+ j = json.loads(export_config)
+ except ValueError:
+ # okay, not JSON. is it an EXPORT block?
+ try:
+ blocks = GaneshaConfParser(export_config).parse()
+ exports = [
+ Export.from_export_block(block, cluster_id)
+ for block in blocks
+ ]
+ j = [export.to_dict() for export in exports]
+ except Exception as ex:
+ raise NFSInvalidOperation(f"Input must be JSON or a ganesha EXPORT block: {ex}")
+
+ # check export type
+ if isinstance(j, list):
+ ret, out, err = (0, '', '')
+ for export in j:
+ try:
+ r, o, e = self._apply_export(cluster_id, export)
+ except Exception as ex:
+ r, o, e = exception_handler(ex, f'Failed to apply export: {ex}')
+ if r:
+ ret = r
+ if o:
+ out += o + '\n'
+ if e:
+ err += e + '\n'
+ return ret, out, err
+ else:
+ r, o, e = self._apply_export(cluster_id, j)
+ return r, o, e
+ except NotImplementedError:
+ return 0, " Manual Restart of NFS PODS required for successful update of exports", ""
+ except Exception as e:
+ return exception_handler(e, f'Failed to update export: {e}')
+
+ def _update_user_id(
+ self,
+ cluster_id: str,
+ path: str,
+ fs_name: str,
+ user_id: str
+ ) -> None:
+ osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
+ self.rados_pool, cluster_id, fs_name)
+ # NFS-Ganesha can dynamically enforce an export's access type changes, but Ceph server
+ # daemons can't dynamically enforce changes in Ceph user caps of the Ceph clients. To
+ # allow dynamic updates of CephFS NFS exports, always set FSAL Ceph user's MDS caps with
+ # path restricted read-write access. Rely on the ganesha servers to enforce the export
+ # access type requested for the NFS clients.
+ self.mgr.check_mon_command({
+ 'prefix': 'auth caps',
+ 'entity': f'client.{user_id}',
+ 'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow rw path={}'.format(path)],
+ })
+
+ log.info("Export user updated %s", user_id)
+
+ def _create_user_key(
+ self,
+ cluster_id: str,
+ entity: str,
+ path: str,
+ fs_name: str,
+ ) -> str:
+ osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
+ self.rados_pool, cluster_id, fs_name)
+ nfs_caps = [
+ 'mon', 'allow r',
+ 'osd', osd_cap,
+ 'mds', 'allow rw path={}'.format(path)
+ ]
+
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth get-or-create',
+ 'entity': 'client.{}'.format(entity),
+ 'caps': nfs_caps,
+ 'format': 'json',
+ })
+ if ret == -errno.EINVAL and 'does not match' in err:
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth caps',
+ 'entity': 'client.{}'.format(entity),
+ 'caps': nfs_caps,
+ 'format': 'json',
+ })
+ if err:
+ raise NFSException(f'Failed to update caps for {entity}: {err}')
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth get',
+ 'entity': 'client.{}'.format(entity),
+ 'format': 'json',
+ })
+ if err:
+ raise NFSException(f'Failed to fetch caps for {entity}: {err}')
+
+ json_res = json.loads(out)
+ log.info("Export user created is %s", json_res[0]['entity'])
+ return json_res[0]['key']
+
+ def create_export_from_dict(self,
+ cluster_id: str,
+ ex_id: int,
+ ex_dict: Dict[str, Any]) -> Export:
+ pseudo_path = ex_dict.get("pseudo")
+ if not pseudo_path:
+ raise NFSInvalidOperation("export must specify pseudo path")
+
+ path = ex_dict.get("path")
+ if path is None:
+ raise NFSInvalidOperation("export must specify path")
+ path = normalize_path(path)
+
+ fsal = ex_dict.get("fsal", {})
+ fsal_type = fsal.get("name")
+ if fsal_type == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ if '/' in path and path != '/':
+ raise NFSInvalidOperation('"/" is not allowed in path with bucket name')
+ elif fsal_type == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ fs_name = fsal.get("fs_name")
+ if not fs_name:
+ raise NFSInvalidOperation("export FSAL must specify fs_name")
+ if not check_fs(self.mgr, fs_name):
+ raise FSNotFound(fs_name)
+
+ user_id = f"nfs.{cluster_id}.{ex_id}"
+ if "user_id" in fsal and fsal["user_id"] != user_id:
+ raise NFSInvalidOperation(f"export FSAL user_id must be '{user_id}'")
+ else:
+ raise NFSInvalidOperation(f"NFS Ganesha supported FSALs are {NFS_GANESHA_SUPPORTED_FSALS}."
+ "Export must specify any one of it.")
+
+ ex_dict["fsal"] = fsal
+ ex_dict["cluster_id"] = cluster_id
+ export = Export.from_dict(ex_id, ex_dict)
+ export.validate(self.mgr)
+ log.debug("Successfully created %s export-%s from dict for cluster %s",
+ fsal_type, ex_id, cluster_id)
+ return export
+
+ def create_cephfs_export(self,
+ fs_name: str,
+ cluster_id: str,
+ pseudo_path: str,
+ read_only: bool,
+ path: str,
+ squash: str,
+ access_type: str,
+ clients: list = []) -> Tuple[int, str, str]:
+ pseudo_path = normalize_path(pseudo_path)
+
+ if not self._fetch_export(cluster_id, pseudo_path):
+ export = self.create_export_from_dict(
+ cluster_id,
+ self._gen_export_id(cluster_id),
+ {
+ "pseudo": pseudo_path,
+ "path": path,
+ "access_type": access_type,
+ "squash": squash,
+ "fsal": {
+ "name": NFS_GANESHA_SUPPORTED_FSALS[0],
+ "fs_name": fs_name,
+ },
+ "clients": clients,
+ }
+ )
+ log.debug("creating cephfs export %s", export)
+ self._create_export_user(export)
+ self._save_export(cluster_id, export)
+ result = {
+ "bind": export.pseudo,
+ "fs": fs_name,
+ "path": export.path,
+ "cluster": cluster_id,
+ "mode": export.access_type,
+ }
+ return (0, json.dumps(result, indent=4), '')
+ return 0, "", "Export already exists"
+
+ def create_rgw_export(self,
+ cluster_id: str,
+ pseudo_path: str,
+ access_type: str,
+ read_only: bool,
+ squash: str,
+ bucket: Optional[str] = None,
+ user_id: Optional[str] = None,
+ clients: list = []) -> Tuple[int, str, str]:
+ pseudo_path = normalize_path(pseudo_path)
+
+ if not bucket and not user_id:
+ return -errno.EINVAL, "", "Must specify either bucket or user_id"
+
+ if not self._fetch_export(cluster_id, pseudo_path):
+ export = self.create_export_from_dict(
+ cluster_id,
+ self._gen_export_id(cluster_id),
+ {
+ "pseudo": pseudo_path,
+ "path": bucket or '/',
+ "access_type": access_type,
+ "squash": squash,
+ "fsal": {
+ "name": NFS_GANESHA_SUPPORTED_FSALS[1],
+ "user_id": user_id,
+ },
+ "clients": clients,
+ }
+ )
+ log.debug("creating rgw export %s", export)
+ self._create_export_user(export)
+ self._save_export(cluster_id, export)
+ result = {
+ "bind": export.pseudo,
+ "path": export.path,
+ "cluster": cluster_id,
+ "mode": export.access_type,
+ "squash": export.squash,
+ }
+ return (0, json.dumps(result, indent=4), '')
+ return 0, "", "Export already exists"
+
+ def _apply_export(
+ self,
+ cluster_id: str,
+ new_export_dict: Dict,
+ ) -> Tuple[int, str, str]:
+ for k in ['path', 'pseudo']:
+ if k not in new_export_dict:
+ raise NFSInvalidOperation(f'Export missing required field {k}')
+ if cluster_id not in self.exports:
+ self.exports[cluster_id] = []
+
+ new_export_dict['path'] = normalize_path(new_export_dict['path'])
+ new_export_dict['pseudo'] = normalize_path(new_export_dict['pseudo'])
+
+ old_export = self._fetch_export(cluster_id, new_export_dict['pseudo'])
+ if old_export:
+ # Check if export id matches
+ if new_export_dict.get('export_id'):
+ if old_export.export_id != new_export_dict.get('export_id'):
+ raise NFSInvalidOperation('Export ID changed, Cannot update export')
+ else:
+ new_export_dict['export_id'] = old_export.export_id
+ elif new_export_dict.get('export_id'):
+ old_export = self._fetch_export_obj(cluster_id, new_export_dict['export_id'])
+ if old_export:
+ # re-fetch via old pseudo
+ old_export = self._fetch_export(cluster_id, old_export.pseudo)
+ assert old_export
+ log.debug("export %s pseudo %s -> %s",
+ old_export.export_id, old_export.pseudo, new_export_dict['pseudo'])
+
+ new_export = self.create_export_from_dict(
+ cluster_id,
+ new_export_dict.get('export_id', self._gen_export_id(cluster_id)),
+ new_export_dict
+ )
+
+ if not old_export:
+ self._create_export_user(new_export)
+ self._save_export(cluster_id, new_export)
+ return 0, f'Added export {new_export.pseudo}', ''
+
+ need_nfs_service_restart = True
+ if old_export.fsal.name != new_export.fsal.name:
+ raise NFSInvalidOperation('FSAL change not allowed')
+ if old_export.pseudo != new_export.pseudo:
+ log.debug('export %s pseudo %s -> %s',
+ new_export.export_id, old_export.pseudo, new_export.pseudo)
+
+ if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ old_fsal = cast(CephFSFSAL, old_export.fsal)
+ new_fsal = cast(CephFSFSAL, new_export.fsal)
+ if old_fsal.user_id != new_fsal.user_id:
+ self._delete_export_user(old_export)
+ self._create_export_user(new_export)
+ elif (
+ old_export.path != new_export.path
+ or old_fsal.fs_name != new_fsal.fs_name
+ ):
+ self._update_user_id(
+ cluster_id,
+ new_export.path,
+ cast(str, new_fsal.fs_name),
+ cast(str, new_fsal.user_id)
+ )
+ new_fsal.cephx_key = old_fsal.cephx_key
+ else:
+ expected_mds_caps = 'allow rw path={}'.format(new_export.path)
+ entity = new_fsal.user_id
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth get',
+ 'entity': 'client.{}'.format(entity),
+ 'format': 'json',
+ })
+ if ret:
+ raise NFSException(f'Failed to fetch caps for {entity}: {err}')
+ actual_mds_caps = json.loads(out)[0]['caps'].get('mds')
+ if actual_mds_caps != expected_mds_caps:
+ self._update_user_id(
+ cluster_id,
+ new_export.path,
+ cast(str, new_fsal.fs_name),
+ cast(str, new_fsal.user_id)
+ )
+ elif old_export.pseudo == new_export.pseudo:
+ need_nfs_service_restart = False
+ new_fsal.cephx_key = old_fsal.cephx_key
+
+ if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ old_rgw_fsal = cast(RGWFSAL, old_export.fsal)
+ new_rgw_fsal = cast(RGWFSAL, new_export.fsal)
+ if old_rgw_fsal.user_id != new_rgw_fsal.user_id:
+ self._delete_export_user(old_export)
+ self._create_export_user(new_export)
+ elif old_rgw_fsal.access_key_id != new_rgw_fsal.access_key_id:
+ raise NFSInvalidOperation('access_key_id change is not allowed')
+ elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key:
+ raise NFSInvalidOperation('secret_access_key change is not allowed')
+
+ self.exports[cluster_id].remove(old_export)
+
+ self._update_export(cluster_id, new_export, need_nfs_service_restart)
+
+ return 0, f"Updated export {new_export.pseudo}", ""
+
+ def _rados(self, cluster_id: str) -> NFSRados:
+ """Return a new NFSRados object for the given cluster id."""
+ return NFSRados(self.mgr.rados, cluster_id)
diff --git a/src/pybind/mgr/nfs/export_utils.py b/src/pybind/mgr/nfs/export_utils.py
new file mode 100644
index 000000000..873354536
--- /dev/null
+++ b/src/pybind/mgr/nfs/export_utils.py
@@ -0,0 +1,521 @@
+from typing import cast, List, Dict, Any, Optional, TYPE_CHECKING
+from os.path import isabs
+
+from mgr_module import NFS_GANESHA_SUPPORTED_FSALS
+
+from .exception import NFSInvalidOperation, FSNotFound
+from .utils import check_fs
+
+if TYPE_CHECKING:
+ from nfs.module import Module
+
+
+class RawBlock():
+ def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}):
+ if not values: # workaround mutable default argument
+ values = {}
+ if not blocks: # workaround mutable default argument
+ blocks = []
+ self.block_name = block_name
+ self.blocks = blocks
+ self.values = values
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, RawBlock):
+ return False
+ return self.block_name == other.block_name and \
+ self.blocks == other.blocks and \
+ self.values == other.values
+
+ def __repr__(self) -> str:
+ return f'RawBlock({self.block_name!r}, {self.blocks!r}, {self.values!r})'
+
+
+class GaneshaConfParser:
+ def __init__(self, raw_config: str):
+ self.pos = 0
+ self.text = ""
+ for line in raw_config.split("\n"):
+ line = line.lstrip()
+
+ if line.startswith("%"):
+ self.text += line.replace('"', "")
+ self.text += "\n"
+ else:
+ self.text += "".join(line.split())
+
+ def stream(self) -> str:
+ return self.text[self.pos:]
+
+ def last_context(self) -> str:
+ return f'"...{self.text[max(0, self.pos - 30):self.pos]}<here>{self.stream()[:30]}"'
+
+ def parse_block_name(self) -> str:
+ idx = self.stream().find('{')
+ if idx == -1:
+ raise Exception(f"Cannot find block name at {self.last_context()}")
+ block_name = self.stream()[:idx]
+ self.pos += idx + 1
+ return block_name
+
+ def parse_block_or_section(self) -> RawBlock:
+ if self.stream().startswith("%url "):
+ # section line
+ self.pos += 5
+ idx = self.stream().find('\n')
+ if idx == -1:
+ value = self.stream()
+ self.pos += len(value)
+ else:
+ value = self.stream()[:idx]
+ self.pos += idx + 1
+ block_dict = RawBlock('%url', values={'value': value})
+ return block_dict
+
+ block_dict = RawBlock(self.parse_block_name().upper())
+ self.parse_block_body(block_dict)
+ if self.stream()[0] != '}':
+ raise Exception("No closing bracket '}' found at the end of block")
+ self.pos += 1
+ return block_dict
+
+ def parse_parameter_value(self, raw_value: str) -> Any:
+ if raw_value.find(',') != -1:
+ return [self.parse_parameter_value(v.strip())
+ for v in raw_value.split(',')]
+ try:
+ return int(raw_value)
+ except ValueError:
+ if raw_value == "true":
+ return True
+ if raw_value == "false":
+ return False
+ if raw_value.find('"') == 0:
+ return raw_value[1:-1]
+ return raw_value
+
+ def parse_stanza(self, block_dict: RawBlock) -> None:
+ equal_idx = self.stream().find('=')
+ if equal_idx == -1:
+ raise Exception("Malformed stanza: no equal symbol found.")
+ semicolon_idx = self.stream().find(';')
+ parameter_name = self.stream()[:equal_idx].lower()
+ parameter_value = self.stream()[equal_idx + 1:semicolon_idx]
+ block_dict.values[parameter_name] = self.parse_parameter_value(parameter_value)
+ self.pos += semicolon_idx + 1
+
+ def parse_block_body(self, block_dict: RawBlock) -> None:
+ while True:
+ if self.stream().find('}') == 0:
+ # block end
+ return
+
+ last_pos = self.pos
+ semicolon_idx = self.stream().find(';')
+ lbracket_idx = self.stream().find('{')
+ is_semicolon = (semicolon_idx != -1)
+ is_lbracket = (lbracket_idx != -1)
+ is_semicolon_lt_lbracket = (semicolon_idx < lbracket_idx)
+
+ if is_semicolon and ((is_lbracket and is_semicolon_lt_lbracket) or not is_lbracket):
+ self.parse_stanza(block_dict)
+ elif is_lbracket and ((is_semicolon and not is_semicolon_lt_lbracket)
+ or (not is_semicolon)):
+ block_dict.blocks.append(self.parse_block_or_section())
+ else:
+ raise Exception("Malformed stanza: no semicolon found.")
+
+ if last_pos == self.pos:
+ raise Exception("Infinite loop while parsing block content")
+
+ def parse(self) -> List[RawBlock]:
+ blocks = []
+ while self.stream():
+ blocks.append(self.parse_block_or_section())
+ return blocks
+
+ @staticmethod
+ def _indentation(depth: int, size: int = 4) -> str:
+ conf_str = ""
+ for _ in range(0, depth * size):
+ conf_str += " "
+ return conf_str
+
+ @staticmethod
+ def write_block_body(block: RawBlock, depth: int = 0) -> str:
+ def format_val(key: str, val: str) -> str:
+ if isinstance(val, list):
+ return ', '.join([format_val(key, v) for v in val])
+ if isinstance(val, bool):
+ return str(val).lower()
+ if isinstance(val, int) or (block.block_name == 'CLIENT'
+ and key == 'clients'):
+ return '{}'.format(val)
+ return '"{}"'.format(val)
+
+ conf_str = ""
+ for blo in block.blocks:
+ conf_str += GaneshaConfParser.write_block(blo, depth)
+
+ for key, val in block.values.items():
+ if val is not None:
+ conf_str += GaneshaConfParser._indentation(depth)
+ conf_str += '{} = {};\n'.format(key, format_val(key, val))
+ return conf_str
+
+ @staticmethod
+ def write_block(block: RawBlock, depth: int = 0) -> str:
+ if block.block_name == "%url":
+ return '%url "{}"\n\n'.format(block.values['value'])
+
+ conf_str = ""
+ conf_str += GaneshaConfParser._indentation(depth)
+ conf_str += format(block.block_name)
+ conf_str += " {\n"
+ conf_str += GaneshaConfParser.write_block_body(block, depth + 1)
+ conf_str += GaneshaConfParser._indentation(depth)
+ conf_str += "}\n"
+ return conf_str
+
+
+class FSAL(object):
+ def __init__(self, name: str) -> None:
+ self.name = name
+
+ @classmethod
+ def from_dict(cls, fsal_dict: Dict[str, Any]) -> 'FSAL':
+ if fsal_dict.get('name') == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ return CephFSFSAL.from_dict(fsal_dict)
+ if fsal_dict.get('name') == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ return RGWFSAL.from_dict(fsal_dict)
+ raise NFSInvalidOperation(f'Unknown FSAL {fsal_dict.get("name")}')
+
+ @classmethod
+ def from_fsal_block(cls, fsal_block: RawBlock) -> 'FSAL':
+ if fsal_block.values.get('name') == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ return CephFSFSAL.from_fsal_block(fsal_block)
+ if fsal_block.values.get('name') == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ return RGWFSAL.from_fsal_block(fsal_block)
+ raise NFSInvalidOperation(f'Unknown FSAL {fsal_block.values.get("name")}')
+
+ def to_fsal_block(self) -> RawBlock:
+ raise NotImplementedError
+
+ def to_dict(self) -> Dict[str, Any]:
+ raise NotImplementedError
+
+
+class CephFSFSAL(FSAL):
+ def __init__(self,
+ name: str,
+ user_id: Optional[str] = None,
+ fs_name: Optional[str] = None,
+ sec_label_xattr: Optional[str] = None,
+ cephx_key: Optional[str] = None) -> None:
+ super().__init__(name)
+ assert name == 'CEPH'
+ self.fs_name = fs_name
+ self.user_id = user_id
+ self.sec_label_xattr = sec_label_xattr
+ self.cephx_key = cephx_key
+
+ @classmethod
+ def from_fsal_block(cls, fsal_block: RawBlock) -> 'CephFSFSAL':
+ return cls(fsal_block.values['name'],
+ fsal_block.values.get('user_id'),
+ fsal_block.values.get('filesystem'),
+ fsal_block.values.get('sec_label_xattr'),
+ fsal_block.values.get('secret_access_key'))
+
+ def to_fsal_block(self) -> RawBlock:
+ result = RawBlock('FSAL', values={'name': self.name})
+
+ if self.user_id:
+ result.values['user_id'] = self.user_id
+ if self.fs_name:
+ result.values['filesystem'] = self.fs_name
+ if self.sec_label_xattr:
+ result.values['sec_label_xattr'] = self.sec_label_xattr
+ if self.cephx_key:
+ result.values['secret_access_key'] = self.cephx_key
+ return result
+
+ @classmethod
+ def from_dict(cls, fsal_dict: Dict[str, Any]) -> 'CephFSFSAL':
+ return cls(fsal_dict['name'],
+ fsal_dict.get('user_id'),
+ fsal_dict.get('fs_name'),
+ fsal_dict.get('sec_label_xattr'),
+ fsal_dict.get('cephx_key'))
+
+ def to_dict(self) -> Dict[str, str]:
+ r = {'name': self.name}
+ if self.user_id:
+ r['user_id'] = self.user_id
+ if self.fs_name:
+ r['fs_name'] = self.fs_name
+ if self.sec_label_xattr:
+ r['sec_label_xattr'] = self.sec_label_xattr
+ return r
+
+
+class RGWFSAL(FSAL):
+ def __init__(self,
+ name: str,
+ user_id: Optional[str] = None,
+ access_key_id: Optional[str] = None,
+ secret_access_key: Optional[str] = None
+ ) -> None:
+ super().__init__(name)
+ assert name == 'RGW'
+ # RGW user uid
+ self.user_id = user_id
+ # S3 credentials
+ self.access_key_id = access_key_id
+ self.secret_access_key = secret_access_key
+
+ @classmethod
+ def from_fsal_block(cls, fsal_block: RawBlock) -> 'RGWFSAL':
+ return cls(fsal_block.values['name'],
+ fsal_block.values.get('user_id'),
+ fsal_block.values.get('access_key_id'),
+ fsal_block.values.get('secret_access_key'))
+
+ def to_fsal_block(self) -> RawBlock:
+ result = RawBlock('FSAL', values={'name': self.name})
+
+ if self.user_id:
+ result.values['user_id'] = self.user_id
+ if self.access_key_id:
+ result.values['access_key_id'] = self.access_key_id
+ if self.secret_access_key:
+ result.values['secret_access_key'] = self.secret_access_key
+ return result
+
+ @classmethod
+ def from_dict(cls, fsal_dict: Dict[str, str]) -> 'RGWFSAL':
+ return cls(fsal_dict['name'],
+ fsal_dict.get('user_id'),
+ fsal_dict.get('access_key_id'),
+ fsal_dict.get('secret_access_key'))
+
+ def to_dict(self) -> Dict[str, str]:
+ r = {'name': self.name}
+ if self.user_id:
+ r['user_id'] = self.user_id
+ if self.access_key_id:
+ r['access_key_id'] = self.access_key_id
+ if self.secret_access_key:
+ r['secret_access_key'] = self.secret_access_key
+ return r
+
+
+class Client:
+ def __init__(self,
+ addresses: List[str],
+ access_type: str,
+ squash: str):
+ self.addresses = addresses
+ self.access_type = access_type
+ self.squash = squash
+
+ @classmethod
+ def from_client_block(cls, client_block: RawBlock) -> 'Client':
+ addresses = client_block.values.get('clients', [])
+ if isinstance(addresses, str):
+ addresses = [addresses]
+ return cls(addresses,
+ client_block.values.get('access_type', None),
+ client_block.values.get('squash', None))
+
+ def to_client_block(self) -> RawBlock:
+ result = RawBlock('CLIENT', values={'clients': self.addresses})
+ if self.access_type:
+ result.values['access_type'] = self.access_type
+ if self.squash:
+ result.values['squash'] = self.squash
+ return result
+
+ @classmethod
+ def from_dict(cls, client_dict: Dict[str, Any]) -> 'Client':
+ return cls(client_dict['addresses'], client_dict['access_type'],
+ client_dict['squash'])
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ 'addresses': self.addresses,
+ 'access_type': self.access_type,
+ 'squash': self.squash
+ }
+
+
+class Export:
+ def __init__(
+ self,
+ export_id: int,
+ path: str,
+ cluster_id: str,
+ pseudo: str,
+ access_type: str,
+ squash: str,
+ security_label: bool,
+ protocols: List[int],
+ transports: List[str],
+ fsal: FSAL,
+ clients: Optional[List[Client]] = None) -> None:
+ self.export_id = export_id
+ self.path = path
+ self.fsal = fsal
+ self.cluster_id = cluster_id
+ self.pseudo = pseudo
+ self.access_type = access_type
+ self.squash = squash
+ self.attr_expiration_time = 0
+ self.security_label = security_label
+ self.protocols = protocols
+ self.transports = transports
+ self.clients: List[Client] = clients or []
+
+ @classmethod
+ def from_export_block(cls, export_block: RawBlock, cluster_id: str) -> 'Export':
+ fsal_blocks = [b for b in export_block.blocks
+ if b.block_name == "FSAL"]
+
+ client_blocks = [b for b in export_block.blocks
+ if b.block_name == "CLIENT"]
+
+ protocols = export_block.values.get('protocols')
+ if not isinstance(protocols, list):
+ protocols = [protocols]
+
+ transports = export_block.values.get('transports')
+ if isinstance(transports, str):
+ transports = [transports]
+ elif not transports:
+ transports = []
+
+ return cls(export_block.values['export_id'],
+ export_block.values['path'],
+ cluster_id,
+ export_block.values['pseudo'],
+ export_block.values.get('access_type', 'none'),
+ export_block.values.get('squash', 'no_root_squash'),
+ export_block.values.get('security_label', True),
+ protocols,
+ transports,
+ FSAL.from_fsal_block(fsal_blocks[0]),
+ [Client.from_client_block(client)
+ for client in client_blocks])
+
+ def to_export_block(self) -> RawBlock:
+ result = RawBlock('EXPORT', values={
+ 'export_id': self.export_id,
+ 'path': self.path,
+ 'pseudo': self.pseudo,
+ 'access_type': self.access_type,
+ 'squash': self.squash,
+ 'attr_expiration_time': self.attr_expiration_time,
+ 'security_label': self.security_label,
+ 'protocols': self.protocols,
+ 'transports': self.transports,
+ })
+ result.blocks = [
+ self.fsal.to_fsal_block()
+ ] + [
+ client.to_client_block()
+ for client in self.clients
+ ]
+ return result
+
+ @classmethod
+ def from_dict(cls, export_id: int, ex_dict: Dict[str, Any]) -> 'Export':
+ return cls(export_id,
+ ex_dict.get('path', '/'),
+ ex_dict['cluster_id'],
+ ex_dict['pseudo'],
+ ex_dict.get('access_type', 'RO'),
+ ex_dict.get('squash', 'no_root_squash'),
+ ex_dict.get('security_label', True),
+ ex_dict.get('protocols', [4]),
+ ex_dict.get('transports', ['TCP']),
+ FSAL.from_dict(ex_dict.get('fsal', {})),
+ [Client.from_dict(client) for client in ex_dict.get('clients', [])])
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ 'export_id': self.export_id,
+ 'path': self.path,
+ 'cluster_id': self.cluster_id,
+ 'pseudo': self.pseudo,
+ 'access_type': self.access_type,
+ 'squash': self.squash,
+ 'security_label': self.security_label,
+ 'protocols': sorted([p for p in self.protocols]),
+ 'transports': sorted([t for t in self.transports]),
+ 'fsal': self.fsal.to_dict(),
+ 'clients': [client.to_dict() for client in self.clients]
+ }
+
+ @staticmethod
+ def validate_access_type(access_type: str) -> None:
+ valid_access_types = ['rw', 'ro', 'none']
+ if not isinstance(access_type, str) or access_type.lower() not in valid_access_types:
+ raise NFSInvalidOperation(
+ f'{access_type} is invalid, valid access type are'
+ f'{valid_access_types}'
+ )
+
+ @staticmethod
+ def validate_squash(squash: str) -> None:
+ valid_squash_ls = [
+ "root", "root_squash", "rootsquash", "rootid", "root_id_squash",
+ "rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous",
+ "allanonymous", "no_root_squash", "none", "noidsquash",
+ ]
+ if squash.lower() not in valid_squash_ls:
+ raise NFSInvalidOperation(
+ f"squash {squash} not in valid list {valid_squash_ls}"
+ )
+
+ def validate(self, mgr: 'Module') -> None:
+ if not isabs(self.pseudo) or self.pseudo == "/":
+ raise NFSInvalidOperation(
+ f"pseudo path {self.pseudo} is invalid. It should be an absolute "
+ "path and it cannot be just '/'."
+ )
+
+ self.validate_squash(self.squash)
+ self.validate_access_type(self.access_type)
+
+ if not isinstance(self.security_label, bool):
+ raise NFSInvalidOperation('security_label must be a boolean value')
+
+ for p in self.protocols:
+ if p not in [3, 4]:
+ raise NFSInvalidOperation(f"Invalid protocol {p}")
+
+ valid_transport = ["UDP", "TCP"]
+ for trans in self.transports:
+ if trans.upper() not in valid_transport:
+ raise NFSInvalidOperation(f'{trans} is not a valid transport protocol')
+
+ for client in self.clients:
+ if client.squash:
+ self.validate_squash(client.squash)
+ if client.access_type:
+ self.validate_access_type(client.access_type)
+
+ if self.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ fs = cast(CephFSFSAL, self.fsal)
+ if not fs.fs_name or not check_fs(mgr, fs.fs_name):
+ raise FSNotFound(fs.fs_name)
+ elif self.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]:
+ rgw = cast(RGWFSAL, self.fsal) # noqa
+ pass
+ else:
+ raise NFSInvalidOperation('FSAL {self.fsal.name} not supported')
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, Export):
+ return False
+ return self.to_dict() == other.to_dict()
diff --git a/src/pybind/mgr/nfs/module.py b/src/pybind/mgr/nfs/module.py
new file mode 100644
index 000000000..403f89f3a
--- /dev/null
+++ b/src/pybind/mgr/nfs/module.py
@@ -0,0 +1,154 @@
+import logging
+import threading
+from typing import Tuple, Optional, List, Dict, Any
+
+from mgr_module import MgrModule, CLICommand, Option, CLICheckNonemptyFileInput
+import orchestrator
+
+from .export import ExportMgr
+from .cluster import NFSCluster
+from .utils import available_clusters
+
+log = logging.getLogger(__name__)
+
+
+class Module(orchestrator.OrchestratorClientMixin, MgrModule):
+ MODULE_OPTIONS: List[Option] = []
+
+ def __init__(self, *args: str, **kwargs: Any) -> None:
+ self.inited = False
+ self.lock = threading.Lock()
+ super(Module, self).__init__(*args, **kwargs)
+ with self.lock:
+ self.export_mgr = ExportMgr(self)
+ self.nfs = NFSCluster(self)
+ self.inited = True
+
+ @CLICommand('nfs export create cephfs', perm='rw')
+ def _cmd_nfs_export_create_cephfs(
+ self,
+ cluster_id: str,
+ pseudo_path: str,
+ fsname: str,
+ path: Optional[str] = '/',
+ readonly: Optional[bool] = False,
+ client_addr: Optional[List[str]] = None,
+ squash: str = 'none',
+ ) -> Tuple[int, str, str]:
+ """Create a CephFS export"""
+ return self.export_mgr.create_export(fsal_type='cephfs', fs_name=fsname,
+ cluster_id=cluster_id, pseudo_path=pseudo_path,
+ read_only=readonly, path=path,
+ squash=squash, addr=client_addr)
+
+ @CLICommand('nfs export create rgw', perm='rw')
+ def _cmd_nfs_export_create_rgw(
+ self,
+ cluster_id: str,
+ pseudo_path: str,
+ bucket: Optional[str] = None,
+ user_id: Optional[str] = None,
+ readonly: Optional[bool] = False,
+ client_addr: Optional[List[str]] = None,
+ squash: str = 'none',
+ ) -> Tuple[int, str, str]:
+ """Create an RGW export"""
+ return self.export_mgr.create_export(fsal_type='rgw', bucket=bucket,
+ user_id=user_id,
+ cluster_id=cluster_id, pseudo_path=pseudo_path,
+ read_only=readonly, squash=squash,
+ addr=client_addr)
+
+ @CLICommand('nfs export rm', perm='rw')
+ def _cmd_nfs_export_rm(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
+ """Remove a cephfs export"""
+ return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
+
+ @CLICommand('nfs export delete', perm='rw')
+ def _cmd_nfs_export_delete(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
+ """Delete a cephfs export (DEPRECATED)"""
+ return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
+
+ @CLICommand('nfs export ls', perm='r')
+ def _cmd_nfs_export_ls(self, cluster_id: str, detailed: bool = False) -> Tuple[int, str, str]:
+ """List exports of a NFS cluster"""
+ return self.export_mgr.list_exports(cluster_id=cluster_id, detailed=detailed)
+
+ @CLICommand('nfs export info', perm='r')
+ def _cmd_nfs_export_info(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
+ """Fetch a export of a NFS cluster given the pseudo path/binding"""
+ return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
+
+ @CLICommand('nfs export get', perm='r')
+ def _cmd_nfs_export_get(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
+ """Fetch a export of a NFS cluster given the pseudo path/binding (DEPRECATED)"""
+ return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
+
+ @CLICommand('nfs export apply', perm='rw')
+ @CLICheckNonemptyFileInput(desc='Export JSON or Ganesha EXPORT specification')
+ def _cmd_nfs_export_apply(self, cluster_id: str, inbuf: str) -> Tuple[int, str, str]:
+ """Create or update an export by `-i <json_or_ganesha_export_file>`"""
+ return self.export_mgr.apply_export(cluster_id, export_config=inbuf)
+
+ @CLICommand('nfs cluster create', perm='rw')
+ def _cmd_nfs_cluster_create(self,
+ cluster_id: str,
+ placement: Optional[str] = None,
+ ingress: Optional[bool] = None,
+ virtual_ip: Optional[str] = None,
+ port: Optional[int] = None) -> Tuple[int, str, str]:
+ """Create an NFS Cluster"""
+ return self.nfs.create_nfs_cluster(cluster_id=cluster_id, placement=placement,
+ virtual_ip=virtual_ip, ingress=ingress,
+ port=port)
+
+ @CLICommand('nfs cluster rm', perm='rw')
+ def _cmd_nfs_cluster_rm(self, cluster_id: str) -> Tuple[int, str, str]:
+ """Removes an NFS Cluster"""
+ return self.nfs.delete_nfs_cluster(cluster_id=cluster_id)
+
+ @CLICommand('nfs cluster delete', perm='rw')
+ def _cmd_nfs_cluster_delete(self, cluster_id: str) -> Tuple[int, str, str]:
+ """Removes an NFS Cluster (DEPRECATED)"""
+ return self.nfs.delete_nfs_cluster(cluster_id=cluster_id)
+
+ @CLICommand('nfs cluster ls', perm='r')
+ def _cmd_nfs_cluster_ls(self) -> Tuple[int, str, str]:
+ """List NFS Clusters"""
+ return self.nfs.list_nfs_cluster()
+
+ @CLICommand('nfs cluster info', perm='r')
+ def _cmd_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Tuple[int, str, str]:
+ """Displays NFS Cluster info"""
+ return self.nfs.show_nfs_cluster_info(cluster_id=cluster_id)
+
+ @CLICommand('nfs cluster config get', perm='r')
+ def _cmd_nfs_cluster_config_get(self, cluster_id: str) -> Tuple[int, str, str]:
+ """Fetch NFS-Ganesha config"""
+ return self.nfs.get_nfs_cluster_config(cluster_id=cluster_id)
+
+ @CLICommand('nfs cluster config set', perm='rw')
+ @CLICheckNonemptyFileInput(desc='NFS-Ganesha Configuration')
+ def _cmd_nfs_cluster_config_set(self, cluster_id: str, inbuf: str) -> Tuple[int, str, str]:
+ """Set NFS-Ganesha config by `-i <config_file>`"""
+ return self.nfs.set_nfs_cluster_config(cluster_id=cluster_id, nfs_config=inbuf)
+
+ @CLICommand('nfs cluster config reset', perm='rw')
+ def _cmd_nfs_cluster_config_reset(self, cluster_id: str) -> Tuple[int, str, str]:
+ """Reset NFS-Ganesha Config to default"""
+ return self.nfs.reset_nfs_cluster_config(cluster_id=cluster_id)
+
+ def fetch_nfs_export_obj(self) -> ExportMgr:
+ return self.export_mgr
+
+ def export_ls(self) -> List[Dict[Any, Any]]:
+ return self.export_mgr.list_all_exports()
+
+ def export_get(self, cluster_id: str, export_id: int) -> Optional[Dict[str, Any]]:
+ return self.export_mgr.get_export_by_id(cluster_id, export_id)
+
+ def export_rm(self, cluster_id: str, pseudo: str) -> None:
+ self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo)
+
+ def cluster_ls(self) -> List[str]:
+ return available_clusters(self)
diff --git a/src/pybind/mgr/nfs/tests/__init__.py b/src/pybind/mgr/nfs/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/nfs/tests/__init__.py
diff --git a/src/pybind/mgr/nfs/tests/test_nfs.py b/src/pybind/mgr/nfs/tests/test_nfs.py
new file mode 100644
index 000000000..c31e5b889
--- /dev/null
+++ b/src/pybind/mgr/nfs/tests/test_nfs.py
@@ -0,0 +1,1036 @@
+# flake8: noqa
+import json
+import pytest
+from typing import Optional, Tuple, Iterator, List, Any
+
+from contextlib import contextmanager
+from unittest import mock
+from unittest.mock import MagicMock
+from mgr_module import MgrModule, NFS_POOL_NAME
+
+from rados import ObjectNotFound
+
+from ceph.deployment.service_spec import NFSServiceSpec
+from nfs import Module
+from nfs.export import ExportMgr, normalize_path
+from nfs.export_utils import GaneshaConfParser, Export, RawBlock
+from nfs.cluster import NFSCluster
+from orchestrator import ServiceDescription, DaemonDescription, OrchResult
+
+
+class TestNFS:
+ cluster_id = "foo"
+ export_1 = """
+EXPORT {
+ Export_ID=1;
+ Protocols = 4;
+ Path = /;
+ Pseudo = /cephfs_a/;
+ Access_Type = RW;
+ Protocols = 4;
+ Attr_Expiration_Time = 0;
+ # Squash = root;
+
+ FSAL {
+ Name = CEPH;
+ Filesystem = "a";
+ User_Id = "ganesha";
+ # Secret_Access_Key = "YOUR SECRET KEY HERE";
+ }
+
+ CLIENT
+ {
+ Clients = 192.168.0.10, 192.168.1.0/8;
+ Squash = None;
+ }
+
+ CLIENT
+ {
+ Clients = 192.168.0.0/16;
+ Squash = All;
+ Access_Type = RO;
+ }
+}
+"""
+
+ export_2 = """
+EXPORT
+{
+ Export_ID=2;
+ Path = "/";
+ Pseudo = "/rgw";
+ Access_Type = RW;
+ squash = AllAnonymous;
+ Protocols = 4, 3;
+ Transports = TCP, UDP;
+
+ FSAL {
+ Name = RGW;
+ User_Id = "nfs.foo.bucket";
+ Access_Key_Id ="the_access_key";
+ Secret_Access_Key = "the_secret_key";
+ }
+}
+"""
+ export_3 = """
+EXPORT {
+ FSAL {
+ name = "CEPH";
+ user_id = "nfs.foo.1";
+ filesystem = "a";
+ secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA==";
+ }
+ export_id = 1;
+ path = "/";
+ pseudo = "/a";
+ access_type = "RW";
+ squash = "none";
+ attr_expiration_time = 0;
+ security_label = true;
+ protocols = 4;
+ transports = "TCP";
+}
+"""
+
+ conf_nfs_foo = f'''
+%url "rados://{NFS_POOL_NAME}/{cluster_id}/export-1"
+
+%url "rados://{NFS_POOL_NAME}/{cluster_id}/export-2"'''
+
+ class RObject(object):
+ def __init__(self, key: str, raw: str) -> None:
+ self.key = key
+ self.raw = raw
+
+ def read(self, _: Optional[int]) -> bytes:
+ return self.raw.encode('utf-8')
+
+ def stat(self) -> Tuple[int, None]:
+ return len(self.raw), None
+
+ def _ioctx_write_full_mock(self, key: str, content: bytes) -> None:
+ if key not in self.temp_store[self.temp_store_namespace]:
+ self.temp_store[self.temp_store_namespace][key] = \
+ TestNFS.RObject(key, content.decode('utf-8'))
+ else:
+ self.temp_store[self.temp_store_namespace][key].raw = content.decode('utf-8')
+
+ def _ioctx_remove_mock(self, key: str) -> None:
+ del self.temp_store[self.temp_store_namespace][key]
+
+ def _ioctx_list_objects_mock(self) -> List['TestNFS.RObject']:
+ r = [obj for _, obj in self.temp_store[self.temp_store_namespace].items()]
+ return r
+
+ def _ioctl_stat_mock(self, key):
+ return self.temp_store[self.temp_store_namespace][key].stat()
+
+ def _ioctl_read_mock(self, key: str, size: Optional[Any] = None) -> bytes:
+ if key not in self.temp_store[self.temp_store_namespace]:
+ raise ObjectNotFound
+ return self.temp_store[self.temp_store_namespace][key].read(size)
+
+ def _ioctx_set_namespace_mock(self, namespace: str) -> None:
+ self.temp_store_namespace = namespace
+
+ def _reset_temp_store(self) -> None:
+ self.temp_store_namespace = None
+ self.temp_store = {
+ 'foo': {
+ 'export-1': TestNFS.RObject("export-1", self.export_1),
+ 'export-2': TestNFS.RObject("export-2", self.export_2),
+ 'conf-nfs.foo': TestNFS.RObject("conf-nfs.foo", self.conf_nfs_foo)
+ }
+ }
+
+ @contextmanager
+ def _mock_orchestrator(self, enable: bool) -> Iterator:
+ self.io_mock = MagicMock()
+ self.io_mock.set_namespace.side_effect = self._ioctx_set_namespace_mock
+ self.io_mock.read = self._ioctl_read_mock
+ self.io_mock.stat = self._ioctl_stat_mock
+ self.io_mock.list_objects.side_effect = self._ioctx_list_objects_mock
+ self.io_mock.write_full.side_effect = self._ioctx_write_full_mock
+ self.io_mock.remove_object.side_effect = self._ioctx_remove_mock
+
+ # mock nfs services
+ orch_nfs_services = [
+ ServiceDescription(spec=NFSServiceSpec(service_id=self.cluster_id))
+ ] if enable else []
+
+ orch_nfs_daemons = [
+ DaemonDescription('nfs', 'foo.mydaemon', 'myhostname')
+ ] if enable else []
+
+ def mock_exec(cls, args):
+ if args[1:3] == ['bucket', 'stats']:
+ bucket_info = {
+ "owner": "bucket_owner_user",
+ }
+ return 0, json.dumps(bucket_info), ''
+ u = {
+ "user_id": "abc",
+ "display_name": "foo",
+ "email": "",
+ "suspended": 0,
+ "max_buckets": 1000,
+ "subusers": [],
+ "keys": [
+ {
+ "user": "abc",
+ "access_key": "the_access_key",
+ "secret_key": "the_secret_key"
+ }
+ ],
+ "swift_keys": [],
+ "caps": [],
+ "op_mask": "read, write, delete",
+ "default_placement": "",
+ "default_storage_class": "",
+ "placement_tags": [],
+ "bucket_quota": {
+ "enabled": False,
+ "check_on_raw": False,
+ "max_size": -1,
+ "max_size_kb": 0,
+ "max_objects": -1
+ },
+ "user_quota": {
+ "enabled": False,
+ "check_on_raw": False,
+ "max_size": -1,
+ "max_size_kb": 0,
+ "max_objects": -1
+ },
+ "temp_url_keys": [],
+ "type": "rgw",
+ "mfa_ids": []
+ }
+ if args[2] == 'list':
+ return 0, json.dumps([u]), ''
+ return 0, json.dumps(u), ''
+
+ def mock_describe_service(cls, *args, **kwargs):
+ if kwargs['service_type'] == 'nfs':
+ return OrchResult(orch_nfs_services)
+ return OrchResult([])
+
+ def mock_list_daemons(cls, *args, **kwargs):
+ if kwargs['daemon_type'] == 'nfs':
+ return OrchResult(orch_nfs_daemons)
+ return OrchResult([])
+
+ with mock.patch('nfs.module.Module.describe_service', mock_describe_service) as describe_service, \
+ mock.patch('nfs.module.Module.list_daemons', mock_list_daemons) as list_daemons, \
+ mock.patch('nfs.module.Module.rados') as rados, \
+ mock.patch('nfs.export.available_clusters',
+ return_value=[self.cluster_id]), \
+ mock.patch('nfs.export.restart_nfs_service'), \
+ mock.patch('nfs.cluster.restart_nfs_service'), \
+ mock.patch.object(MgrModule, 'tool_exec', mock_exec), \
+ mock.patch('nfs.export.check_fs', return_value=True), \
+ mock.patch('nfs.export_utils.check_fs', return_value=True), \
+ mock.patch('nfs.export.ExportMgr._create_user_key',
+ return_value='thekeyforclientabc'):
+
+ rados.open_ioctx.return_value.__enter__.return_value = self.io_mock
+ rados.open_ioctx.return_value.__exit__ = mock.Mock(return_value=None)
+
+ self._reset_temp_store()
+
+ yield
+
+ def test_parse_daemon_raw_config(self) -> None:
+ expected_daemon_config = [
+ RawBlock('NFS_CORE_PARAM', values={
+ "enable_nlm": False,
+ "enable_rquota": False,
+ "protocols": 4,
+ "nfs_port": 14000
+ }),
+ RawBlock('MDCACHE', values={
+ "dir_chunk": 0
+ }),
+ RawBlock('NFSV4', values={
+ "recoverybackend": "rados_cluster",
+ "minor_versions": [1, 2]
+ }),
+ RawBlock('RADOS_KV', values={
+ "pool": NFS_POOL_NAME,
+ "namespace": "vstart",
+ "userid": "vstart",
+ "nodeid": "a"
+ }),
+ RawBlock('RADOS_URLS', values={
+ "userid": "vstart",
+ "watch_url": f"'rados://{NFS_POOL_NAME}/vstart/conf-nfs.vstart'"
+ }),
+ RawBlock('%url', values={
+ "value": f"rados://{NFS_POOL_NAME}/vstart/conf-nfs.vstart"
+ })
+ ]
+ daemon_raw_config = """
+NFS_CORE_PARAM {
+ Enable_NLM = false;
+ Enable_RQUOTA = false;
+ Protocols = 4;
+ NFS_Port = 14000;
+ }
+
+ MDCACHE {
+ Dir_Chunk = 0;
+ }
+
+ NFSv4 {
+ RecoveryBackend = rados_cluster;
+ Minor_Versions = 1, 2;
+ }
+
+ RADOS_KV {
+ pool = {};
+ namespace = vstart;
+ UserId = vstart;
+ nodeid = a;
+ }
+
+ RADOS_URLS {
+ Userid = vstart;
+ watch_url = 'rados://{}/vstart/conf-nfs.vstart';
+ }
+
+ %url rados://{}/vstart/conf-nfs.vstart
+""".replace('{}', NFS_POOL_NAME)
+ daemon_config = GaneshaConfParser(daemon_raw_config).parse()
+ assert daemon_config == expected_daemon_config
+
+ def _validate_export_1(self, export: Export):
+ assert export.export_id == 1
+ assert export.path == "/"
+ assert export.pseudo == "/cephfs_a/"
+ assert export.access_type == "RW"
+ # assert export.squash == "root_squash" # probably correct value
+ assert export.squash == "no_root_squash"
+ assert export.protocols == [4]
+ # assert export.transports == {"TCP", "UDP"}
+ assert export.fsal.name == "CEPH"
+ assert export.fsal.user_id == "ganesha"
+ assert export.fsal.fs_name == "a"
+ assert export.fsal.sec_label_xattr == None
+ assert len(export.clients) == 2
+ assert export.clients[0].addresses == \
+ ["192.168.0.10", "192.168.1.0/8"]
+ # assert export.clients[0].squash == "no_root_squash" # probably correct value
+ assert export.clients[0].squash == "None"
+ assert export.clients[0].access_type is None
+ assert export.clients[1].addresses == ["192.168.0.0/16"]
+ # assert export.clients[1].squash == "all_squash" # probably correct value
+ assert export.clients[1].squash == "All"
+ assert export.clients[1].access_type == "RO"
+ assert export.cluster_id == 'foo'
+ assert export.attr_expiration_time == 0
+ # assert export.security_label == False # probably correct value
+ assert export.security_label == True
+
+ def test_export_parser_1(self) -> None:
+ blocks = GaneshaConfParser(self.export_1).parse()
+ assert isinstance(blocks, list)
+ assert len(blocks) == 1
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ self._validate_export_1(export)
+
+ def _validate_export_2(self, export: Export):
+ assert export.export_id == 2
+ assert export.path == "/"
+ assert export.pseudo == "/rgw"
+ assert export.access_type == "RW"
+ # assert export.squash == "all_squash" # probably correct value
+ assert export.squash == "AllAnonymous"
+ assert export.protocols == [4, 3]
+ assert set(export.transports) == {"TCP", "UDP"}
+ assert export.fsal.name == "RGW"
+ assert export.fsal.user_id == "nfs.foo.bucket"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 0
+ assert export.cluster_id == 'foo'
+
+ def test_export_parser_2(self) -> None:
+ blocks = GaneshaConfParser(self.export_2).parse()
+ assert isinstance(blocks, list)
+ assert len(blocks) == 1
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ self._validate_export_2(export)
+
+ def test_daemon_conf_parser(self) -> None:
+ blocks = GaneshaConfParser(self.conf_nfs_foo).parse()
+ assert isinstance(blocks, list)
+ assert len(blocks) == 2
+ assert blocks[0].block_name == "%url"
+ assert blocks[0].values['value'] == f"rados://{NFS_POOL_NAME}/{self.cluster_id}/export-1"
+ assert blocks[1].block_name == "%url"
+ assert blocks[1].values['value'] == f"rados://{NFS_POOL_NAME}/{self.cluster_id}/export-2"
+
+ def _do_mock_test(self, func) -> None:
+ with self._mock_orchestrator(True):
+ func()
+ self._reset_temp_store()
+
+ def test_ganesha_conf(self) -> None:
+ self._do_mock_test(self._do_test_ganesha_conf)
+
+ def _do_test_ganesha_conf(self) -> None:
+ nfs_mod = Module('nfs', '', '')
+ ganesha_conf = ExportMgr(nfs_mod)
+ exports = ganesha_conf.exports[self.cluster_id]
+
+ assert len(exports) == 2
+
+ self._validate_export_1([e for e in exports if e.export_id == 1][0])
+ self._validate_export_2([e for e in exports if e.export_id == 2][0])
+
+ def test_config_dict(self) -> None:
+ self._do_mock_test(self._do_test_config_dict)
+
+ def _do_test_config_dict(self) -> None:
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ export = [e for e in conf.exports['foo'] if e.export_id == 1][0]
+ ex_dict = export.to_dict()
+
+ assert ex_dict == {'access_type': 'RW',
+ 'clients': [{'access_type': None,
+ 'addresses': ['192.168.0.10', '192.168.1.0/8'],
+ 'squash': 'None'},
+ {'access_type': 'RO',
+ 'addresses': ['192.168.0.0/16'],
+ 'squash': 'All'}],
+ 'cluster_id': self.cluster_id,
+ 'export_id': 1,
+ 'fsal': {'fs_name': 'a', 'name': 'CEPH', 'user_id': 'ganesha'},
+ 'path': '/',
+ 'protocols': [4],
+ 'pseudo': '/cephfs_a/',
+ 'security_label': True,
+ 'squash': 'no_root_squash',
+ 'transports': []}
+
+ export = [e for e in conf.exports['foo'] if e.export_id == 2][0]
+ ex_dict = export.to_dict()
+ assert ex_dict == {'access_type': 'RW',
+ 'clients': [],
+ 'cluster_id': self.cluster_id,
+ 'export_id': 2,
+ 'fsal': {'name': 'RGW',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ 'user_id': 'nfs.foo.bucket'},
+ 'path': '/',
+ 'protocols': [3, 4],
+ 'pseudo': '/rgw',
+ 'security_label': True,
+ 'squash': 'AllAnonymous',
+ 'transports': ['TCP', 'UDP']}
+
+ def test_config_from_dict(self) -> None:
+ self._do_mock_test(self._do_test_config_from_dict)
+
+ def _do_test_config_from_dict(self) -> None:
+ export = Export.from_dict(1, {
+ 'export_id': 1,
+ 'path': '/',
+ 'cluster_id': self.cluster_id,
+ 'pseudo': '/cephfs_a',
+ 'access_type': 'RW',
+ 'squash': 'root_squash',
+ 'security_label': True,
+ 'protocols': [4],
+ 'transports': ['TCP', 'UDP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.10", "192.168.1.0/8"],
+ 'access_type': None,
+ 'squash': 'no_root_squash'
+ }, {
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': 'RO',
+ 'squash': 'all_squash'
+ }],
+ 'fsal': {
+ 'name': 'CEPH',
+ 'user_id': 'ganesha',
+ 'fs_name': 'a',
+ 'sec_label_xattr': 'security.selinux'
+ }
+ })
+
+ assert export.export_id == 1
+ assert export.path == "/"
+ assert export.pseudo == "/cephfs_a"
+ assert export.access_type == "RW"
+ assert export.squash == "root_squash"
+ assert set(export.protocols) == {4}
+ assert set(export.transports) == {"TCP", "UDP"}
+ assert export.fsal.name == "CEPH"
+ assert export.fsal.user_id == "ganesha"
+ assert export.fsal.fs_name == "a"
+ assert export.fsal.sec_label_xattr == 'security.selinux'
+ assert len(export.clients) == 2
+ assert export.clients[0].addresses == \
+ ["192.168.0.10", "192.168.1.0/8"]
+ assert export.clients[0].squash == "no_root_squash"
+ assert export.clients[0].access_type is None
+ assert export.clients[1].addresses == ["192.168.0.0/16"]
+ assert export.clients[1].squash == "all_squash"
+ assert export.clients[1].access_type == "RO"
+ assert export.cluster_id == self.cluster_id
+ assert export.attr_expiration_time == 0
+ assert export.security_label
+
+ export = Export.from_dict(2, {
+ 'export_id': 2,
+ 'path': 'bucket',
+ 'pseudo': '/rgw',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'all_squash',
+ 'security_label': False,
+ 'protocols': [4, 3],
+ 'transports': ['TCP', 'UDP'],
+ 'clients': [],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'rgw.foo.bucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key'
+ }
+ })
+
+ assert export.export_id == 2
+ assert export.path == "bucket"
+ assert export.pseudo == "/rgw"
+ assert export.access_type == "RW"
+ assert export.squash == "all_squash"
+ assert set(export.protocols) == {4, 3}
+ assert set(export.transports) == {"TCP", "UDP"}
+ assert export.fsal.name == "RGW"
+ assert export.fsal.user_id == "rgw.foo.bucket"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 0
+ assert export.cluster_id == self.cluster_id
+
+ @pytest.mark.parametrize(
+ "block",
+ [
+ export_1,
+ export_2,
+ ]
+ )
+ def test_export_from_to_export_block(self, block):
+ blocks = GaneshaConfParser(block).parse()
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ newblock = export.to_export_block()
+ export2 = Export.from_export_block(newblock, self.cluster_id)
+ newblock2 = export2.to_export_block()
+ assert newblock == newblock2
+
+ @pytest.mark.parametrize(
+ "block",
+ [
+ export_1,
+ export_2,
+ ]
+ )
+ def test_export_from_to_dict(self, block):
+ blocks = GaneshaConfParser(block).parse()
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ j = export.to_dict()
+ export2 = Export.from_dict(j['export_id'], j)
+ j2 = export2.to_dict()
+ assert j == j2
+
+ @pytest.mark.parametrize(
+ "block",
+ [
+ export_1,
+ export_2,
+ ]
+ )
+ def test_export_validate(self, block):
+ blocks = GaneshaConfParser(block).parse()
+ export = Export.from_export_block(blocks[0], self.cluster_id)
+ nfs_mod = Module('nfs', '', '')
+ with mock.patch('nfs.export_utils.check_fs', return_value=True):
+ export.validate(nfs_mod)
+
+ def test_update_export(self):
+ self._do_mock_test(self._do_test_update_export)
+
+ def _do_test_update_export(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'export_id': 2,
+ 'path': 'bucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'all_squash',
+ 'security_label': False,
+ 'protocols': [4, 3],
+ 'transports': ['TCP', 'UDP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.bucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ }))
+ assert r[0] == 0
+
+ export = conf._fetch_export('foo', '/rgw/bucket')
+ assert export.export_id == 2
+ assert export.path == "bucket"
+ assert export.pseudo == "/rgw/bucket"
+ assert export.access_type == "RW"
+ assert export.squash == "all_squash"
+ assert export.protocols == [4, 3]
+ assert export.transports == ["TCP", "UDP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
+ assert export.clients[0].access_type is None
+ assert export.cluster_id == self.cluster_id
+
+ # do it again, with changes
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'export_id': 2,
+ 'path': 'newbucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RO',
+ 'squash': 'root',
+ 'security_label': False,
+ 'protocols': [4],
+ 'transports': ['TCP'],
+ 'clients': [{
+ 'addresses': ["192.168.10.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.newbucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ }))
+ assert r[0] == 0
+
+ export = conf._fetch_export('foo', '/rgw/bucket')
+ assert export.export_id == 2
+ assert export.path == "newbucket"
+ assert export.pseudo == "/rgw/bucket"
+ assert export.access_type == "RO"
+ assert export.squash == "root"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
+ assert export.clients[0].access_type is None
+ assert export.cluster_id == self.cluster_id
+
+ # again, but without export_id
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'path': 'newestbucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'root',
+ 'security_label': False,
+ 'protocols': [4],
+ 'transports': ['TCP'],
+ 'clients': [{
+ 'addresses': ["192.168.10.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.newestbucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ }))
+ assert r[0] == 0
+
+ export = conf._fetch_export(self.cluster_id, '/rgw/bucket')
+ assert export.export_id == 2
+ assert export.path == "newestbucket"
+ assert export.pseudo == "/rgw/bucket"
+ assert export.access_type == "RW"
+ assert export.squash == "root"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
+ assert export.clients[0].access_type is None
+ assert export.cluster_id == self.cluster_id
+
+ def test_update_export_with_ganesha_conf(self):
+ self._do_mock_test(self._do_test_update_export_with_ganesha_conf)
+
+ def _do_test_update_export_with_ganesha_conf(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, self.export_3)
+ assert r[0] == 0
+
+ def test_update_export_with_list(self):
+ self._do_mock_test(self._do_test_update_export_with_list)
+
+ def _do_test_update_export_with_list(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, json.dumps([
+ {
+ 'path': 'bucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'root',
+ 'security_label': False,
+ 'protocols': [4],
+ 'transports': ['TCP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.bucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ },
+ {
+ 'path': 'bucket2',
+ 'pseudo': '/rgw/bucket2',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RO',
+ 'squash': 'root',
+ 'security_label': False,
+ 'protocols': [4],
+ 'transports': ['TCP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.bucket2',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ },
+ ]))
+ assert r[0] == 0
+
+ export = conf._fetch_export('foo', '/rgw/bucket')
+ assert export.export_id == 3
+ assert export.path == "bucket"
+ assert export.pseudo == "/rgw/bucket"
+ assert export.access_type == "RW"
+ assert export.squash == "root"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
+ assert export.clients[0].access_type is None
+ assert export.cluster_id == self.cluster_id
+
+ export = conf._fetch_export('foo', '/rgw/bucket2')
+ assert export.export_id == 4
+ assert export.path == "bucket2"
+ assert export.pseudo == "/rgw/bucket2"
+ assert export.access_type == "RO"
+ assert export.squash == "root"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
+ assert export.clients[0].access_type is None
+ assert export.cluster_id == self.cluster_id
+
+ def test_remove_export(self) -> None:
+ self._do_mock_test(self._do_test_remove_export)
+
+ def _do_test_remove_export(self) -> None:
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ assert len(conf.exports[self.cluster_id]) == 2
+ assert conf.delete_export(cluster_id=self.cluster_id,
+ pseudo_path="/rgw") == (0, "Successfully deleted export", "")
+ exports = conf.exports[self.cluster_id]
+ assert len(exports) == 1
+ assert exports[0].export_id == 1
+
+ def test_create_export_rgw_bucket(self):
+ self._do_mock_test(self._do_test_create_export_rgw_bucket)
+
+ def _do_test_create_export_rgw_bucket(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 2
+
+ r = conf.create_export(
+ fsal_type='rgw',
+ cluster_id=self.cluster_id,
+ bucket='bucket',
+ pseudo_path='/mybucket',
+ read_only=False,
+ squash='root',
+ addr=["192.168.0.0/16"]
+ )
+ assert r[0] == 0
+
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/mybucket')
+ assert export.export_id
+ assert export.path == "bucket"
+ assert export.pseudo == "/mybucket"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.user_id == "bucket_owner_user"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.0.0/16"]
+ assert export.cluster_id == self.cluster_id
+
+ def test_create_export_rgw_bucket_user(self):
+ self._do_mock_test(self._do_test_create_export_rgw_bucket_user)
+
+ def _do_test_create_export_rgw_bucket_user(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 2
+
+ r = conf.create_export(
+ fsal_type='rgw',
+ cluster_id=self.cluster_id,
+ bucket='bucket',
+ user_id='other_user',
+ pseudo_path='/mybucket',
+ read_only=False,
+ squash='root',
+ addr=["192.168.0.0/16"]
+ )
+ assert r[0] == 0
+
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/mybucket')
+ assert export.export_id
+ assert export.path == "bucket"
+ assert export.pseudo == "/mybucket"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.fsal.user_id == "other_user"
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.0.0/16"]
+ assert export.cluster_id == self.cluster_id
+
+ def test_create_export_rgw_user(self):
+ self._do_mock_test(self._do_test_create_export_rgw_user)
+
+ def _do_test_create_export_rgw_user(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 2
+
+ r = conf.create_export(
+ fsal_type='rgw',
+ cluster_id=self.cluster_id,
+ user_id='some_user',
+ pseudo_path='/mybucket',
+ read_only=False,
+ squash='root',
+ addr=["192.168.0.0/16"]
+ )
+ assert r[0] == 0
+
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/mybucket')
+ assert export.export_id
+ assert export.path == "/"
+ assert export.pseudo == "/mybucket"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "RGW"
+ assert export.fsal.access_key_id == "the_access_key"
+ assert export.fsal.secret_access_key == "the_secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.fsal.user_id == "some_user"
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.0.0/16"]
+ assert export.cluster_id == self.cluster_id
+
+ def test_create_export_cephfs(self):
+ self._do_mock_test(self._do_test_create_export_cephfs)
+
+ def _do_test_create_export_cephfs(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 2
+
+ r = conf.create_export(
+ fsal_type='cephfs',
+ cluster_id=self.cluster_id,
+ fs_name='myfs',
+ path='/',
+ pseudo_path='/cephfs2',
+ read_only=False,
+ squash='root',
+ addr=["192.168.1.0/8"],
+ )
+ assert r[0] == 0
+
+ exports = conf.list_exports(cluster_id=self.cluster_id)
+ ls = json.loads(exports[1])
+ assert len(ls) == 3
+
+ export = conf._fetch_export('foo', '/cephfs2')
+ assert export.export_id
+ assert export.path == "/"
+ assert export.pseudo == "/cephfs2"
+ assert export.access_type == "none"
+ assert export.squash == "none"
+ assert export.protocols == [4]
+ assert export.transports == ["TCP"]
+ assert export.fsal.name == "CEPH"
+ assert export.fsal.user_id == "nfs.foo.3"
+ assert export.fsal.cephx_key == "thekeyforclientabc"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash == 'root'
+ assert export.clients[0].access_type == 'rw'
+ assert export.clients[0].addresses == ["192.168.1.0/8"]
+ assert export.cluster_id == self.cluster_id
+
+ def _do_test_cluster_ls(self):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+
+ rc, out, err = cluster.list_nfs_cluster()
+ assert rc == 0
+ assert out == self.cluster_id
+
+ def test_cluster_ls(self):
+ self._do_mock_test(self._do_test_cluster_ls)
+
+ def _do_test_cluster_info(self):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+
+ rc, out, err = cluster.show_nfs_cluster_info(self.cluster_id)
+ assert rc == 0
+ assert json.loads(out) == {"foo": {"virtual_ip": None, "backend": []}}
+
+ def test_cluster_info(self):
+ self._do_mock_test(self._do_test_cluster_info)
+
+ def _do_test_cluster_config(self):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+
+ rc, out, err = cluster.get_nfs_cluster_config(self.cluster_id)
+ assert rc == 0
+ assert out == ""
+
+ rc, out, err = cluster.set_nfs_cluster_config(self.cluster_id, '# foo\n')
+ assert rc == 0
+
+ rc, out, err = cluster.get_nfs_cluster_config(self.cluster_id)
+ assert rc == 0
+ assert out == "# foo\n"
+
+ rc, out, err = cluster.reset_nfs_cluster_config(self.cluster_id)
+ assert rc == 0
+
+ rc, out, err = cluster.get_nfs_cluster_config(self.cluster_id)
+ assert rc == 0
+ assert out == ""
+
+ def test_cluster_config(self):
+ self._do_mock_test(self._do_test_cluster_config)
+
+
+@pytest.mark.parametrize(
+ "path,expected",
+ [
+ ("/foo/bar/baz", "/foo/bar/baz"),
+ ("/foo/bar/baz/", "/foo/bar/baz"),
+ ("/foo/bar/baz ", "/foo/bar/baz"),
+ ("/foo/./bar/baz", "/foo/bar/baz"),
+ ("/foo/bar/baz/..", "/foo/bar"),
+ ("//foo/bar/baz", "/foo/bar/baz"),
+ ("", ""),
+ ]
+)
+def test_normalize_path(path, expected):
+ assert normalize_path(path) == expected
diff --git a/src/pybind/mgr/nfs/utils.py b/src/pybind/mgr/nfs/utils.py
new file mode 100644
index 000000000..ac857d6d9
--- /dev/null
+++ b/src/pybind/mgr/nfs/utils.py
@@ -0,0 +1,59 @@
+from typing import List, TYPE_CHECKING
+
+import orchestrator
+
+if TYPE_CHECKING:
+ from nfs.module import Module
+
+EXPORT_PREFIX: str = "export-"
+CONF_PREFIX: str = "conf-nfs."
+USER_CONF_PREFIX: str = "userconf-nfs."
+
+
+def export_obj_name(export_id: int) -> str:
+ """Return a rados object name for the export."""
+ return f"{EXPORT_PREFIX}{export_id}"
+
+
+def conf_obj_name(cluster_id: str) -> str:
+ """Return a rados object name for the config."""
+ return f"{CONF_PREFIX}{cluster_id}"
+
+
+def user_conf_obj_name(cluster_id: str) -> str:
+ """Returna a rados object name for the user config."""
+ return f"{USER_CONF_PREFIX}{cluster_id}"
+
+
+def available_clusters(mgr: 'Module') -> List[str]:
+ '''
+ This method returns list of available cluster ids.
+ Service name is service_type.service_id
+ Example:
+ completion.result value:
+ <ServiceDescription of <NFSServiceSpec for service_name=nfs.vstart>>
+ return value: ['vstart']
+ '''
+ # TODO check cephadm cluster list with rados pool conf objects
+ completion = mgr.describe_service(service_type='nfs')
+ orchestrator.raise_if_exception(completion)
+ assert completion.result is not None
+ return [cluster.spec.service_id for cluster in completion.result
+ if cluster.spec.service_id]
+
+
+def restart_nfs_service(mgr: 'Module', cluster_id: str) -> None:
+ '''
+ This methods restarts the nfs daemons
+ '''
+ completion = mgr.service_action(action='restart',
+ service_name='nfs.' + cluster_id)
+ orchestrator.raise_if_exception(completion)
+
+
+def check_fs(mgr: 'Module', fs_name: str) -> bool:
+ '''
+ This method checks if given fs is valid
+ '''
+ fs_map = mgr.get('fs_map')
+ return fs_name in [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']]