summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/volumes/fs/operations/clone_index.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/volumes/fs/operations/clone_index.py')
-rw-r--r--src/pybind/mgr/volumes/fs/operations/clone_index.py98
1 files changed, 98 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/operations/clone_index.py b/src/pybind/mgr/volumes/fs/operations/clone_index.py
new file mode 100644
index 000000000..a2b31f858
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/clone_index.py
@@ -0,0 +1,98 @@
+import os
+import uuid
+import stat
+import errno
+import logging
+from contextlib import contextmanager
+
+import cephfs
+
+from .index import Index
+from ..exception import IndexException, VolumeException
+from ..fs_util import list_one_entry_at_a_time
+
+log = logging.getLogger(__name__)
+
+class CloneIndex(Index):
+ SUB_GROUP_NAME = "clone"
+ PATH_MAX = 4096
+
+ @property
+ def path(self):
+ return os.path.join(super(CloneIndex, self).path, CloneIndex.SUB_GROUP_NAME.encode('utf-8'))
+
+ def _track(self, sink_path):
+ tracking_id = str(uuid.uuid4())
+ source_path = os.path.join(self.path, tracking_id.encode('utf-8'))
+ log.info("tracking-id {0} for path {1}".format(tracking_id, sink_path))
+
+ self.fs.symlink(sink_path, source_path)
+ return tracking_id
+
+ def track(self, sink_path):
+ try:
+ return self._track(sink_path)
+ except (VolumeException, cephfs.Error) as e:
+ if isinstance(e, cephfs.Error):
+ e = IndexException(-e.args[0], e.args[1])
+ elif isinstance(e, VolumeException):
+ e = IndexException(e.errno, e.error_str)
+ raise e
+
+ def untrack(self, tracking_id):
+ log.info("untracking {0}".format(tracking_id))
+ source_path = os.path.join(self.path, tracking_id.encode('utf-8'))
+ try:
+ self.fs.unlink(source_path)
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+
+ def get_oldest_clone_entry(self, exclude=[]):
+ min_ctime_entry = None
+ exclude_tracking_ids = [v[0] for v in exclude]
+ log.debug("excluded tracking ids: {0}".format(exclude_tracking_ids))
+ for entry in list_one_entry_at_a_time(self.fs, self.path):
+ dname = entry.d_name
+ dpath = os.path.join(self.path, dname)
+ st = self.fs.lstat(dpath)
+ if dname not in exclude_tracking_ids and stat.S_ISLNK(st.st_mode):
+ if min_ctime_entry is None or st.st_ctime < min_ctime_entry[1].st_ctime:
+ min_ctime_entry = (dname, st)
+ if min_ctime_entry:
+ try:
+ linklen = min_ctime_entry[1].st_size
+ sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), CloneIndex.PATH_MAX)
+ return (min_ctime_entry[0], sink_path[:linklen])
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+ return None
+
+ def find_clone_entry_index(self, sink_path):
+ try:
+ for entry in list_one_entry_at_a_time(self.fs, self.path):
+ dname = entry.d_name
+ dpath = os.path.join(self.path, dname)
+ st = self.fs.lstat(dpath)
+ if stat.S_ISLNK(st.st_mode):
+ target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX)
+ if sink_path == target_path[:st.st_size]:
+ return dname
+ return None
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+
+def create_clone_index(fs, vol_spec):
+ clone_index = CloneIndex(fs, vol_spec)
+ try:
+ fs.mkdirs(clone_index.path, 0o700)
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+
+@contextmanager
+def open_clone_index(fs, vol_spec):
+ clone_index = CloneIndex(fs, vol_spec)
+ try:
+ fs.stat(clone_index.path)
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+ yield clone_index