1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
import os
import uuid
import stat
import logging
from contextlib import contextmanager
import cephfs
from .index import Index
from ..exception import IndexException, VolumeException
from ..fs_util import list_one_entry_at_a_time
log = logging.getLogger(__name__)
class CloneIndex(Index):
    """Index of tracked clones, stored as symlinks under the "clone" sub-directory.

    Every tracked clone is a symlink whose name is a generated tracking id
    (a UUID4 string) and whose target is the clone's sink path.
    """

    SUB_GROUP_NAME = "clone"
    # Buffer size passed to readlink() when resolving an index entry.
    PATH_MAX = 4096

    @property
    def path(self):
        """Return the parent index path joined with the encoded sub-group name."""
        return os.path.join(super(CloneIndex, self).path,
                            CloneIndex.SUB_GROUP_NAME.encode('utf-8'))

    def _track(self, sink_path):
        """Create a symlink pointing at ``sink_path`` and return its tracking id.

        The tracking id is returned as a ``str``; the symlink name is that id
        encoded as UTF-8.
        """
        tracking_id = str(uuid.uuid4())
        source_path = os.path.join(self.path, tracking_id.encode('utf-8'))
        log.info("tracking-id {0} for path {1}".format(tracking_id, sink_path))
        self.fs.symlink(sink_path, source_path)
        return tracking_id

    def track(self, sink_path):
        """Track ``sink_path`` in the index and return the new tracking id.

        :param sink_path: path stored as the target of the index symlink.
        :return: the generated tracking id (``str``).
        :raises IndexException: on ``cephfs.Error`` or ``VolumeException``.
        """
        try:
            return self._track(sink_path)
        # cephfs.Error is matched first, mirroring the precedence of the
        # isinstance() chain this replaces.
        except cephfs.Error as e:
            raise IndexException(-e.args[0], e.args[1])
        except VolumeException as e:
            raise IndexException(e.errno, e.error_str)

    def untrack(self, tracking_id):
        """Remove the index symlink named after ``tracking_id``.

        :param tracking_id: tracking id as ``str`` (it is UTF-8 encoded here).
        :raises IndexException: if the unlink fails with ``cephfs.Error``.
        """
        log.info("untracking {0}".format(tracking_id))
        source_path = os.path.join(self.path, tracking_id.encode('utf-8'))
        try:
            self.fs.unlink(source_path)
        except cephfs.Error as e:
            raise IndexException(-e.args[0], e.args[1])

    def get_oldest_clone_entry(self, exclude=None):
        """Return the non-excluded symlink entry with the smallest ctime.

        :param exclude: optional iterable of sequences whose first element is
            an entry name to skip. ``None`` (the default) excludes nothing.
        :return: ``(entry_name, sink_path)`` or ``None`` if nothing qualifies.
        :raises IndexException: if reading the chosen symlink fails with
            ``cephfs.Error``.
        """
        # A None sentinel avoids sharing one mutable default list across calls.
        if exclude is None:
            exclude = ()
        exclude_tracking_ids = [v[0] for v in exclude]
        log.debug("excluded tracking ids: {0}".format(exclude_tracking_ids))

        min_ctime_entry = None
        for entry in list_one_entry_at_a_time(self.fs, self.path):
            dname = entry.d_name
            # Test exclusion first so excluded entries cost no lstat() call.
            if dname in exclude_tracking_ids:
                continue
            st = self.fs.lstat(os.path.join(self.path, dname))
            if not stat.S_ISLNK(st.st_mode):
                continue
            # Strict '<' keeps the first-listed entry when ctimes are equal.
            if min_ctime_entry is None or st.st_ctime < min_ctime_entry[1].st_ctime:
                min_ctime_entry = (dname, st)

        if min_ctime_entry is None:
            return None

        dname, st = min_ctime_entry
        try:
            sink_path = self.fs.readlink(os.path.join(self.path, dname),
                                         CloneIndex.PATH_MAX)
        except cephfs.Error as e:
            raise IndexException(-e.args[0], e.args[1])
        # Trim the readlink() result to the link length reported by lstat().
        return (dname, sink_path[:st.st_size])

    def find_clone_entry_index(self, sink_path):
        """Return the name of the index symlink whose target equals ``sink_path``.

        :param sink_path: target to look for; compared against the raw
            readlink() result trimmed to the link's ``st_size``.
        :return: the matching entry name, or ``None`` if there is no match.
        :raises IndexException: if a filesystem call fails with ``cephfs.Error``.
        """
        try:
            for entry in list_one_entry_at_a_time(self.fs, self.path):
                dname = entry.d_name
                dpath = os.path.join(self.path, dname)
                st = self.fs.lstat(dpath)
                if stat.S_ISLNK(st.st_mode):
                    target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX)
                    if sink_path == target_path[:st.st_size]:
                        return dname
            return None
        except cephfs.Error as e:
            raise IndexException(-e.args[0], e.args[1])
def create_clone_index(fs, vol_spec):
    """Create the clone index directory (mode 0o700) for the given volume spec.

    :param fs: filesystem handle; its ``mkdirs()`` is used to create the path.
    :param vol_spec: volume specification passed through to ``CloneIndex``.
    :raises IndexException: if ``mkdirs()`` fails with ``cephfs.Error``.
    """
    index = CloneIndex(fs, vol_spec)
    try:
        fs.mkdirs(index.path, 0o700)
    except cephfs.Error as err:
        # Re-raise with the errno negated and the original message.
        code = -err.args[0]
        message = err.args[1]
        raise IndexException(code, message)
@contextmanager
def open_clone_index(fs, vol_spec):
    """Context manager yielding a ``CloneIndex`` once its path has been stat()ed.

    :param fs: filesystem handle; its ``stat()`` is called on the index path.
    :param vol_spec: volume specification passed through to ``CloneIndex``.
    :raises IndexException: if ``stat()`` fails with ``cephfs.Error``.
    """
    index = CloneIndex(fs, vol_spec)
    try:
        fs.stat(index.path)
    except cephfs.Error as err:
        # Re-raise with the errno negated and the original message.
        code = -err.args[0]
        message = err.args[1]
        raise IndexException(code, message)
    # The yield stays outside the try so errors raised by the caller's
    # with-body are not converted.
    yield index
|