summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/volumes/fs/purge_queue.py
blob: d67ef9af373f6fcaaf347c4ca5695ec42e78115b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import errno
import logging
import os
import stat

import cephfs

from .async_job import AsyncJobs
from .exception import VolumeException
from .operations.resolver import resolve_trash
from .operations.template import SubvolumeOpType
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.volume import open_volume, open_volume_lockless
from .operations.trash import open_trashcan

log = logging.getLogger(__name__)

# helper for fetching a trash entry for a given volume
def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs):
    """
    Look up the next trash entry to purge for a volume.

    :param fs_client: filesystem client handed to open_volume_lockless()
    :param volspec: volume specification handed to open_trashcan()
    :param volname: name of the volume whose trashcan is inspected
    :param running_jobs: passed through to trashcan.get_trash_entry()
    :return: a (ret, entry) tuple -- (0, entry) on success, (0, None) when
             opening or reading the trashcan fails with ENOENT, and
             (errno, None) for any other VolumeException
    """
    log.debug("fetching trash entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(fs_client, volname) as fs_handle:
            try:
                with open_trashcan(fs_handle, volspec) as trashcan:
                    return 0, trashcan.get_trash_entry(running_jobs)
            except VolumeException as trash_err:
                # only ENOENT is reported as "nothing to purge"; anything
                # else is handed to the outer handler to be logged
                if trash_err.errno != -errno.ENOENT:
                    raise
                return 0, None
    except VolumeException as vol_err:
        log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, vol_err))
        return vol_err.errno, None

def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel):
    """
    Purge the base path of the subvolume that owns a subvolume trash entry.

    The trash entry is decoded as UTF-8 and resolved to a (group, subvolume)
    pair via resolve_trash(). The subvolume is opened for removal and, only
    if it reports itself as purgeable, its base path is purged through the
    supplied trashcan.

    :param fs_client: filesystem client; its ``mgr`` attribute is handed to open_subvol()
    :param volspec: volume specification
    :param volname: name of the volume holding the subvolume
    :param trashcan: trashcan object used to perform the purge
    :param subvolume_trash_entry: encoded path of the subvolume trash entry
    :param should_cancel: passed through to trashcan.purge()
    :return: None
    :raises VolumeException: for any failure other than ENOENT
    """
    trash_entry_path = subvolume_trash_entry.decode('utf-8')
    groupname, subvolname = resolve_trash(volspec, trash_entry_path)
    log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))

    try:
        with open_volume(fs_client, volname) as fs_handle:
            with open_group(fs_handle, volspec, groupname) as group:
                with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname,
                                 SubvolumeOpType.REMOVE) as subvolume:
                    log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
                    if subvolume.purgeable:
                        # purging while holding the global lock is acceptable:
                        # only a handful of entries are left in the subvolume,
                        # and the guard keeps a concurrent create request from
                        # sneaking in.
                        trashcan.purge(subvolume.base_path, should_cancel)
    except VolumeException as ve:
        # a missing volume/group/subvolume is not an error for a purge
        if ve.errno != -errno.ENOENT:
            raise

# helper for starting a purge operation on a trash entry
def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel):
    """
    Purge a single entry from the trashcan of a volume.

    The entry is stat'ed without following symlinks. If it is a symlink, the
    link target is purged first, then the owning subvolume is purged via
    subvolume_purge() and finally the link itself is removed from the
    trashcan. Any other kind of entry is purged directly.

    :param fs_client: filesystem client handed to open_volume_lockless()
    :param volspec: volume specification
    :param volname: name of the volume whose trash entry is purged
    :param purge_entry: name of the entry inside the trashcan
    :param should_cancel: passed through to trashcan.purge()
    :return: 0 on success (also when a cephfs.Error was logged), otherwise
             the errno carried by the VolumeException that stopped the purge
    """
    log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))

    ret = 0
    try:
        with open_volume_lockless(fs_client, volname) as fs_handle:
            with open_trashcan(fs_handle, volspec) as trashcan:
                try:
                    pth = os.path.join(trashcan.path, purge_entry)
                    stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
                                          cephfs.AT_SYMLINK_NOFOLLOW)
                    if stat.S_ISLNK(stx['mode']):
                        # read the link with a fixed-size buffer and trim the
                        # result to the link size reported by statx
                        tgt = fs_handle.readlink(pth, 4096)
                        tgt = tgt[:stx['size']]
                        log.debug("purging entry pointing to subvolume trash: {0}".format(tgt))
                        try:
                            trashcan.purge(tgt, should_cancel)
                        except VolumeException as ve:
                            # an already-missing target still needs its link
                            # cleaned up; any other failure keeps the link so
                            # the entry can be retried later
                            if ve.errno != -errno.ENOENT:
                                return ve.errno
                        # reached only when the target was purged (or was
                        # already gone). this deliberately does not live in a
                        # `finally` clause: an unexpected exception from the
                        # purge must not remove the link to a target that has
                        # not been fully purged.
                        subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel)
                        log.debug("purging trash link: {0}".format(purge_entry))
                        trashcan.delink(purge_entry)
                    else:
                        log.debug("purging entry pointing to trash: {0}".format(pth))
                        trashcan.purge(pth, should_cancel)
                except cephfs.Error as e:
                    # best effort: log and report success
                    log.warning("failed to remove trash entry: {0}".format(e))
    except VolumeException as ve:
        ret = ve.errno
    return ret

class ThreadPoolPurgeQueueMixin(AsyncJobs):
    """
    Purge queue mixin backed by a pool of threads that purge trash entries.

    Subvolumes are picked from volumes in a round robin fashion. When some
    purge entries (belonging to a set of volumes) contain huge directory
    trees (for example, lots of small files in deeply nested directories),
    this model may end up with _all_ threads purging entries for a single
    volume, starving the other volumes.
    """
    def __init__(self, volume_client, tp_size):
        # NOTE(review): self.vc is assigned before the base class initialiser
        # runs; presumably the base class may call back into get_next_job()
        # early -- keep this ordering unless AsyncJobs confirms otherwise.
        self.vc = volume_client
        super().__init__(volume_client, "puregejob", tp_size)

    def get_next_job(self, volname, running_jobs):
        """Return the (ret, entry) tuple produced by get_trash_entry_for_volume()."""
        fs_client = self.fs_client
        volspec = self.vc.volspec
        return get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        """Purge trash entry `job` of volume `volname`; the helper's return code is discarded."""
        fs_client = self.fs_client
        volspec = self.vc.volspec
        purge_trash_entry_for_volume(fs_client, volspec, volname, job, should_cancel)