path: root/src/pybind/mgr/cephadm/migrations.py

import json
import logging
from typing import TYPE_CHECKING, Iterator, Optional, Dict, Any

from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
from cephadm.schedule import HostAssignment
import rados

from mgr_module import NFS_POOL_NAME
from orchestrator import OrchestratorError, DaemonDescription

if TYPE_CHECKING:
    from .module import CephadmOrchestrator

LAST_MIGRATION = 5

logger = logging.getLogger(__name__)


class Migrations:
    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        # Why a global counter instead of per-spec versions?
        #
        # For the first migration:
        # the specs don't change at all; only the scheduler does.
        # Adding a version to the specs at that point just felt wrong.
        #
        # Also, the specs are only one part of cephadm that may need upgrading.
        # There is the cache, the inventory, the config store, the upgrade logic
        # (imagine changing the upgrade code while an old upgrade is still in
        # progress), the naming of daemons, the fs layout of the daemons, etc.
        if self.mgr.migration_current is None:
            self.set(LAST_MIGRATION)

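        # (service_id, pool, namespace) entries queued by queue_migrate_nfs_spec()
        # and processed later in migrate_2_3().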
        v = mgr.get_store('nfs_migration_queue')
        self.nfs_migration_queue = json.loads(v) if v else []

        # For some migrations we don't need to do anything except increment
        # migration_current; try to shortcut those here.
        self.migrate(True)

    def set(self, val: int) -> None:
        self.mgr.set_module_option('migration_current', val)
        self.mgr.migration_current = val

    def is_migration_ongoing(self) -> bool:
        return self.mgr.migration_current != LAST_MIGRATION

    def verify_no_migration(self) -> None:
        if self.is_migration_ongoing():
            # this is raised in module.serve()
            raise OrchestratorError(
                "cephadm migration still ongoing. Please wait, until the migration is complete.")

    def migrate(self, startup: bool = False) -> None:
        if self.mgr.migration_current == 0:
            if self.migrate_0_1():
                self.set(1)

        if self.mgr.migration_current == 1:
            if self.migrate_1_2():
                self.set(2)

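        # 2 -> 3 does real work (creating the NFS pool and migrating exports),
        # which needs RADOS and the nfs module, so don't run it during module
        # startup; a later migrate() call (outside startup) will pick it up.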
        if self.mgr.migration_current == 2 and not startup:
            if self.migrate_2_3():
                self.set(3)

        if self.mgr.migration_current == 3:
            if self.migrate_3_4():
                self.set(4)

        if self.mgr.migration_current == 4:
            if self.migrate_4_5():
                self.set(5)

    def migrate_0_1(self) -> bool:
        """
        Migration 0 -> 1
        New scheduler that takes PlacementSpec as the bound and not as recommendation.
        I.e. the new scheduler won't suggest any new placements outside of the hosts
        specified by label etc.

        Which means, we have to make sure, we're not removing any daemons directly after
        upgrading to the new scheduler.

        There is a potential race here:
        1. user updates his spec to remove daemons
        2. mgr gets upgraded to new scheduler, before the old scheduler removed the daemon
        3. now, we're converting the spec to explicit placement, thus reverting (1.)
        I think this is ok.
        """

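        # Only managed specs that have a count combined with an explicit host list,
        # a host pattern or a label can lose daemons under the stricter scheduler,
        # so those are the only specs we need to look at.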
        def interesting_specs() -> Iterator[ServiceSpec]:
            for s in self.mgr.spec_store.all_specs.values():
                if s.unmanaged:
                    continue
                p = s.placement
                if p is None:
                    continue
                if p.count is None:
                    continue
                if not p.hosts and not p.host_pattern and not p.label:
                    continue
                yield s

        def convert_to_explicit(spec: ServiceSpec) -> None:
            existing_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
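            # ask the new scheduler what it would place for this spec as-is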
            placements, to_add, to_remove = HostAssignment(
                spec=spec,
                hosts=self.mgr.inventory.all_specs(),
                unreachable_hosts=self.mgr._unreachable_hosts(),
                daemons=existing_daemons,
            ).place()

            # We only have to migrate if the new scheduler would remove daemons
            if len(placements) >= len(existing_daemons):
                return

            def to_hostname(d: DaemonDescription) -> HostPlacementSpec:
                if d.hostname in old_hosts:
                    return old_hosts[d.hostname]
                else:
                    assert d.hostname
                    return HostPlacementSpec(d.hostname, '', '')

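            # Pin the spec to the hosts of the daemons that already exist, so the
            # stricter scheduler keeps them instead of removing them.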
            old_hosts = {h.hostname: h for h in spec.placement.hosts}
            new_hosts = [to_hostname(d) for d in existing_daemons]

            new_placement = PlacementSpec(
                hosts=new_hosts,
                count=spec.placement.count
            )

            new_spec = ServiceSpec.from_json(spec.to_json())
            new_spec.placement = new_placement

            logger.info(f"Migrating {spec.one_line_str()} to explicit placement")

            self.mgr.spec_store.save(new_spec)

        specs = list(interesting_specs())
        if not specs:
            return True  # nothing to do. shortcut

        if not self.mgr.cache.daemon_cache_filled():
            logger.info("Unable to migrate yet. Daemon Cache still incomplete.")
            return False

        for spec in specs:
            convert_to_explicit(spec)

        return True

    def migrate_1_2(self) -> bool:
        """
        After 15.2.4, we unified some service IDs: MONs, MGRs etc no longer have a service id.
        Which means, the service names changed:

        mon.foo -> mon
        mgr.foo -> mgr

        This fixes the data structure consistency
        """
        bad_specs = {}
        for name, spec in self.mgr.spec_store.all_specs.items():
            if name != spec.service_name():
                bad_specs[name] = (spec.service_name(), spec)

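        # re-save each spec under its canonical name, marked unmanaged, and drop
        # the entry stored under the old name.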
        for old, (new, old_spec) in bad_specs.items():
            if new not in self.mgr.spec_store.all_specs:
                spec = old_spec
            else:
                spec = self.mgr.spec_store.all_specs[new]
            spec.unmanaged = True
            self.mgr.spec_store.save(spec)
            self.mgr.spec_store.finally_rm(old)

        return True

    def migrate_2_3(self) -> bool:
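        # drain the queue built up by queue_migrate_nfs_spec(); entries already
        # living in the new '.nfs' pool need no data migration.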
        if self.nfs_migration_queue:
            from nfs.cluster import create_ganesha_pool

            create_ganesha_pool(self.mgr)
            for service_id, pool, ns in self.nfs_migration_queue:
                if pool != '.nfs':
                    self.migrate_nfs_spec(service_id, pool, ns)
            self.nfs_migration_queue = []
            self.mgr.log.info('Done migrating all NFS services')
        return True

    def migrate_nfs_spec(self, service_id: str, pool: str, ns: Optional[str]) -> None:
        renamed = False
        if service_id.startswith('ganesha-'):
            service_id = service_id[8:]
            renamed = True

        self.mgr.log.info(
            f'Migrating nfs.{service_id} from legacy pool {pool} namespace {ns}'
        )

        # read exports
        ioctx = self.mgr.rados.open_ioctx(pool)
        if ns is not None:
            ioctx.set_namespace(ns)
        exports = []
        for obj in ioctx.list_objects():
            if obj.key.startswith('export-'):
                self.mgr.log.debug(f'reading {obj.key}')
                exports.append(obj.read().decode())
        self.mgr.log.info(f'Found {len(exports)} exports for legacy nfs.{service_id}')

        # copy grace file
        if service_id != ns:
            try:
                grace = ioctx.read("grace")
                new_ioctx = self.mgr.rados.open_ioctx(NFS_POOL_NAME)
                new_ioctx.set_namespace(service_id)
                new_ioctx.write_full("grace", grace)
                self.mgr.log.info('Migrated nfs-ganesha grace file')
            except rados.ObjectNotFound:
                self.mgr.log.debug('failed to read old grace file; skipping')

        if renamed and f'nfs.ganesha-{service_id}' in self.mgr.spec_store:
            # rename from nfs.ganesha-* to nfs.*.  This will destroy old daemons and
            # deploy new ones.
            self.mgr.log.info(f'Replacing nfs.ganesha-{service_id} with nfs.{service_id}')
            spec = self.mgr.spec_store[f'nfs.ganesha-{service_id}'].spec
            self.mgr.spec_store.rm(f'nfs.ganesha-{service_id}')
            spec.service_id = service_id
            self.mgr.spec_store.save(spec, True)

            # We have to remove the old daemons here as well, otherwise we'll end up with a port conflict.
            daemons = [d.name()
                       for d in self.mgr.cache.get_daemons_by_service(f'nfs.ganesha-{service_id}')]
            self.mgr.log.info(f'Removing old nfs.ganesha-{service_id} daemons {daemons}')
            self.mgr.remove_daemons(daemons)
        else:
            # redeploy all ganesha daemons to ensure that the daemon
            # cephx keys are correct AND container configs are set up properly
            daemons = [d.name() for d in self.mgr.cache.get_daemons_by_service(f'nfs.{service_id}')]
            self.mgr.log.info(f'Removing old nfs.{service_id} daemons {daemons}')
            self.mgr.remove_daemons(daemons)

            # re-save service spec (without pool and namespace properties!)
            spec = self.mgr.spec_store[f'nfs.{service_id}'].spec
            self.mgr.spec_store.save(spec)

        # import exports
        for export in exports:
            ex = ''
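            # drop the stored user_id / secret_access_key lines; 'nfs export apply'
            # manages the export's cephx credentials itself.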
            for line in export.splitlines():
                if (
                        line.startswith('        secret_access_key =')
                        or line.startswith('        user_id =')
                ):
                    continue
                ex += line + '\n'
            self.mgr.log.debug(f'importing export: {ex}')
            ret, out, err = self.mgr.mon_command({
                'prefix': 'nfs export apply',
                'cluster_id': service_id
            }, inbuf=ex)
            if ret:
                self.mgr.log.warning(f'Failed to migrate export ({ret}): {err}\nExport was:\n{ex}')
        self.mgr.log.info(f'Done migrating nfs.{service_id}')

    def migrate_3_4(self) -> bool:
        # We can't set any host with the _admin label, but we're
        # going to warn when calling `ceph orch host rm...`
        if 'client.admin' not in self.mgr.keys.keys:
            self.mgr._client_keyring_set(
                entity='client.admin',
                placement='label:_admin',
            )
        return True

    def migrate_4_5(self) -> bool:
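        # consolidate the separate registry_url/_username/_password module options
        # into a single 'registry_credentials' JSON blob in the mgr store, then
        # clear the old options from both the module and the mon config store.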
        registry_url = self.mgr.get_module_option('registry_url')
        registry_username = self.mgr.get_module_option('registry_username')
        registry_password = self.mgr.get_module_option('registry_password')
        if registry_url and registry_username and registry_password:

            registry_credentials = {'url': registry_url,
                                    'username': registry_username, 'password': registry_password}
            self.mgr.set_store('registry_credentials', json.dumps(registry_credentials))

            self.mgr.set_module_option('registry_url', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_url',
            })
            self.mgr.set_module_option('registry_username', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_username',
            })
            self.mgr.set_module_option('registry_password', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_password',
            })

            self.mgr.log.info('Done migrating registry login info')
        return True


def queue_migrate_nfs_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
    """
    After 16.2.5 we dropped the NFSServiceSpec pool and namespace properties.
    Queue up a migration to process later, once we are sure that RADOS is available
    and so on.
    """
    service_id = spec_dict['spec']['service_id']
    args = spec_dict['spec'].get('spec', {})
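    # if the old spec did not record them, assume the conventional 'nfs-ganesha'
    # pool and a namespace equal to the service id.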
    pool = args.pop('pool', 'nfs-ganesha')
    ns = args.pop('namespace', service_id)
    queued = mgr.get_store('nfs_migration_queue') or '[]'
    ls = json.loads(queued)
    ls.append([service_id, pool, ns])
    mgr.set_store('nfs_migration_queue', json.dumps(ls))
    mgr.log.info(f'Queued nfs.{service_id} for migration')