diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/volumes/fs/purge_queue.py | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/volumes/fs/purge_queue.py')
-rw-r--r-- | src/pybind/mgr/volumes/fs/purge_queue.py | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py new file mode 100644 index 000000000..abace19d0 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/purge_queue.py @@ -0,0 +1,113 @@ +import errno +import logging +import os +import stat + +import cephfs + +from .async_job import AsyncJobs +from .exception import VolumeException +from .operations.resolver import resolve_trash +from .operations.template import SubvolumeOpType +from .operations.group import open_group +from .operations.subvolume import open_subvol +from .operations.volume import open_volume, open_volume_lockless +from .operations.trash import open_trashcan + +log = logging.getLogger(__name__) + + +# helper for fetching a trash entry for a given volume +def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs): + log.debug("fetching trash entry for volume '{0}'".format(volname)) + + try: + with open_volume_lockless(fs_client, volname) as fs_handle: + try: + with open_trashcan(fs_handle, volspec) as trashcan: + path = trashcan.get_trash_entry(running_jobs) + return 0, path + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + return 0, None + raise ve + except VolumeException as ve: + log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve)) + return ve.errno, None + + +def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel): + groupname, subvolname = resolve_trash(volspec, subvolume_trash_entry.decode('utf-8')) + log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname)) + + try: + with open_volume(fs_client, volname) as fs_handle: + with open_group(fs_handle, volspec, groupname) as group: + with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume: + log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable)) + if not subvolume.purgeable: + return + # this is fine under the global lock -- there are just a handful + # of entries in the subvolume to purge. moreover, the purge needs + # to be guarded since a create request might sneak in. + trashcan.purge(subvolume.base_path, should_cancel) + except VolumeException as ve: + if not ve.errno == -errno.ENOENT: + raise + + +# helper for starting a purge operation on a trash entry +def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel): + log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname)) + + ret = 0 + try: + with open_volume_lockless(fs_client, volname) as fs_handle: + with open_trashcan(fs_handle, volspec) as trashcan: + try: + pth = os.path.join(trashcan.path, purge_entry) + stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE, + cephfs.AT_SYMLINK_NOFOLLOW) + if stat.S_ISLNK(stx['mode']): + tgt = fs_handle.readlink(pth, 4096) + tgt = tgt[:stx['size']] + log.debug("purging entry pointing to subvolume trash: {0}".format(tgt)) + delink = True + try: + trashcan.purge(tgt, should_cancel) + except VolumeException as ve: + if not ve.errno == -errno.ENOENT: + delink = False + return ve.errno + finally: + if delink: + subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel) + log.debug("purging trash link: {0}".format(purge_entry)) + trashcan.delink(purge_entry) + else: + log.debug("purging entry pointing to trash: {0}".format(pth)) + trashcan.purge(pth, should_cancel) + except cephfs.Error as e: + log.warn("failed to remove trash entry: {0}".format(e)) + except VolumeException as ve: + ret = ve.errno + return ret + + +class ThreadPoolPurgeQueueMixin(AsyncJobs): + """ + Purge queue mixin class maintaining a pool of threads for purging trash entries. + Subvolumes are chosen from volumes in a round robin fashion. If some of the purge + entries (belonging to a set of volumes) have huge directory tree's (such as, lots + of small files in a directory w/ deep directory trees), this model may lead to + _all_ threads purging entries for one volume (starving other volumes). + """ + def __init__(self, volume_client, tp_size): + self.vc = volume_client + super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "purgejob", tp_size) + + def get_next_job(self, volname, running_jobs): + return get_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, running_jobs) + + def execute_job(self, volname, job, should_cancel): + purge_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, job, should_cancel) |