summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/mirroring/fs/dir_map/update.py
blob: a70baa01af56aa305973231ba16fe2109fe827c1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import errno
import pickle
import logging

import rados

from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \
    INSTANCE_ID_PREFIX, MIRROR_OBJECT_PREFIX

log = logging.getLogger(__name__)

MAX_UPDATE = 256

class UpdateDirMapRequest:
    def __init__(self, ioctx, update_mapping, removals, on_finish_callback):
        self.ioctx = ioctx
        self.update_mapping = update_mapping
        self.removals = removals
        self.on_finish_callback = on_finish_callback

    @staticmethod
    def omap_key(dir_path):
        return f'{DIRECTORY_MAP_PREFIX}{dir_path}'

    def send(self):
        log.info('updating image map')
        self.send_update()

    def send_update(self):
        log.debug(f'pending updates: {len(self.update_mapping)}+{len(self.removals)}')
        try:
            with rados.WriteOpCtx() as write_op:
                keys = []
                vals = []
                dir_keys = list(self.update_mapping.keys())[0:MAX_UPDATE]
                # gather updates
                for dir_path in dir_keys:
                    mapping = self.update_mapping.pop(dir_path)
                    keys.append(UpdateDirMapRequest.omap_key(dir_path))
                    vals.append(pickle.dumps(mapping))
                    self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
                # gather deletes
                slicept = MAX_UPDATE - len(dir_keys)
                removals = [UpdateDirMapRequest.omap_key(dir_path) for dir_path in self.removals[0:slicept]]
                self.removals = self.removals[slicept:]
                self.ioctx.remove_omap_keys(write_op, tuple(removals))
                log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
                self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update)
        except rados.Error as e:
            log.error(f'UpdateDirMapRequest.send_update exception: {e}')
            self.finish(-e.args[0])

    def handle_update(self, completion):
        r = completion.get_return_value()
        log.debug(f'handle_update: r={r}')
        if not r == 0:
            self.finish(r)
        elif self.update_mapping or self.removals:
            self.send_update()
        else:
            self.finish(0)

    def finish(self, r):
        log.info(f'finish: r={r}')
        self.on_finish_callback(r)

class UpdateInstanceRequest:
    def __init__(self, ioctx, instances_added, instances_removed, on_finish_callback):
        self.ioctx = ioctx
        self.instances_added = instances_added
        # purge vs remove: purge list is for purging on-disk instance
        # object. remove is for purging instance map.
        self.instances_removed = instances_removed.copy()
        self.instances_purge = instances_removed.copy()
        self.on_finish_callback = on_finish_callback

    @staticmethod
    def omap_key(instance_id):
        return f'{INSTANCE_ID_PREFIX}{instance_id}'

    @staticmethod
    def cephfs_mirror_object_name(instance_id):
        assert instance_id != ''
        return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'

    def send(self):
        log.info('updating instances')
        self.send_update()

    def send_update(self):
        self.remove_instance_object()

    def remove_instance_object(self):
        log.debug(f'pending purges: {len(self.instances_purge)}')
        if not self.instances_purge:
            self.update_instance_map()
            return
        instance_id = self.instances_purge.pop()
        self.ioctx.aio_remove(
            UpdateInstanceRequest.cephfs_mirror_object_name(instance_id), oncomplete=self.handle_remove)

    def handle_remove(self, completion):
        r = completion.get_return_value()
        log.debug(f'handle_remove: r={r}')
        # cephfs-mirror instances remove their respective instance
        # objects upon termination. so we handle ENOENT here. note
        # that when an instance is blocklisted, it wont be able to
        # purge its instance object, so we do it on its behalf.
        if not r == 0 and not r == -errno.ENOENT:
            self.finish(r)
            return
        self.remove_instance_object()

    def update_instance_map(self):
        log.debug(f'pending updates: {len(self.instances_added)}+{len(self.instances_removed)}')
        try:
            with rados.WriteOpCtx() as write_op:
                keys = []
                vals = []
                instance_ids = list(self.instances_added.keys())[0:MAX_UPDATE]
                # gather updates
                for instance_id in instance_ids:
                    data = self.instances_added.pop(instance_id)
                    keys.append(UpdateInstanceRequest.omap_key(instance_id))
                    vals.append(pickle.dumps(data))
                    self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
                # gather deletes
                slicept = MAX_UPDATE - len(instance_ids)
                removals = [UpdateInstanceRequest.omap_key(instance_id) \
                            for instance_id in self.instances_removed[0:slicept]]
                self.instances_removed = self.instances_removed[slicept:]
                self.ioctx.remove_omap_keys(write_op, tuple(removals))
                log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
                self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update)
        except rados.Error as e:
            log.error(f'UpdateInstanceRequest.update_instance_map exception: {e}')
            self.finish(-e.args[0])

    def handle_update(self, completion):
        r = completion.get_return_value()
        log.debug(f'handle_update: r={r}')
        if not r == 0:
            self.finish(r)
        elif self.instances_added or self.instances_removed:
            self.update_instance_map()
        else:
            self.finish(0)

    def finish(self, r):
        log.info(f'finish: r={r}')
        self.on_finish_callback(r)