summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/dashboard/services/cephfs.py
blob: 604cf4a77fc823d2f90a0fb8373fed7f56f5b518 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import datetime
import logging
import os
from contextlib import contextmanager

import cephfs

from .. import mgr

logger = logging.getLogger('cephfs')


class CephFS(object):
    @classmethod
    def list_filesystems(cls):
        fsmap = mgr.get("fs_map")
        return [{'id': fs['id'], 'name': fs['mdsmap']['fs_name']}
                for fs in fsmap['filesystems']]

    @classmethod
    def fs_name_from_id(cls, fs_id):
        """
        Get the filesystem name from ID.
        :param fs_id: The filesystem ID.
        :type fs_id: int | str
        :return: The filesystem name or None.
        :rtype: str | None
        """
        fs_map = mgr.get("fs_map")
        fs_info = list(filter(lambda x: str(x['id']) == str(fs_id),
                              fs_map['filesystems']))
        if not fs_info:
            return None
        return fs_info[0]['mdsmap']['fs_name']

    def __init__(self, fs_name=None):
        logger.debug("initializing cephfs connection")
        self.cfs = cephfs.LibCephFS(rados_inst=mgr.rados)
        logger.debug("mounting cephfs filesystem: %s", fs_name)
        if fs_name:
            self.cfs.mount(filesystem_name=fs_name)
        else:
            self.cfs.mount()
        logger.debug("mounted cephfs filesystem")

    def __del__(self):
        logger.debug("shutting down cephfs filesystem")
        self.cfs.shutdown()

    @contextmanager
    def opendir(self, dirpath):
        d = None
        try:
            d = self.cfs.opendir(dirpath)
            yield d
        finally:
            if d:
                self.cfs.closedir(d)

    def ls_dir(self, path, depth):
        """
        List directories of specified path with additional information.
        :param path: The root directory path.
        :type path: str | bytes
        :param depth: The number of steps to go down the directory tree.
        :type depth: int | str
        :return: A list of directory dicts which consist of name, path,
            parent, snapshots and quotas.
        :rtype: list
        """
        paths = self._ls_dir(path, int(depth))
        # Convert (bytes => string), prettify paths (strip slashes)
        # and append additional information.
        return [self.get_directory(p) for p in paths if p != path.encode()]

    def _ls_dir(self, path, depth):
        """
        List directories of specified path.
        :param path: The root directory path.
        :type path: str | bytes
        :param depth: The number of steps to go down the directory tree.
        :type depth: int
        :return: A list of directory paths (bytes encoded).
            Example:
            ls_dir('/photos', 1) => [
                b'/photos/flowers', b'/photos/cars'
            ]
        :rtype: list
        """
        if isinstance(path, str):
            path = path.encode()
        logger.debug("get_dir_list dirpath=%s depth=%s", path,
                     depth)
        if depth == 0:
            return [path]
        logger.debug("opening dirpath=%s", path)
        with self.opendir(path) as d:
            dent = self.cfs.readdir(d)
            paths = [path]
            while dent:
                logger.debug("found entry=%s", dent.d_name)
                if dent.d_name in [b'.', b'..']:
                    dent = self.cfs.readdir(d)
                    continue
                if dent.is_dir():
                    logger.debug("found dir=%s", dent.d_name)
                    subdir_path = os.path.join(path, dent.d_name)
                    paths.extend(self._ls_dir(subdir_path, depth - 1))
                dent = self.cfs.readdir(d)
        return paths

    def get_directory(self, path):
        """
        Transforms path of directory into a meaningful dictionary.
        :param path: The root directory path.
        :type path: str | bytes
        :return: Dict consists of name, path, parent, snapshots and quotas.
        :rtype: dict
        """
        path = path.decode()
        not_root = path != os.sep
        return {
            'name': os.path.basename(path) if not_root else path,
            'path': path,
            'parent': os.path.dirname(path) if not_root else None,
            'snapshots': self.ls_snapshots(path),
            'quotas': self.get_quotas(path) if not_root else None
        }

    def dir_exists(self, path):
        try:
            with self.opendir(path):
                return True
        except cephfs.ObjectNotFound:
            return False

    def mk_dirs(self, path):
        """
        Create a directory.
        :param path: The path of the directory.
        """
        if path == os.sep:
            raise Exception('Cannot create root directory "/"')
        if self.dir_exists(path):
            return
        logger.info("Creating directory: %s", path)
        self.cfs.mkdirs(path, 0o755)

    def rm_dir(self, path):
        """
        Remove a directory.
        :param path: The path of the directory.
        """
        if path == os.sep:
            raise Exception('Cannot remove root directory "/"')
        if not self.dir_exists(path):
            return
        logger.info("Removing directory: %s", path)
        self.cfs.rmdir(path)

    def mk_snapshot(self, path, name=None, mode=0o755):
        """
        Create a snapshot.
        :param path: The path of the directory.
        :type path: str
        :param name: The name of the snapshot. If not specified,
            a name using the current time in RFC3339 UTC format
            will be generated.
        :type name: str | None
        :param mode: The permissions the directory should have
            once created.
        :type mode: int
        :return: Returns the name of the snapshot.
        :rtype: str
        """
        if name is None:
            now = datetime.datetime.now()
            tz = now.astimezone().tzinfo
            name = now.replace(tzinfo=tz).isoformat('T')
        client_snapdir = self.cfs.conf_get('client_snapdir')
        snapshot_path = os.path.join(path, client_snapdir, name)
        logger.info("Creating snapshot: %s", snapshot_path)
        self.cfs.mkdir(snapshot_path, mode)
        return name

    def ls_snapshots(self, path):
        """
        List snapshots for the specified path.
        :param path: The path of the directory.
        :type path: str
        :return: A list of dictionaries containing the name and the
          creation time of the snapshot.
        :rtype: list
        """
        result = []
        client_snapdir = self.cfs.conf_get('client_snapdir')
        path = os.path.join(path, client_snapdir).encode()
        with self.opendir(path) as d:
            dent = self.cfs.readdir(d)
            while dent:
                if dent.is_dir():
                    if dent.d_name not in [b'.', b'..'] and not dent.d_name.startswith(b'_'):
                        snapshot_path = os.path.join(path, dent.d_name)
                        stat = self.cfs.stat(snapshot_path)
                        result.append({
                            'name': dent.d_name.decode(),
                            'path': snapshot_path.decode(),
                            'created': '{}Z'.format(stat.st_ctime.isoformat('T'))
                        })
                dent = self.cfs.readdir(d)
        return result

    def rm_snapshot(self, path, name):
        """
        Remove a snapshot.
        :param path: The path of the directory.
        :type path: str
        :param name: The name of the snapshot.
        :type name: str
        """
        client_snapdir = self.cfs.conf_get('client_snapdir')
        snapshot_path = os.path.join(path, client_snapdir, name)
        logger.info("Removing snapshot: %s", snapshot_path)
        self.cfs.rmdir(snapshot_path)

    def get_quotas(self, path):
        """
        Get the quotas of the specified path.
        :param path: The path of the directory/file.
        :type path: str
        :return: Returns a dictionary containing 'max_bytes'
            and 'max_files'.
        :rtype: dict
        """
        try:
            max_bytes = int(self.cfs.getxattr(path, 'ceph.quota.max_bytes'))
        except cephfs.NoData:
            max_bytes = 0
        try:
            max_files = int(self.cfs.getxattr(path, 'ceph.quota.max_files'))
        except cephfs.NoData:
            max_files = 0
        return {'max_bytes': max_bytes, 'max_files': max_files}

    def set_quotas(self, path, max_bytes=None, max_files=None):
        """
        Set the quotas of the specified path.
        :param path: The path of the directory/file.
        :type path: str
        :param max_bytes: The byte limit.
        :type max_bytes: int | None
        :param max_files: The file limit.
        :type max_files: int | None
        """
        if max_bytes is not None:
            self.cfs.setxattr(path, 'ceph.quota.max_bytes',
                              str(max_bytes).encode(), 0)
        if max_files is not None:
            self.cfs.setxattr(path, 'ceph.quota.max_files',
                              str(max_files).encode(), 0)