diff options
Diffstat (limited to 'src/pybind/mgr/mirroring/fs/utils.py')
-rw-r--r-- | src/pybind/mgr/mirroring/fs/utils.py | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/src/pybind/mgr/mirroring/fs/utils.py b/src/pybind/mgr/mirroring/fs/utils.py new file mode 100644 index 000000000..5e1d05373 --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/utils.py @@ -0,0 +1,152 @@ +import errno +import logging +import threading + +import rados +import cephfs + +from .exception import MirrorException + +MIRROR_OBJECT_PREFIX = 'cephfs_mirror' +MIRROR_OBJECT_NAME = MIRROR_OBJECT_PREFIX + +INSTANCE_ID_PREFIX = "instance_" +DIRECTORY_MAP_PREFIX = "dir_map_" + +log = logging.getLogger(__name__) + +def connect_to_cluster(client_name, cluster_name, conf_dct, desc=''): + try: + log.debug(f'connecting to {desc} cluster: {client_name}/{cluster_name}') + mon_host = conf_dct.get('mon_host', '') + cephx_key = conf_dct.get('key', '') + if mon_host and cephx_key: + r_rados = rados.Rados(rados_id=client_name, conf={'mon_host': mon_host, + 'key': cephx_key}) + else: + r_rados = rados.Rados(rados_id=client_name, clustername=cluster_name) + r_rados.conf_read_file() + r_rados.connect() + log.debug(f'connected to {desc} cluster') + return r_rados + except rados.Error as e: + if e.errno == errno.ENOENT: + raise MirrorException(-e.errno, f'cluster {cluster_name} does not exist') + else: + log.error(f'error connecting to cluster: {e}') + raise Exception(-e.errno) + +def disconnect_from_cluster(cluster_name, cluster): + try: + log.debug(f'disconnecting from cluster {cluster_name}') + cluster.shutdown() + log.debug(f'disconnected from cluster {cluster_name}') + except Exception as e: + log.error(f'error disconnecting: {e}') + +def connect_to_filesystem(client_name, cluster_name, fs_name, desc, conf_dct={}): + try: + cluster = connect_to_cluster(client_name, cluster_name, conf_dct, desc) + log.debug(f'connecting to {desc} filesystem: {fs_name}') + fs = cephfs.LibCephFS(rados_inst=cluster) + fs.conf_set("client_mount_uid", "0") + fs.conf_set("client_mount_gid", "0") + fs.conf_set("client_check_pool_perm", "false") + log.debug('CephFS initializing...') + fs.init() + log.debug('CephFS mounting...') + fs.mount(filesystem_name=fs_name.encode('utf-8')) + log.debug(f'Connection to cephfs {fs_name} complete') + return (cluster, fs) + except cephfs.Error as e: + if e.errno == errno.ENOENT: + raise MirrorException(-e.errno, f'filesystem {fs_name} does not exist') + else: + log.error(f'error connecting to filesystem {fs_name}: {e}') + raise Exception(-e.errno) + +def disconnect_from_filesystem(cluster_name, fs_name, cluster, fs_handle): + try: + log.debug(f'disconnecting from filesystem {fs_name}') + fs_handle.shutdown() + log.debug(f'disconnected from filesystem {fs_name}') + disconnect_from_cluster(cluster_name, cluster) + except Exception as e: + log.error(f'error disconnecting: {e}') + +class _ThreadWrapper(threading.Thread): + def __init__(self, name): + self.q = [] + self.stopping = threading.Event() + self.terminated = threading.Event() + self.lock = threading.Lock() + self.cond = threading.Condition(self.lock) + super().__init__(name=name) + super().start() + + def run(self): + try: + with self.lock: + while True: + self.cond.wait_for(lambda: self.q or self.stopping.is_set()) + if self.stopping.is_set(): + log.debug('thread exiting') + self.terminated.set() + self.cond.notifyAll() + return + q = self.q.copy() + self.q.clear() + self.lock.release() + try: + for item in q: + log.debug(f'calling {item[0]} params {item[1]}') + item[0](*item[1]) + except Exception as e: + log.warn(f'callback exception: {e}') + self.lock.acquire() + except Exception as e: + log.info(f'threading exception: {e}') + + def queue(self, cbk, args): + with self.lock: + self.q.append((cbk, args)) + self.cond.notifyAll() + + def stop(self): + with self.lock: + self.stopping.set() + self.cond.notifyAll() + self.cond.wait_for(lambda: self.terminated.is_set()) + +class Finisher: + def __init__(self): + self.lock = threading.Lock() + self.thread = _ThreadWrapper(name='finisher') + + def queue(self, cbk, args=[]): + with self.lock: + self.thread.queue(cbk, args) + +class AsyncOpTracker: + def __init__(self): + self.ops_in_progress = 0 + self.lock = threading.Lock() + self.cond = threading.Condition(self.lock) + + def start_async_op(self): + with self.lock: + self.ops_in_progress += 1 + log.debug(f'start_async_op: {self.ops_in_progress}') + + def finish_async_op(self): + with self.lock: + self.ops_in_progress -= 1 + log.debug(f'finish_async_op: {self.ops_in_progress}') + assert(self.ops_in_progress >= 0) + self.cond.notifyAll() + + def wait_for_ops(self): + with self.lock: + log.debug(f'wait_for_ops: {self.ops_in_progress}') + self.cond.wait_for(lambda: self.ops_in_progress == 0) + log.debug(f'done') |