summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/volumes/fs/purge_queue.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/volumes/fs/purge_queue.py
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/volumes/fs/purge_queue.py')
-rw-r--r--src/pybind/mgr/volumes/fs/purge_queue.py113
1 files changed, 113 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py
new file mode 100644
index 000000000..abace19d0
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/purge_queue.py
@@ -0,0 +1,113 @@
+import errno
+import logging
+import os
+import stat
+
+import cephfs
+
+from .async_job import AsyncJobs
+from .exception import VolumeException
+from .operations.resolver import resolve_trash
+from .operations.template import SubvolumeOpType
+from .operations.group import open_group
+from .operations.subvolume import open_subvol
+from .operations.volume import open_volume, open_volume_lockless
+from .operations.trash import open_trashcan
+
+log = logging.getLogger(__name__)
+
+
+# helper for fetching a trash entry for a given volume
+def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs):
+ log.debug("fetching trash entry for volume '{0}'".format(volname))
+
+ try:
+ with open_volume_lockless(fs_client, volname) as fs_handle:
+ try:
+ with open_trashcan(fs_handle, volspec) as trashcan:
+ path = trashcan.get_trash_entry(running_jobs)
+ return 0, path
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return 0, None
+ raise ve
+ except VolumeException as ve:
+ log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
+ return ve.errno, None
+
+
+def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel):
+ groupname, subvolname = resolve_trash(volspec, subvolume_trash_entry.decode('utf-8'))
+ log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))
+
+ try:
+ with open_volume(fs_client, volname) as fs_handle:
+ with open_group(fs_handle, volspec, groupname) as group:
+ with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
+ log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
+ if not subvolume.purgeable:
+ return
+ # this is fine under the global lock -- there are just a handful
+ # of entries in the subvolume to purge. moreover, the purge needs
+ # to be guarded since a create request might sneak in.
+ trashcan.purge(subvolume.base_path, should_cancel)
+ except VolumeException as ve:
+ if not ve.errno == -errno.ENOENT:
+ raise
+
+
+# helper for starting a purge operation on a trash entry
+def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel):
+ log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))
+
+ ret = 0
+ try:
+ with open_volume_lockless(fs_client, volname) as fs_handle:
+ with open_trashcan(fs_handle, volspec) as trashcan:
+ try:
+ pth = os.path.join(trashcan.path, purge_entry)
+ stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+ if stat.S_ISLNK(stx['mode']):
+ tgt = fs_handle.readlink(pth, 4096)
+ tgt = tgt[:stx['size']]
+ log.debug("purging entry pointing to subvolume trash: {0}".format(tgt))
+ delink = True
+ try:
+ trashcan.purge(tgt, should_cancel)
+ except VolumeException as ve:
+ if not ve.errno == -errno.ENOENT:
+ delink = False
+ return ve.errno
+ finally:
+ if delink:
+ subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel)
+ log.debug("purging trash link: {0}".format(purge_entry))
+ trashcan.delink(purge_entry)
+ else:
+ log.debug("purging entry pointing to trash: {0}".format(pth))
+ trashcan.purge(pth, should_cancel)
+ except cephfs.Error as e:
+ log.warn("failed to remove trash entry: {0}".format(e))
+ except VolumeException as ve:
+ ret = ve.errno
+ return ret
+
+
+class ThreadPoolPurgeQueueMixin(AsyncJobs):
+ """
+ Purge queue mixin class maintaining a pool of threads for purging trash entries.
+ Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
+ entries (belonging to a set of volumes) have huge directory tree's (such as, lots
+ of small files in a directory w/ deep directory trees), this model may lead to
+ _all_ threads purging entries for one volume (starving other volumes).
+ """
+ def __init__(self, volume_client, tp_size):
+ self.vc = volume_client
+ super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "purgejob", tp_size)
+
+ def get_next_job(self, volname, running_jobs):
+ return get_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, running_jobs)
+
+ def execute_job(self, volname, job, should_cancel):
+ purge_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, job, should_cancel)