diff options
Diffstat (limited to 'src/pybind/mgr/volumes/fs/operations/clone_index.py')
-rw-r--r-- | src/pybind/mgr/volumes/fs/operations/clone_index.py | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/operations/clone_index.py b/src/pybind/mgr/volumes/fs/operations/clone_index.py new file mode 100644 index 000000000..a2b31f858 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/clone_index.py @@ -0,0 +1,98 @@ +import os +import uuid +import stat +import errno +import logging +from contextlib import contextmanager + +import cephfs + +from .index import Index +from ..exception import IndexException, VolumeException +from ..fs_util import list_one_entry_at_a_time + +log = logging.getLogger(__name__) + +class CloneIndex(Index): + SUB_GROUP_NAME = "clone" + PATH_MAX = 4096 + + @property + def path(self): + return os.path.join(super(CloneIndex, self).path, CloneIndex.SUB_GROUP_NAME.encode('utf-8')) + + def _track(self, sink_path): + tracking_id = str(uuid.uuid4()) + source_path = os.path.join(self.path, tracking_id.encode('utf-8')) + log.info("tracking-id {0} for path {1}".format(tracking_id, sink_path)) + + self.fs.symlink(sink_path, source_path) + return tracking_id + + def track(self, sink_path): + try: + return self._track(sink_path) + except (VolumeException, cephfs.Error) as e: + if isinstance(e, cephfs.Error): + e = IndexException(-e.args[0], e.args[1]) + elif isinstance(e, VolumeException): + e = IndexException(e.errno, e.error_str) + raise e + + def untrack(self, tracking_id): + log.info("untracking {0}".format(tracking_id)) + source_path = os.path.join(self.path, tracking_id.encode('utf-8')) + try: + self.fs.unlink(source_path) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + + def get_oldest_clone_entry(self, exclude=[]): + min_ctime_entry = None + exclude_tracking_ids = [v[0] for v in exclude] + log.debug("excluded tracking ids: {0}".format(exclude_tracking_ids)) + for entry in list_one_entry_at_a_time(self.fs, self.path): + dname = entry.d_name + dpath = os.path.join(self.path, dname) + st = self.fs.lstat(dpath) + if dname not in exclude_tracking_ids and stat.S_ISLNK(st.st_mode): + if min_ctime_entry is None or st.st_ctime < min_ctime_entry[1].st_ctime: + min_ctime_entry = (dname, st) + if min_ctime_entry: + try: + linklen = min_ctime_entry[1].st_size + sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), CloneIndex.PATH_MAX) + return (min_ctime_entry[0], sink_path[:linklen]) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + return None + + def find_clone_entry_index(self, sink_path): + try: + for entry in list_one_entry_at_a_time(self.fs, self.path): + dname = entry.d_name + dpath = os.path.join(self.path, dname) + st = self.fs.lstat(dpath) + if stat.S_ISLNK(st.st_mode): + target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX) + if sink_path == target_path[:st.st_size]: + return dname + return None + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + +def create_clone_index(fs, vol_spec): + clone_index = CloneIndex(fs, vol_spec) + try: + fs.mkdirs(clone_index.path, 0o700) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + +@contextmanager +def open_clone_index(fs, vol_spec): + clone_index = CloneIndex(fs, vol_spec) + try: + fs.stat(clone_index.path) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + yield clone_index |