summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/mirroring/fs/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/mirroring/fs/utils.py')
-rw-r--r--src/pybind/mgr/mirroring/fs/utils.py152
1 files changed, 152 insertions, 0 deletions
diff --git a/src/pybind/mgr/mirroring/fs/utils.py b/src/pybind/mgr/mirroring/fs/utils.py
new file mode 100644
index 000000000..5e1d05373
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/utils.py
@@ -0,0 +1,152 @@
+import errno
+import logging
+import threading
+
+import rados
+import cephfs
+
+from .exception import MirrorException
+
+MIRROR_OBJECT_PREFIX = 'cephfs_mirror'
+MIRROR_OBJECT_NAME = MIRROR_OBJECT_PREFIX
+
+INSTANCE_ID_PREFIX = "instance_"
+DIRECTORY_MAP_PREFIX = "dir_map_"
+
+log = logging.getLogger(__name__)
+
+def connect_to_cluster(client_name, cluster_name, conf_dct, desc=''):
+ try:
+ log.debug(f'connecting to {desc} cluster: {client_name}/{cluster_name}')
+ mon_host = conf_dct.get('mon_host', '')
+ cephx_key = conf_dct.get('key', '')
+ if mon_host and cephx_key:
+ r_rados = rados.Rados(rados_id=client_name, conf={'mon_host': mon_host,
+ 'key': cephx_key})
+ else:
+ r_rados = rados.Rados(rados_id=client_name, clustername=cluster_name)
+ r_rados.conf_read_file()
+ r_rados.connect()
+ log.debug(f'connected to {desc} cluster')
+ return r_rados
+ except rados.Error as e:
+ if e.errno == errno.ENOENT:
+ raise MirrorException(-e.errno, f'cluster {cluster_name} does not exist')
+ else:
+ log.error(f'error connecting to cluster: {e}')
+ raise Exception(-e.errno)
+
+def disconnect_from_cluster(cluster_name, cluster):
+ try:
+ log.debug(f'disconnecting from cluster {cluster_name}')
+ cluster.shutdown()
+ log.debug(f'disconnected from cluster {cluster_name}')
+ except Exception as e:
+ log.error(f'error disconnecting: {e}')
+
+def connect_to_filesystem(client_name, cluster_name, fs_name, desc, conf_dct={}):
+ try:
+ cluster = connect_to_cluster(client_name, cluster_name, conf_dct, desc)
+ log.debug(f'connecting to {desc} filesystem: {fs_name}')
+ fs = cephfs.LibCephFS(rados_inst=cluster)
+ fs.conf_set("client_mount_uid", "0")
+ fs.conf_set("client_mount_gid", "0")
+ fs.conf_set("client_check_pool_perm", "false")
+ log.debug('CephFS initializing...')
+ fs.init()
+ log.debug('CephFS mounting...')
+ fs.mount(filesystem_name=fs_name.encode('utf-8'))
+ log.debug(f'Connection to cephfs {fs_name} complete')
+ return (cluster, fs)
+ except cephfs.Error as e:
+ if e.errno == errno.ENOENT:
+ raise MirrorException(-e.errno, f'filesystem {fs_name} does not exist')
+ else:
+ log.error(f'error connecting to filesystem {fs_name}: {e}')
+ raise Exception(-e.errno)
+
+def disconnect_from_filesystem(cluster_name, fs_name, cluster, fs_handle):
+ try:
+ log.debug(f'disconnecting from filesystem {fs_name}')
+ fs_handle.shutdown()
+ log.debug(f'disconnected from filesystem {fs_name}')
+ disconnect_from_cluster(cluster_name, cluster)
+ except Exception as e:
+ log.error(f'error disconnecting: {e}')
+
+class _ThreadWrapper(threading.Thread):
+ def __init__(self, name):
+ self.q = []
+ self.stopping = threading.Event()
+ self.terminated = threading.Event()
+ self.lock = threading.Lock()
+ self.cond = threading.Condition(self.lock)
+ super().__init__(name=name)
+ super().start()
+
+ def run(self):
+ try:
+ with self.lock:
+ while True:
+ self.cond.wait_for(lambda: self.q or self.stopping.is_set())
+ if self.stopping.is_set():
+ log.debug('thread exiting')
+ self.terminated.set()
+ self.cond.notifyAll()
+ return
+ q = self.q.copy()
+ self.q.clear()
+ self.lock.release()
+ try:
+ for item in q:
+ log.debug(f'calling {item[0]} params {item[1]}')
+ item[0](*item[1])
+ except Exception as e:
+ log.warn(f'callback exception: {e}')
+ self.lock.acquire()
+ except Exception as e:
+ log.info(f'threading exception: {e}')
+
+ def queue(self, cbk, args):
+ with self.lock:
+ self.q.append((cbk, args))
+ self.cond.notifyAll()
+
+ def stop(self):
+ with self.lock:
+ self.stopping.set()
+ self.cond.notifyAll()
+ self.cond.wait_for(lambda: self.terminated.is_set())
+
+class Finisher:
+ def __init__(self):
+ self.lock = threading.Lock()
+ self.thread = _ThreadWrapper(name='finisher')
+
+ def queue(self, cbk, args=[]):
+ with self.lock:
+ self.thread.queue(cbk, args)
+
+class AsyncOpTracker:
+ def __init__(self):
+ self.ops_in_progress = 0
+ self.lock = threading.Lock()
+ self.cond = threading.Condition(self.lock)
+
+ def start_async_op(self):
+ with self.lock:
+ self.ops_in_progress += 1
+ log.debug(f'start_async_op: {self.ops_in_progress}')
+
+ def finish_async_op(self):
+ with self.lock:
+ self.ops_in_progress -= 1
+ log.debug(f'finish_async_op: {self.ops_in_progress}')
+ assert(self.ops_in_progress >= 0)
+ self.cond.notifyAll()
+
+ def wait_for_ops(self):
+ with self.lock:
+ log.debug(f'wait_for_ops: {self.ops_in_progress}')
+ self.cond.wait_for(lambda: self.ops_in_progress == 0)
+ log.debug(f'done')