1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
|
import os
import stat
import uuid
import errno
import logging
import cephfs
from .metadata_manager import MetadataManager
from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
from .op_sm import SubvolumeOpSm
from .subvolume_v1 import SubvolumeV1
from ..template import SubvolumeTemplate
from ...exception import OpSmException, VolumeException, MetadataMgrException
from ...fs_util import listdir, create_base_dir
from ..template import SubvolumeOpType
log = logging.getLogger(__name__)
class SubvolumeV2(SubvolumeV1):
    """
    Version 2 subvolumes creates a subvolume with path as follows,
        volumes/<group-name>/<subvolume-name>/<uuid>/

    The distinguishing feature of V2 subvolume as compared to V1 subvolumes is its ability to retain snapshots
    of a subvolume on removal. This is done by creating snapshots under the <subvolume-name> directory,
    rather than under the <uuid> directory, as is the case of V1 subvolumes.

    - The directory under which user data resides is <uuid>
    - Snapshots of the subvolume are taken within the <subvolume-name> directory
    - A meta file is maintained under the <subvolume-name> directory as a metadata store, storing information similar
      to V1 subvolumes
    - On a request to remove subvolume but retain its snapshots, only the <uuid> directory is moved to trash, retaining
      the rest of the subvolume and its meta file.
    - The <uuid> directory, when present, is the current incarnation of the subvolume, which may have snapshots of
      older incarnations of the same subvolume.
    - V1 subvolumes that currently do not have any snapshots are upgraded to V2 subvolumes automatically, to support the
      snapshot retention feature
    """

    # on-disk layout/metadata version implemented by this class
    VERSION = 2

    @staticmethod
    def version():
        """Return the subvolume layout version implemented by this class (2)."""
        return SubvolumeV2.VERSION

    @property
    def features(self):
        """List of feature names supported by v2 subvolumes.

        v2 adds snapshot retention on top of the v1 clone/autoprotect features.
        """
        return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value,
                SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value,
                SubvolumeFeatures.FEATURE_SNAPSHOT_RETENTION.value]

    @property
    def retained(self):
        """True if the subvolume has been removed but its snapshots are retained.

        Determined by refreshing the on-disk metadata and checking for the
        RETAINED state. A missing meta file (ENOENT) means the subvolume was
        never created or is fully gone, hence not retained; any other metadata
        error is surfaced as an internal VolumeException.
        """
        try:
            self.metadata_mgr.refresh()
            if self.state == SubvolumeStates.STATE_RETAINED:
                return True
            return False
        except MetadataMgrException as me:
            # MetadataMgrException carries a negated errno (see -errno.ENOENT checks
            # elsewhere in this class)
            if me.errno != -errno.ENOENT:
                raise VolumeException(me.errno, "internal error while processing subvolume '{0}'".format(self.subvolname))
            return False

    @property
    def purgeable(self):
        """True if this retained subvolume can now be purged entirely.

        A subvolume is purgeable only when it is in the retained state, has no
        remaining snapshots, and has no incarnations still pending purge in its
        per-subvolume trash directory.
        """
        if not self.retained or self.list_snapshots() or self.has_pending_purges:
            return False
        return True

    @property
    def has_pending_purges(self):
        """True if the per-subvolume trash directory contains entries awaiting purge."""
        try:
            return not listdir(self.fs, self.trash_dir) == []
        except VolumeException as ve:
            # no trash directory at all => nothing pending
            if ve.errno == -errno.ENOENT:
                return False
            raise

    @property
    def trash_dir(self):
        """Path (bytes) of the per-subvolume trash directory, <base_path>/.trash."""
        return os.path.join(self.base_path, b".trash")

    def create_trashcan(self):
        """per subvolume trash directory"""
        # create the .trash directory lazily; tolerate it already existing
        try:
            self.fs.stat(self.trash_dir)
        except cephfs.Error as e:
            if e.args[0] == errno.ENOENT:
                try:
                    self.fs.mkdir(self.trash_dir, 0o700)
                except cephfs.Error as ce:
                    raise VolumeException(-ce.args[0], ce.args[1])
            else:
                raise VolumeException(-e.args[0], e.args[1])

    def mark_subvolume(self):
        """Mark the subvolume root as a CephFS subvolume via the ceph.dir.subvolume vxattr."""
        # set subvolume attr, on subvolume root, marking it as a CephFS subvolume
        # subvolume root is where snapshots would be taken, and hence is the base_path for v2 subvolumes
        try:
            # MDS treats this as a noop for already marked subvolume
            self.fs.setxattr(self.base_path, 'ceph.dir.subvolume', b'1', 0)
        except cephfs.InvalidValue as e:
            raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

    @staticmethod
    def is_valid_uuid(uuid_str):
        """Return True if uuid_str parses as a UUID (used to spot incarnation dirs)."""
        try:
            uuid.UUID(uuid_str)
            return True
        except ValueError:
            return False

    def snapshot_base_path(self):
        """Path (bytes) under which snapshots live: <base_path>/<snapshot-dir-prefix>."""
        return os.path.join(self.base_path, self.vol_spec.snapshot_dir_prefix.encode('utf-8'))

    def snapshot_data_path(self, snapname):
        """Return the path of the data (uuid) directory inside snapshot `snapname`.

        Scans the snapshot directory for a subdirectory whose name is a valid
        UUID -- that is the snapshotted incarnation of the subvolume.
        Raises VolumeException(-ENOENT) if the snapshot or its uuid directory
        is not found.
        """
        snap_base_path = self.snapshot_path(snapname)
        uuid_str = None
        try:
            with self.fs.opendir(snap_base_path) as dir_handle:
                d = self.fs.readdir(dir_handle)
                while d:
                    if d.d_name not in (b".", b".."):
                        d_full_path = os.path.join(snap_base_path, d.d_name)
                        stx = self.fs.statx(d_full_path, cephfs.CEPH_STATX_MODE, cephfs.AT_SYMLINK_NOFOLLOW)
                        if stat.S_ISDIR(stx.get('mode')):
                            if self.is_valid_uuid(d.d_name.decode('utf-8')):
                                uuid_str = d.d_name
                    d = self.fs.readdir(dir_handle)
        except cephfs.Error as e:
            # NOTE(review): this handler reads e.errno while the rest of the class
            # uses e.args[0] for cephfs.Error -- presumably equivalent; confirm
            # against the cephfs bindings in use
            if e.errno == errno.ENOENT:
                raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
            raise VolumeException(-e.args[0], e.args[1])
        if not uuid_str:
            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
        return os.path.join(snap_base_path, uuid_str)

    def _remove_on_failure(self, subvol_path, retained):
        """Cleanup helper for failed create/create_clone.

        If the subvolume pre-existed in retained state, only the freshly created
        incarnation (uuid) directory is removed; otherwise the whole subvolume
        is torn down via remove(internal_cleanup=True).
        """
        if retained:
            log.info("cleaning up subvolume incarnation with path: {0}".format(subvol_path))
            try:
                self.fs.rmdir(subvol_path)
            except cephfs.Error as e:
                raise VolumeException(-e.args[0], e.args[1])
        else:
            log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
            self.remove(internal_cleanup=True)

    def _set_incarnation_metadata(self, subvolume_type, qpath, initial_state):
        """Point the existing (retained) meta file at a new incarnation: type, path and state."""
        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_TYPE, subvolume_type.value)
        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, qpath)
        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, initial_state.value)

    def create(self, size, isolate_nspace, pool, mode, uid, gid):
        """Create a new subvolume (or a new incarnation of a retained one).

        :param size: quota in bytes, or None for no quota
        :param isolate_nspace: if True, place data in a dedicated RADOS pool namespace
        :param pool: data pool name, or None for the default
        :param mode: POSIX mode bits for the incarnation directory
        :param uid: owner uid
        :param gid: owner gid
        :raises VolumeException: on any failure (EAGAIN while a purge of a
            retained subvolume is still in flight); partially created state is
            cleaned up best-effort before raising.
        """
        subvolume_type = SubvolumeTypes.TYPE_NORMAL
        try:
            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
        except OpSmException as oe:
            raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
        retained = self.retained
        if retained and self.has_pending_purges:
            raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress")
        # each incarnation lives in a fresh uuid-named directory under base_path
        subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
        try:
            # create group directory with default mode(0o755) if it doesn't exist.
            create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE)
            self.fs.mkdirs(subvol_path, mode)
            self.mark_subvolume()
            attrs = {
                'uid': uid,
                'gid': gid,
                'data_pool': pool,
                'pool_namespace': self.namespace if isolate_nspace else None,
                'quota': size
            }
            self.set_attrs(subvol_path, attrs)

            # persist subvolume metadata
            qpath = subvol_path.decode('utf-8')
            if retained:
                # reuse the existing meta file, repointing it at the new incarnation
                self._set_incarnation_metadata(subvolume_type, qpath, initial_state)
                self.metadata_mgr.flush()
            else:
                self.init_config(SubvolumeV2.VERSION, subvolume_type, qpath, initial_state)

            # Create the subvolume metadata file which manages auth-ids if it doesn't exist
            self.auth_mdata_mgr.create_subvolume_metadata_file(self.group.groupname, self.subvolname)
        except (VolumeException, MetadataMgrException, cephfs.Error) as e:
            # best-effort cleanup of the partially created incarnation/subvolume
            try:
                self._remove_on_failure(subvol_path, retained)
            except VolumeException as ve:
                log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))

            # normalize non-VolumeException errors before re-raising
            if isinstance(e, MetadataMgrException):
                log.error("metadata manager exception: {0}".format(e))
                e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}")
            elif isinstance(e, cephfs.Error):
                e = VolumeException(-e.args[0], e.args[1])
            raise e

    def create_clone(self, pool, source_volname, source_subvolume, snapname):
        """Create the destination subvolume of a clone operation.

        The new incarnation is created with attributes taken from the source
        snapshot; the actual data copy is driven elsewhere by the clone state
        machine. The clone source is recorded in the subvolume metadata.

        :param pool: data pool override for the clone, or None to inherit
        :param source_volname: name of the source volume
        :param source_subvolume: source subvolume object (project type)
        :param snapname: name of the source snapshot being cloned
        :raises VolumeException: on failure; partial state is cleaned up
            best-effort before raising.
        """
        subvolume_type = SubvolumeTypes.TYPE_CLONE
        try:
            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
        except OpSmException as oe:
            raise VolumeException(-errno.EINVAL, "clone failed: internal error")
        retained = self.retained
        if retained and self.has_pending_purges:
            raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress")
        subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
        try:
            # source snapshot attrs are used to create clone subvolume
            # attributes of subvolume's content though, are synced during the cloning process.
            attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))

            # The source of the clone may have exceeded its quota limit as
            # CephFS quotas are imprecise. Cloning such a source may fail if
            # the quota on the destination is set before starting the clone
            # copy. So always set the quota on destination after cloning is
            # successful.
            attrs["quota"] = None

            # override snapshot pool setting, if one is provided for the clone
            if pool is not None:
                attrs["data_pool"] = pool
                attrs["pool_namespace"] = None

            # create directory and set attributes
            self.fs.mkdirs(subvol_path, attrs.get("mode"))
            self.mark_subvolume()
            self.set_attrs(subvol_path, attrs)

            # persist subvolume metadata and clone source
            qpath = subvol_path.decode('utf-8')
            if retained:
                self._set_incarnation_metadata(subvolume_type, qpath, initial_state)
            else:
                self.metadata_mgr.init(SubvolumeV2.VERSION, subvolume_type.value, qpath, initial_state.value)
            self.add_clone_source(source_volname, source_subvolume, snapname)
            self.metadata_mgr.flush()
        except (VolumeException, MetadataMgrException, cephfs.Error) as e:
            # best-effort cleanup of the partially created incarnation/subvolume
            try:
                self._remove_on_failure(subvol_path, retained)
            except VolumeException as ve:
                log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))

            # normalize non-VolumeException errors before re-raising
            if isinstance(e, MetadataMgrException):
                log.error("metadata manager exception: {0}".format(e))
                e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}")
            elif isinstance(e, cephfs.Error):
                e = VolumeException(-e.args[0], e.args[1])
            raise e

    def allowed_ops_by_type(self, vol_type):
        """Return the set of SubvolumeOpType values permitted for a subvolume type.

        Clones allow every operation; normal subvolumes allow everything except
        the clone-progress operations; unknown types allow nothing.
        """
        if vol_type == SubvolumeTypes.TYPE_CLONE:
            return {op_type for op_type in SubvolumeOpType}
        if vol_type == SubvolumeTypes.TYPE_NORMAL:
            return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
                                                              SubvolumeOpType.CLONE_CANCEL,
                                                              SubvolumeOpType.CLONE_INTERNAL}
        return {}

    def allowed_ops_by_state(self, vol_state):
        """Return the set of SubvolumeOpType values permitted in a given state.

        COMPLETE allows everything; RETAINED allows only snapshot/removal/info
        style operations; any other (in-flight) state allows a small set of
        clone-management and forced-removal operations.
        """
        if vol_state == SubvolumeStates.STATE_COMPLETE:
            return {op_type for op_type in SubvolumeOpType}
        if vol_state == SubvolumeStates.STATE_RETAINED:
            return {
                SubvolumeOpType.REMOVE,
                SubvolumeOpType.REMOVE_FORCE,
                SubvolumeOpType.LIST,
                SubvolumeOpType.INFO,
                SubvolumeOpType.SNAP_REMOVE,
                SubvolumeOpType.SNAP_LIST,
                SubvolumeOpType.SNAP_INFO,
                SubvolumeOpType.SNAP_PROTECT,
                SubvolumeOpType.SNAP_UNPROTECT,
                SubvolumeOpType.CLONE_SOURCE
            }
        return {SubvolumeOpType.REMOVE_FORCE,
                SubvolumeOpType.CLONE_CREATE,
                SubvolumeOpType.CLONE_STATUS,
                SubvolumeOpType.CLONE_CANCEL,
                SubvolumeOpType.CLONE_INTERNAL,
                SubvolumeOpType.CLONE_SOURCE}

    def open(self, op_type):
        """Open the subvolume for an operation, validating type/state permissions.

        Refreshes metadata, (re-)marks the directory as a subvolume, checks the
        requested op against allowed_ops_by_type/allowed_ops_by_state, and for
        non-retained subvolumes caches uid/gid/mode from the incarnation path.

        :param op_type: a SubvolumeOpType member describing the intended operation
        :raises VolumeException: ENOTSUP for disallowed op/type combinations,
            ENOENT for missing/retained subvolumes, EAGAIN when the subvolume is
            not yet ready for the operation.
        """
        if not isinstance(op_type, SubvolumeOpType):
            raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
                                  op_type.value, self.subvolname))
        try:
            self.metadata_mgr.refresh()
            # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
            self.mark_subvolume()

            etype = self.subvol_type
            if op_type not in self.allowed_ops_by_type(etype):
                raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
                                      op_type.value, self.subvolname, etype.value))

            estate = self.state
            # retained subvolumes are reported as "removed" for disallowed ops...
            if op_type not in self.allowed_ops_by_state(estate) and estate == SubvolumeStates.STATE_RETAINED:
                raise VolumeException(-errno.ENOENT, "subvolume '{0}' is removed and has only snapshots retained".format(
                                      self.subvolname))
            # ...while in-flight states ask the caller to retry
            if op_type not in self.allowed_ops_by_state(estate) and estate != SubvolumeStates.STATE_RETAINED:
                raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
                                      self.subvolname, op_type.value))

            if estate != SubvolumeStates.STATE_RETAINED:
                subvol_path = self.path
                log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
                st = self.fs.stat(subvol_path)
                # cache ownership and permission bits of the incarnation directory
                self.uid = int(st.st_uid)
                self.gid = int(st.st_gid)
                self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
        except MetadataMgrException as me:
            if me.errno == -errno.ENOENT:
                raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
            raise VolumeException(me.args[0], me.args[1])
        except cephfs.ObjectNotFound:
            # NOTE(review): subvol_path is only bound on the non-retained branch
            # above; presumably ObjectNotFound can only be raised after that
            # assignment -- confirm
            log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
            raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

    def trash_incarnation_dir(self):
        """rename subvolume (uuid component) to trash"""
        self.create_trashcan()
        try:
            bname = os.path.basename(self.path)
            tpath = os.path.join(self.trash_dir, bname)
            log.debug("trash: {0} -> {1}".format(self.path, tpath))
            self.fs.rename(self.path, tpath)
            self._link_dir(tpath, bname)
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

    @staticmethod
    def safe_to_remove_subvolume_clone(subvol_state):
        """Return True if a clone subvolume in `subvol_state` may be removed."""
        # Both the STATE_FAILED and STATE_CANCELED are handled by 'handle_clone_failed' in the state
        # machine which removes the entry from the index. Hence, it's safe to removed clone with
        # force option for both.
        acceptable_rm_clone_states = [SubvolumeStates.STATE_COMPLETE, SubvolumeStates.STATE_CANCELED,
                                      SubvolumeStates.STATE_FAILED, SubvolumeStates.STATE_RETAINED]
        if subvol_state not in acceptable_rm_clone_states:
            return False
        return True

    def remove(self, retainsnaps=False, internal_cleanup=False):
        """Remove the subvolume, optionally retaining its snapshots.

        With snapshots present and retainsnaps=True, only the current
        incarnation is trashed and the metadata is flipped to the RETAINED
        state. Without snapshots the whole base directory is trashed (unless a
        purge is already pending). internal_cleanup=True skips the
        clone-in-progress safety check (used by the create failure paths).

        :raises VolumeException: ENOTEMPTY when snapshots exist and
            retainsnaps is False; EAGAIN when a clone of this subvolume is
            still in progress.
        """
        if self.list_snapshots():
            if not retainsnaps:
                raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
        else:
            if not internal_cleanup and not self.safe_to_remove_subvolume_clone(self.state):
                raise VolumeException(-errno.EAGAIN,
                                      "{0} clone in-progress -- please cancel the clone and retry".format(self.subvolname))
            if not self.has_pending_purges:
                self.trash_base_dir()
                # Delete the volume meta file, if it's not already deleted
                self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname)
                return
        if self.state != SubvolumeStates.STATE_RETAINED:
            self.trash_incarnation_dir()
            # drop user metadata and mark the subvolume as retained (no current incarnation)
            self.metadata_mgr.remove_section(MetadataManager.USER_METADATA_SECTION)
            self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
            self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
            self.metadata_mgr.flush()
        # Delete the volume meta file, if it's not already deleted
        self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname)

    def info(self):
        """Return subvolume info; retained subvolumes report a minimal dict."""
        if self.state != SubvolumeStates.STATE_RETAINED:
            return super(SubvolumeV2, self).info()

        return {'type': self.subvol_type.value, 'features': self.features, 'state': SubvolumeStates.STATE_RETAINED.value}

    def remove_snapshot(self, snapname, force=False):
        """Remove snapshot `snapname`; purge the subvolume if it becomes purgeable.

        :raises VolumeException: ESTALE (deliberate, to tickle the purge job)
            when removing the last retained snapshot also removes the subvolume.
        """
        super(SubvolumeV2, self).remove_snapshot(snapname, force)
        if self.purgeable:
            self.trash_base_dir()
            # tickle the volume purge job to purge this entry, using ESTALE
            raise VolumeException(-errno.ESTALE, "subvolume '{0}' has been removed as the last retained snapshot is removed".format(self.subvolname))
        # if not purgeable, subvol is not retained, or has snapshots, or already has purge jobs that will garbage collect this subvol
|