summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
blob: 718735d91b131a323a40e0382da4396b543c52c0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import os
import errno
import logging
import sys
import threading
import configparser
import re

import cephfs

from ...exception import MetadataMgrException

log = logging.getLogger(__name__)

# _lock needs to be shared across all instances of MetadataManager.
# that is why we have a file level instance
_lock = threading.Lock()


def _conf_reader(fs, fd, offset=0, length=4096):
    while True:
        buf = fs.read(fd, offset, length)
        offset += len(buf)
        if not buf:
            return
        yield buf.decode('utf-8')


class _ConfigWriter:
    def __init__(self, fs, fd):
        self._fs = fs
        self._fd = fd
        self._wrote = 0

    def write(self, value):
        buf = value.encode('utf-8')
        wrote = self._fs.write(self._fd, buf, -1)
        self._wrote += wrote
        return wrote

    def fsync(self):
        self._fs.fsync(self._fd, 0)

    @property
    def wrote(self):
        return self._wrote

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self._fs.close(self._fd)


class MetadataManager(object):
    GLOBAL_SECTION = "GLOBAL"
    USER_METADATA_SECTION   = "USER_METADATA"
    GLOBAL_META_KEY_VERSION = "version"
    GLOBAL_META_KEY_TYPE    = "type"
    GLOBAL_META_KEY_PATH    = "path"
    GLOBAL_META_KEY_STATE   = "state"

    CLONE_FAILURE_SECTION = "CLONE_FAILURE"
    CLONE_FAILURE_META_KEY_ERRNO = "errno"
    CLONE_FAILURE_META_KEY_ERROR_MSG = "error_msg"

    def __init__(self, fs, config_path, mode):
        self.fs = fs
        self.mode = mode
        self.config_path = config_path
        self.config = configparser.ConfigParser()

    def refresh(self):
        fd = None
        try:
            log.debug("opening config {0}".format(self.config_path))
            with _lock:
                fd = self.fs.open(self.config_path, os.O_RDONLY)
                cfg = ''.join(_conf_reader(self.fs, fd))
            self.config.read_string(cfg, source=self.config_path)
        except UnicodeDecodeError:
            raise MetadataMgrException(-errno.EINVAL,
                    "failed to decode, erroneous metadata config '{0}'".format(self.config_path))
        except cephfs.ObjectNotFound:
            raise MetadataMgrException(-errno.ENOENT, "metadata config '{0}' not found".format(self.config_path))
        except cephfs.Error as e:
            raise MetadataMgrException(-e.args[0], e.args[1])
        except configparser.Error:
            raise MetadataMgrException(-errno.EINVAL, "failed to parse, erroneous metadata config "
                    "'{0}'".format(self.config_path))
        finally:
            if fd is not None:
                self.fs.close(fd)

    def flush(self):
        # cull empty sections
        for section in list(self.config.sections()):
            if len(self.config.items(section)) == 0:
                self.config.remove_section(section)

        try:
            with _lock:
                tmp_config_path = self.config_path + b'.tmp'
                fd = self.fs.open(tmp_config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode)
                with _ConfigWriter(self.fs, fd) as cfg_writer:
                    self.config.write(cfg_writer)
                    cfg_writer.fsync()
                self.fs.rename(tmp_config_path, self.config_path)
            log.info(f"wrote {cfg_writer.wrote} bytes to config {tmp_config_path}")
            log.info(f"Renamed {tmp_config_path} to config {self.config_path}")
        except cephfs.Error as e:
            raise MetadataMgrException(-e.args[0], e.args[1])

    def init(self, version, typ, path, state):
        # you may init just once before refresh (helps to overwrite conf)
        if self.config.has_section(MetadataManager.GLOBAL_SECTION):
            raise MetadataMgrException(-errno.EINVAL, "init called on an existing config")

        self.add_section(MetadataManager.GLOBAL_SECTION)
        self.update_section_multi(
            MetadataManager.GLOBAL_SECTION, {MetadataManager.GLOBAL_META_KEY_VERSION : str(version),
                                             MetadataManager.GLOBAL_META_KEY_TYPE    : str(typ),
                                             MetadataManager.GLOBAL_META_KEY_PATH    : str(path),
                                             MetadataManager.GLOBAL_META_KEY_STATE   : str(state)
            })

    def add_section(self, section):
        try:
            self.config.add_section(section)
        except configparser.DuplicateSectionError:
            return
        except:
            raise MetadataMgrException(-errno.EINVAL, "error adding section to config")

    def remove_option(self, section, key):
        if not self.config.has_section(section):
            raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
        return self.config.remove_option(section, key)

    def remove_section(self, section):
        self.config.remove_section(section)

    def update_section(self, section, key, value):
        if not self.config.has_section(section):
            raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
        self.config.set(section, key, str(value))

    def update_section_multi(self, section, dct):
        if not self.config.has_section(section):
            raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
        for key,value in dct.items():
            self.config.set(section, key, str(value))

    def update_global_section(self, key, value):
        self.update_section(MetadataManager.GLOBAL_SECTION, key, str(value))

    def get_option(self, section, key):
        if not self.config.has_section(section):
            raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
        if not self.config.has_option(section, key):
            raise MetadataMgrException(-errno.ENOENT, "no config '{0}' in section '{1}'".format(key, section))
        return self.config.get(section, key)

    def get_global_option(self, key):
        return self.get_option(MetadataManager.GLOBAL_SECTION, key)

    def list_all_options_from_section(self, section):
        metadata_dict = {}
        if self.config.has_section(section):
            options = self.config.options(section)
            for option in options:
                metadata_dict[option] = self.config.get(section,option)
        return metadata_dict

    def list_all_keys_with_specified_values_from_section(self, section, value):
        keys = []
        if self.config.has_section(section):
            options = self.config.options(section)
            for option in options:
                if (value == self.config.get(section, option)) :
                    keys.append(option)
        return keys

    def section_has_item(self, section, item):
        if not self.config.has_section(section):
            raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
        return item in [v[1] for v in self.config.items(section)]

    def has_snap_metadata_section(self):
        sections = self.config.sections()
        r = re.compile('SNAP_METADATA_.*')
        for section in sections:
            if r.match(section):
                return True
        return False

    def list_snaps_with_metadata(self):
        sections = self.config.sections()
        r = re.compile('SNAP_METADATA_.*')
        return [section[len("SNAP_METADATA_"):] for section in sections if r.match(section)]