summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/volumes/fs/operations/volume.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/volumes/fs/operations/volume.py')
-rw-r--r--src/pybind/mgr/volumes/fs/operations/volume.py348
1 files changed, 348 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py
new file mode 100644
index 00000000..410e5c44
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/volume.py
@@ -0,0 +1,348 @@
+import time
+import errno
+import logging
+import sys
+
+try:
+ from typing import List
+except ImportError:
+ pass # For typing only
+
+from contextlib import contextmanager
+from threading import Lock, Condition
+
+if sys.version_info >= (3, 3):
+ from threading import Timer
+else:
+ from threading import _Timer as Timer
+
+import cephfs
+import orchestrator
+
+from .lock import GlobalLock
+from ..exception import VolumeException
+from ..fs_util import create_pool, remove_pool, create_filesystem, \
+ remove_filesystem, create_mds, volume_exists
+
+log = logging.getLogger(__name__)
+
+class ConnectionPool(object):
+ class Connection(object):
+ def __init__(self, mgr, fs_name):
+ self.fs = None
+ self.mgr = mgr
+ self.fs_name = fs_name
+ self.ops_in_progress = 0
+ self.last_used = time.time()
+ self.fs_id = self.get_fs_id()
+
+ def get_fs_id(self):
+ fs_map = self.mgr.get('fs_map')
+ for fs in fs_map['filesystems']:
+ if fs['mdsmap']['fs_name'] == self.fs_name:
+ return fs['id']
+ raise VolumeException(
+ -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
+
+ def get_fs_handle(self):
+ self.last_used = time.time()
+ self.ops_in_progress += 1
+ return self.fs
+
+ def put_fs_handle(self, notify):
+ assert self.ops_in_progress > 0
+ self.ops_in_progress -= 1
+ if self.ops_in_progress == 0:
+ notify()
+
+ def del_fs_handle(self, waiter):
+ if waiter:
+ while self.ops_in_progress != 0:
+ waiter()
+ if self.is_connection_valid():
+ self.disconnect()
+ else:
+ self.abort()
+
+ def is_connection_valid(self):
+ fs_id = None
+ try:
+ fs_id = self.get_fs_id()
+ except:
+ # the filesystem does not exist now -- connection is not valid.
+ pass
+ log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
+ return self.fs_id == fs_id
+
+ def is_connection_idle(self, timeout):
+ return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
+
+ def connect(self):
+ assert self.ops_in_progress == 0
+ log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
+ self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
+ log.debug("Setting user ID and group ID of CephFS mount as root...")
+ self.fs.conf_set("client_mount_uid", "0")
+ self.fs.conf_set("client_mount_gid", "0")
+ log.debug("CephFS initializing...")
+ self.fs.init()
+ log.debug("CephFS mounting...")
+ self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
+ log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
+
+ def disconnect(self):
+ try:
+ assert self.fs
+ assert self.ops_in_progress == 0
+ log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
+ self.fs.shutdown()
+ self.fs = None
+ except Exception as e:
+ log.debug("disconnect: ({0})".format(e))
+ raise
+
+ def abort(self):
+ assert self.fs
+ assert self.ops_in_progress == 0
+ log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
+ self.fs.abort_conn()
+ log.info("abort done from cephfs '{0}'".format(self.fs_name))
+ self.fs = None
+
+ class RTimer(Timer):
+ """
+ recurring timer variant of Timer
+ """
+ def run(self):
+ try:
+ while not self.finished.is_set():
+ self.finished.wait(self.interval)
+ self.function(*self.args, **self.kwargs)
+ self.finished.set()
+ except Exception as e:
+ log.error("ConnectionPool.RTimer: %s", e)
+ raise
+
+ # TODO: make this configurable
+ TIMER_TASK_RUN_INTERVAL = 30.0 # seconds
+ CONNECTION_IDLE_INTERVAL = 60.0 # seconds
+
+ def __init__(self, mgr):
+ self.mgr = mgr
+ self.connections = {}
+ self.lock = Lock()
+ self.cond = Condition(self.lock)
+ self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
+ self.cleanup_connections)
+ self.timer_task.start()
+
+ def cleanup_connections(self):
+ with self.lock:
+ log.info("scanning for idle connections..")
+ idle_fs = [fs_name for fs_name,conn in self.connections.items()
+ if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
+ for fs_name in idle_fs:
+ log.info("cleaning up connection for '{}'".format(fs_name))
+ self._del_fs_handle(fs_name)
+
+ def get_fs_handle(self, fs_name):
+ with self.lock:
+ conn = None
+ try:
+ conn = self.connections.get(fs_name, None)
+ if conn:
+ if conn.is_connection_valid():
+ return conn.get_fs_handle()
+ else:
+ # filesystem id changed beneath us (or the filesystem does not exist).
+ # this is possible if the filesystem got removed (and recreated with
+ # same name) via "ceph fs rm/new" mon command.
+ log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
+ self._del_fs_handle(fs_name)
+ conn = ConnectionPool.Connection(self.mgr, fs_name)
+ conn.connect()
+ except cephfs.Error as e:
+ # try to provide a better error string if possible
+ if e.args[0] == errno.ENOENT:
+ raise VolumeException(
+ -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
+ raise VolumeException(-e.args[0], e.args[1])
+ self.connections[fs_name] = conn
+ return conn.get_fs_handle()
+
+ def put_fs_handle(self, fs_name):
+ with self.lock:
+ conn = self.connections.get(fs_name, None)
+ if conn:
+ conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
+
+ def _del_fs_handle(self, fs_name, wait=False):
+ conn = self.connections.pop(fs_name, None)
+ if conn:
+ conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+
+ def del_fs_handle(self, fs_name, wait=False):
+ with self.lock:
+ self._del_fs_handle(fs_name, wait)
+
+ def del_all_handles(self):
+ with self.lock:
+ for fs_name in list(self.connections.keys()):
+ log.info("waiting for pending ops for '{}'".format(fs_name))
+ self._del_fs_handle(fs_name, wait=True)
+ log.info("pending ops completed for '{}'".format(fs_name))
+ # no new connections should have been initialized since its
+ # guarded on shutdown.
+ assert len(self.connections) == 0
+
+def gen_pool_names(volname):
+ """
+ return metadata and data pool name (from a filesystem/volume name) as a tuple
+ """
+ return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
+
+def get_mds_map(mgr, volname):
+ """
+ return mdsmap for a volname
+ """
+ mds_map = None
+ fs_map = mgr.get("fs_map")
+ for f in fs_map['filesystems']:
+ if volname == f['mdsmap']['fs_name']:
+ return f['mdsmap']
+ return mds_map
+
+def get_pool_names(mgr, volname):
+ """
+ return metadata and data pools (list) names of volume as a tuple
+ """
+ fs_map = mgr.get("fs_map")
+ metadata_pool_id = None
+ data_pool_ids = [] # type: List[int]
+ for f in fs_map['filesystems']:
+ if volname == f['mdsmap']['fs_name']:
+ metadata_pool_id = f['mdsmap']['metadata_pool']
+ data_pool_ids = f['mdsmap']['data_pools']
+ break
+ if metadata_pool_id is None:
+ return None, None
+
+ osdmap = mgr.get("osd_map")
+ pools = dict([(p['pool'], p['pool_name']) for p in osdmap['pools']])
+ metadata_pool = pools[metadata_pool_id]
+ data_pools = [pools[id] for id in data_pool_ids]
+ return metadata_pool, data_pools
+
+def create_volume(mgr, volname):
+ """
+ create volume (pool, filesystem and mds)
+ """
+ metadata_pool, data_pool = gen_pool_names(volname)
+ # create pools
+ r, outs, outb = create_pool(mgr, metadata_pool, 16)
+ if r != 0:
+ return r, outb, outs
+ r, outb, outs = create_pool(mgr, data_pool, 8)
+ if r != 0:
+ #cleanup
+ remove_pool(mgr, metadata_pool)
+ return r, outb, outs
+ # create filesystem
+ r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
+ if r != 0:
+ log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
+ #cleanup
+ remove_pool(mgr, data_pool)
+ remove_pool(mgr, metadata_pool)
+ return r, outb, outs
+ # create mds
+ return create_mds(mgr, volname)
+
+def delete_volume(mgr, volname, metadata_pool, data_pools):
+ """
+ delete the given module (tear down mds, remove filesystem, remove pools)
+ """
+ # Tear down MDS daemons
+ try:
+ completion = mgr.remove_stateless_service("mds", volname)
+ mgr._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
+ except (ImportError, orchestrator.OrchestratorError):
+ log.warning("OrchestratorError, not tearing down MDS daemons")
+ except Exception as e:
+ # Don't let detailed orchestrator exceptions (python backtraces)
+ # bubble out to the user
+ log.exception("Failed to tear down MDS daemons")
+ return -errno.EINVAL, "", str(e)
+
+ # In case orchestrator didn't tear down MDS daemons cleanly, or
+ # there was no orchestrator, we force the daemons down.
+ if volume_exists(mgr, volname):
+ r, outb, outs = remove_filesystem(mgr, volname)
+ if r != 0:
+ return r, outb, outs
+ else:
+ err = "Filesystem not found for volume '{0}'".format(volname)
+ log.warning(err)
+ return -errno.ENOENT, "", err
+ r, outb, outs = remove_pool(mgr, metadata_pool)
+ if r != 0:
+ return r, outb, outs
+
+ for data_pool in data_pools:
+ r, outb, outs = remove_pool(mgr, data_pool)
+ if r != 0:
+ return r, outb, outs
+ result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools))
+ return r, result_str, ""
+
+def list_volumes(mgr):
+ """
+ list all filesystem volumes.
+
+ :param: None
+ :return: None
+ """
+ result = []
+ fs_map = mgr.get("fs_map")
+ for f in fs_map['filesystems']:
+ result.append({'name': f['mdsmap']['fs_name']})
+ return result
+
+@contextmanager
+def open_volume(vc, volname):
+ """
+ open a volume for exclusive access. This API is to be used as a context manager.
+
+ :param vc: volume client instance
+ :param volname: volume name
+ :return: yields a volume handle (ceph filesystem handle)
+ """
+ if vc.is_stopping():
+ raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
+
+ g_lock = GlobalLock()
+ fs_handle = vc.connection_pool.get_fs_handle(volname)
+ try:
+ with g_lock.lock_op():
+ yield fs_handle
+ finally:
+ vc.connection_pool.put_fs_handle(volname)
+
+@contextmanager
+def open_volume_lockless(vc, volname):
+ """
+ open a volume with shared access. This API is to be used as a context manager.
+
+ :param vc: volume client instance
+ :param volname: volume name
+ :return: yields a volume handle (ceph filesystem handle)
+ """
+ if vc.is_stopping():
+ raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
+
+ fs_handle = vc.connection_pool.get_fs_handle(volname)
+ try:
+ yield fs_handle
+ finally:
+ vc.connection_pool.put_fs_handle(volname)