summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/mirroring/fs/utils.py
blob: 5e1d0537356d408b0d574b39977411ce0526889d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import errno
import logging
import threading

import rados
import cephfs

from .exception import MirrorException

# Name prefix for the mirror object; MIRROR_OBJECT_NAME reuses it unchanged.
MIRROR_OBJECT_PREFIX = 'cephfs_mirror'
MIRROR_OBJECT_NAME = MIRROR_OBJECT_PREFIX

# String prefixes that are not referenced by the helpers in this part of the
# file -- presumably consumed by other mirroring modules (TODO confirm).
INSTANCE_ID_PREFIX = "instance_"
DIRECTORY_MAP_PREFIX = "dir_map_"

# Module-level logger used by every helper and class below.
log = logging.getLogger(__name__)

def connect_to_cluster(client_name, cluster_name, conf_dct, desc=''):
    """Create and connect a ``rados.Rados`` handle.

    When ``conf_dct`` carries both a non-empty ``mon_host`` and ``key``, the
    handle is configured directly from those two values. Otherwise the handle
    is created for ``cluster_name`` and its configuration file is read.

    :param client_name: value passed as ``rados_id``
    :param cluster_name: cluster name, used when no inline config is given
    :param conf_dct: mapping that may contain ``mon_host`` and ``key``
    :param desc: free-form label used only in log messages
    :return: the connected ``rados.Rados`` instance
    :raises MirrorException: when rados reports ``ENOENT``
    :raises Exception: carrying the negated errno for any other rados error
    """
    r_rados = None
    try:
        log.debug(f'connecting to {desc} cluster: {client_name}/{cluster_name}')
        mon_host = conf_dct.get('mon_host', '')
        cephx_key = conf_dct.get('key', '')
        if mon_host and cephx_key:
            r_rados = rados.Rados(rados_id=client_name, conf={'mon_host': mon_host,
                                                              'key': cephx_key})
        else:
            r_rados = rados.Rados(rados_id=client_name, clustername=cluster_name)
            r_rados.conf_read_file()
        r_rados.connect()
        log.debug(f'connected to {desc} cluster')
        return r_rados
    except rados.Error as e:
        # The caller never receives the handle on failure, so release it here
        # rather than leaking it. Cleanup is best-effort and must not mask
        # the original connection error.
        if r_rados is not None:
            try:
                r_rados.shutdown()
            except Exception as shutdown_err:
                log.debug(f'error shutting down cluster handle: {shutdown_err}')
        if e.errno == errno.ENOENT:
            raise MirrorException(-e.errno, f'cluster {cluster_name} does not exist') from e
        log.error(f'error connecting to cluster: {e}')
        raise Exception(-e.errno) from e

def disconnect_from_cluster(cluster_name, cluster):
    """Shut down a cluster handle on a best-effort basis.

    Any exception raised while shutting down is logged at error level and
    swallowed, so this function never raises an ``Exception`` to its caller.

    :param cluster_name: name used only in log messages
    :param cluster: object exposing a ``shutdown()`` method
    """
    try:
        log.debug(f'disconnecting from cluster {cluster_name}')
        cluster.shutdown()
        log.debug(f'disconnected from cluster {cluster_name}')
    except Exception as err:
        # Deliberately swallowed: a failed disconnect must not break callers.
        log.error(f'error disconnecting: {err}')

def connect_to_filesystem(client_name, cluster_name, fs_name, desc, conf_dct=None):
    """Connect to a cluster and mount one of its CephFS file systems.

    The CephFS handle is created on top of the cluster connection, with
    ``client_mount_uid`` and ``client_mount_gid`` set to ``0`` and
    ``client_check_pool_perm`` set to ``false`` before init and mount.

    :param client_name: forwarded to ``connect_to_cluster``
    :param cluster_name: forwarded to ``connect_to_cluster``
    :param fs_name: name of the file system to mount
    :param desc: free-form label used only in log messages
    :param conf_dct: optional mapping forwarded to ``connect_to_cluster``;
                     ``None`` is treated as an empty mapping
    :return: ``(cluster, fs)`` tuple of the rados and CephFS handles
    :raises MirrorException: when cephfs reports ``ENOENT``
    :raises Exception: carrying the negated errno for any other cephfs error
    """
    if conf_dct is None:
        # Avoid a shared mutable default argument.
        conf_dct = {}
    cluster = None
    fs = None
    try:
        cluster = connect_to_cluster(client_name, cluster_name, conf_dct, desc)
        log.debug(f'connecting to {desc} filesystem: {fs_name}')
        fs = cephfs.LibCephFS(rados_inst=cluster)
        fs.conf_set("client_mount_uid", "0")
        fs.conf_set("client_mount_gid", "0")
        fs.conf_set("client_check_pool_perm", "false")
        log.debug('CephFS initializing...')
        fs.init()
        log.debug('CephFS mounting...')
        fs.mount(filesystem_name=fs_name.encode('utf-8'))
        log.debug(f'Connection to cephfs {fs_name} complete')
        return (cluster, fs)
    except cephfs.Error as e:
        # The caller never sees the handles on failure, so release them here
        # instead of leaking a live cluster connection. Cleanup is best-effort.
        if fs is not None:
            try:
                fs.shutdown()
            except Exception as shutdown_err:
                log.debug(f'error shutting down filesystem handle: {shutdown_err}')
        if cluster is not None:
            disconnect_from_cluster(cluster_name, cluster)
        if e.errno == errno.ENOENT:
            raise MirrorException(-e.errno, f'filesystem {fs_name} does not exist') from e
        log.error(f'error connecting to filesystem {fs_name}: {e}')
        raise Exception(-e.errno) from e

def disconnect_from_filesystem(cluster_name, fs_name, cluster, fs_handle):
    """Shut down a CephFS handle and then its cluster connection.

    Both steps are best-effort: a failure shutting down the file system is
    logged and swallowed, and the cluster connection is still disconnected
    afterwards so it is not leaked.

    :param cluster_name: name used only in log messages
    :param fs_name: file system name used only in log messages
    :param cluster: cluster handle passed to ``disconnect_from_cluster``
    :param fs_handle: object exposing a ``shutdown()`` method
    """
    try:
        log.debug(f'disconnecting from filesystem {fs_name}')
        fs_handle.shutdown()
        log.debug(f'disconnected from filesystem {fs_name}')
    except Exception as e:
        log.error(f'error disconnecting: {e}')
    finally:
        # Always release the cluster connection, even if the file system
        # shutdown above failed.
        disconnect_from_cluster(cluster_name, cluster)

class _ThreadWrapper(threading.Thread):
    """Worker thread that runs queued callbacks in FIFO order.

    The thread is started from the constructor. Callbacks are appended with
    ``queue()`` and executed outside the lock, so a callback may itself call
    ``queue()``. ``stop()`` asks the thread to exit and blocks until it has
    done so; callbacks still queued at that point are not executed.
    """

    def __init__(self, name):
        # Pending (callback, args) pairs, guarded by self.lock.
        self.q = []
        self.stopping = threading.Event()
        self.terminated = threading.Event()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        super().__init__(name=name)
        super().start()

    def run(self):
        """Drain the queue in batches until ``stop()`` is requested."""
        try:
            while True:
                with self.lock:
                    self.cond.wait_for(lambda: self.q or self.stopping.is_set())
                    if self.stopping.is_set():
                        log.debug('thread exiting')
                        return
                    batch = self.q.copy()
                    self.q.clear()
                # Run callbacks without holding the lock. Each callback is
                # guarded on its own so one failure does not drop the rest
                # of the batch.
                for item in batch:
                    try:
                        cbk, args = item
                        log.debug(f'calling {cbk} params {args}')
                        cbk(*args)
                    except Exception as e:
                        log.warning(f'callback exception: {e}')
        except Exception as e:
            log.info(f'threading exception: {e}')
        finally:
            # Always signal termination, including on unexpected errors, so
            # that stop() can never block forever on a dead thread.
            with self.lock:
                self.terminated.set()
                self.cond.notify_all()

    def queue(self, cbk, args):
        """Append ``cbk`` to the queue; it is later called as ``cbk(*args)``."""
        with self.lock:
            self.q.append((cbk, args))
            self.cond.notify_all()

    def stop(self):
        """Request the thread to exit and wait until it has terminated."""
        with self.lock:
            self.stopping.set()
            self.cond.notify_all()
            self.cond.wait_for(lambda: self.terminated.is_set())

class Finisher:
    """Thin front end that hands callbacks to a dedicated worker thread.

    Constructing a ``Finisher`` starts a ``_ThreadWrapper`` named
    ``finisher``; ``queue()`` forwards callbacks to it.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.thread = _ThreadWrapper(name='finisher')

    def queue(self, cbk, args=None):
        """Schedule ``cbk(*args)`` on the worker thread.

        :param cbk: callable to run
        :param args: positional arguments for ``cbk``; defaults to none
        """
        if args is None:
            # Fresh list per call instead of a shared mutable default.
            args = []
        with self.lock:
            self.thread.queue(cbk, args)

class AsyncOpTracker:
    """Counter of in-flight operations with a blocking wait-for-zero.

    ``start_async_op()`` and ``finish_async_op()`` bracket each operation;
    ``wait_for_ops()`` blocks until the counter is back to zero.
    """

    def __init__(self):
        # Number of started-but-not-finished operations, guarded by self.lock.
        self.ops_in_progress = 0
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)

    def start_async_op(self):
        """Record that one more operation is in flight."""
        with self.lock:
            self.ops_in_progress += 1
            log.debug(f'start_async_op: {self.ops_in_progress}')

    def finish_async_op(self):
        """Record that one operation finished and wake any waiters."""
        with self.lock:
            self.ops_in_progress -= 1
            log.debug(f'finish_async_op: {self.ops_in_progress}')
            # More finishes than starts indicates a caller bug.
            assert self.ops_in_progress >= 0
            self.cond.notify_all()

    def wait_for_ops(self):
        """Block until no operations are in flight."""
        with self.lock:
            log.debug(f'wait_for_ops: {self.ops_in_progress}')
            self.cond.wait_for(lambda: self.ops_in_progress == 0)
            log.debug('done')