import re
from contextlib import contextmanager
from . import utils, sh
from . import bootstrap
from . import ra
from . import corosync
from . import log
from . import xmlutil
from . import constants
from .service_manager import ServiceManager

logger = log.setup_logger(__name__)
logger_utils = log.LoggerUtils(logger)


class OCFS2Manager(object):
    """
    Class to manage OCFS2 and configure related resources
    """
    RA_ID_PREFIX = "ocfs2-"
    DLM_RA_ID = "{}dlm".format(RA_ID_PREFIX)
    FS_RA_ID = "{}clusterfs".format(RA_ID_PREFIX)
    LVMLOCKD_RA_ID = "{}lvmlockd".format(RA_ID_PREFIX)
    LVMACTIVATE_RA_ID = "{}lvmactivate".format(RA_ID_PREFIX)
    GROUP_ID = "{}group".format(RA_ID_PREFIX)
    CLONE_ID = "{}clone".format(RA_ID_PREFIX)
    VG_ID = "{}vg".format(RA_ID_PREFIX)
    LV_ID = "{}lv".format(RA_ID_PREFIX)

    MAX_CLONE_NUM = 8
    # Note: using undocumented '-x' switch to avoid prompting if overwriting
    MKFS_CMD = "mkfs.ocfs2 --cluster-stack pcmk --cluster-name {} -N {} -x {}"
    HINTS_WHEN_RUNNING = """
The cluster service has already been initialized, but the prerequisites are missing
to configure OCFS2. Please fix it and use the stage procedure to configure OCFS2 separately,
e.g. crm cluster init ocfs2 -o <ocfs2_device>
    """

    def __init__(self, context):
        """
        Initialize from the bootstrap context
        """
        self.ocfs2_devices = context.ocfs2_devices
        self.use_cluster_lvm2 = context.use_cluster_lvm2
        self.mount_point = context.mount_point
        self.use_stage = context.stage == "ocfs2"
        self.yes_to_all = context.yes_to_all
        self.cluster_name = None
        self.exist_ra_id_list = []
        self.vg_id = None
        self.group_id = None
        self.target_device = None

    def _verify_packages(self, use_cluster_lvm2=False):
        """
        Check whether any package required for OCFS2 is missing
        """
        required_packages = ["ocfs2-tools"]
        if use_cluster_lvm2:
            required_packages.append("lvm2-lockd")
        for pkg in required_packages:
            if not utils.package_is_installed(pkg):
                raise ValueError("Missing required package for configuring OCFS2: {}".format(pkg))

    def _verify_options(self):
        """
        Verify the options related to OCFS2
        """
        if self.use_stage and not self.ocfs2_devices:
            raise ValueError("ocfs2 stage requires the -o option")
        if len(self.ocfs2_devices) > 1 and not self.use_cluster_lvm2:
            raise ValueError("Without Cluster LVM2 (the -C option), the -o option only supports one device")
        if self.use_cluster_lvm2 and not self.ocfs2_devices:
            raise ValueError("The -C option is only valid together with the -o option")
        if self.mount_point and utils.has_mount_point_used(self.mount_point):
            raise ValueError("Mount point {} is already mounted".format(self.mount_point))

    def _verify_devices(self):
        """
        Verify ocfs2 devices
        """
        for dev in self.ocfs2_devices:
            if not utils.is_block_device(dev):
                raise ValueError("{} doesn't look like a block device".format(dev))
            if utils.is_dev_used_for_lvm(dev) and self.use_cluster_lvm2:
                raise ValueError("{} is a Logical Volume, cannot be used with the -C option".format(dev))
            if utils.has_disk_mounted(dev):
                raise ValueError("{} already mounted".format(dev))

    def _check_if_already_configured(self):
        """
        Check whether OCFS2-related resources are already configured
        """
        if not self.use_stage:
            return
        out = sh.cluster_shell().get_stdout_or_raise_error("crm configure show")
        if "fstype=ocfs2" in out:
            logger.info("Already configured OCFS2 related resources")
            raise utils.TerminateSubCommand

    def _static_verify(self):
        """
        Verify before configuring on init process
        """
        self._verify_packages(self.use_cluster_lvm2)
        self._check_if_already_configured()
        self._verify_options()
        self._verify_devices()

    def _dynamic_raise_error(self, error_msg):
        """
        Customize the error message when the cluster is already running
        """
        raise ValueError(error_msg + ("" if self.use_stage else self.HINTS_WHEN_RUNNING))

    def _check_sbd_and_ocfs2_dev(self):
        """
        Raise an error when an OCFS2 device is the same as an SBD device
        """
        from . import sbd
        if ServiceManager().service_is_enabled("sbd.service"):
            sbd_device_list = sbd.SBDManager.get_sbd_device_from_config()
            for dev in self.ocfs2_devices:
                if dev in sbd_device_list:
                    self._dynamic_raise_error("{} cannot be the same with SBD device".format(dev))

    def _confirm_to_overwrite_ocfs2_dev(self):
        """
        Confirm whether to overwrite the OCFS2 devices in interactive mode
        """
        for dev in self.ocfs2_devices:
            msg = ""
            if utils.has_dev_partitioned(dev):
                msg = "Found a partition table in {}".format(dev)
            else:
                fs_type = utils.get_dev_fs_type(dev)
                if fs_type:
                    msg = "{} contains a {} file system".format(dev, fs_type)
            if msg and not bootstrap.confirm("{} - Proceed anyway?".format(msg)):
                raise utils.TerminateSubCommand

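        # wipefs -a erases all filesystem and partition-table signatures, so the
        # devices are clean before mkfs.ocfs2 or pvcreate runs on them.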
        for dev in self.ocfs2_devices:
            sh.cluster_shell().get_stdout_or_raise_error("wipefs -a {}".format(dev))

    def _dynamic_verify(self):
        """
        Verify after the cluster is running
        """
        if not utils.has_stonith_running():
            self._dynamic_raise_error("OCFS2 requires a stonith device to be configured and running")

        self._check_sbd_and_ocfs2_dev()
        self._confirm_to_overwrite_ocfs2_dev()

    def _gen_ra_scripts(self, ra_type, kv):
        """
        Generate RA configuration scripts
        Return the generated id and the scripts
        """
        kv["id"] = utils.gen_unused_id(self.exist_ra_id_list, kv["id"])
        config_scripts = ra.CONFIGURE_RA_TEMPLATE_DICT[ra_type].format(**kv)
        return kv["id"], config_scripts

    def _mkfs(self, target):
        """
        Create an OCFS2 filesystem on the target device
        """
        with logger_utils.status_long("  Creating OCFS2 filesystem for {}".format(target)):
            self.cluster_name = corosync.get_value('totem.cluster_name')
            sh.cluster_shell().get_stdout_or_raise_error(self.MKFS_CMD.format(self.cluster_name, self.MAX_CLONE_NUM, target))

    @contextmanager
    def _vg_change(self):
        """
        Context manager: activate the VG on entry and always deactivate it on exit
        """
        shell = sh.cluster_shell()
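        # Activate the VG only while mkfs needs it; once configured, the
        # LVM-activate clone resource manages activation instead.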
        shell.get_stdout_or_raise_error("vgchange -ay {}".format(self.vg_id))
        try:
            yield
        finally:
            shell.get_stdout_or_raise_error("vgchange -an {}".format(self.vg_id))

    def _create_lv(self):
        """
        Create the PV, VG, and LV; return the LV path
        """
        disks_string = ' '.join(self.ocfs2_devices)
        shell = sh.cluster_shell()

        # Create PV
        with logger_utils.status_long("  Creating PV for {}".format(disks_string)):
            shell.get_stdout_or_raise_error("pvcreate {} -y".format(disks_string))

        # Create VG
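        # --shared marks the VG for lvmlockd-managed locking across the cluster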
        self.vg_id = utils.gen_unused_id(utils.get_all_vg_name(), self.VG_ID)
        with logger_utils.status_long("  Creating VG {}".format(self.vg_id)):
            shell.get_stdout_or_raise_error("vgcreate --shared {} {} -y".format(self.vg_id, disks_string))

        # Create LV
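        # Sized in physical extents (-l); get_pe_number is expected to report
        # the extents available in the VG so the LV spans them.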
        with logger_utils.status_long("  Creating LV {} on VG {}".format(self.LV_ID, self.vg_id)):
            pe_number = utils.get_pe_number(self.vg_id)
            shell.get_stdout_or_raise_error("lvcreate -l {} {} -n {} -y".format(pe_number, self.vg_id, self.LV_ID))
 
        return "/dev/{}/{}".format(self.vg_id, self.LV_ID)

    def _gen_group_and_clone_scripts(self, ra_list):
        """
        Generate group and clone scripts
        """
        # Group
        group_kv = {"id":self.GROUP_ID, "ra_string":' '.join(ra_list)}
        self.group_id, group_scripts = self._gen_ra_scripts("GROUP", group_kv)
        # Clone
        clone_kv = {"id":self.CLONE_ID, "group_id":self.group_id}
        _, clone_scripts = self._gen_ra_scripts("CLONE", clone_kv)
        return group_scripts + clone_scripts

    def _gen_fs_scripts(self):
        """
        Generate Filesystem scripts
        """
        fs_kv = {
                "id": self.FS_RA_ID,
                "mnt_point": self.mount_point,
                "fs_type": "ocfs2",
                "device": self.target_device
                }
        return self._gen_ra_scripts("Filesystem", fs_kv)

    def _load_append_and_wait(self, scripts, res_id, msg, need_append=True):
        """
        Load the scripts, append the resource to the existing group, and wait for it to start
        """
        bootstrap.crm_configure_load("update", scripts)
        if need_append:
            utils.append_res_to_group(self.group_id, res_id)
        bootstrap.wait_for_resource(msg, res_id)

    def _config_dlm(self):
        """
        Configure DLM resource
        """
        config_scripts = ""
        dlm_id, dlm_scripts = self._gen_ra_scripts("DLM", {"id":self.DLM_RA_ID})
        group_clone_scripts = self._gen_group_and_clone_scripts([dlm_id])
        config_scripts = dlm_scripts + group_clone_scripts
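        # The group and clone are created together with DLM, so there is no
        # existing group to append it to (hence need_append=False).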
        self._load_append_and_wait(config_scripts, dlm_id, "  Waiting for DLM({}) to start".format(dlm_id), need_append=False)

    def _config_lvmlockd(self):
        """
        Configure LVMLockd resource
        """
        _id, _scripts = self._gen_ra_scripts("LVMLockd", {"id":self.LVMLOCKD_RA_ID})
        self._load_append_and_wait(_scripts, _id, "  Wait for LVMLockd({}) start".format(_id))

    def _config_lvmactivate(self):
        """
        Configure LVMActivate resource
        """
        _id, _scripts = self._gen_ra_scripts("LVMActivate", {"id": self.LVMACTIVATE_RA_ID, "vgname": self.vg_id})
        self._load_append_and_wait(_scripts, _id, "  Waiting for LVMActivate({}) to start".format(_id))

    def _config_fs(self):
        """
        Configure Filesystem resource
        """
        utils.mkdirp(self.mount_point)
        _id, _scripts = self._gen_fs_scripts()
        self._load_append_and_wait(_scripts, _id, "  Waiting for Filesystem({}) to start".format(_id))

    def _config_resource_stack_lvm2(self):
        """
        Configure dlm + lvmlockd + lvm-activate + Filesystem
        """
        self._config_dlm()
        self._config_lvmlockd()
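        # lvmlockd must be running before the shared VG and LV can be created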
        self.target_device = self._create_lv()
        with self._vg_change():
            self._mkfs(self.target_device)
        self._config_lvmactivate()
        self._config_fs()

    def _config_resource_stack_ocfs2_alone(self):
        """
        Configure dlm + Filesystem
        """
        self._config_dlm()
        self.target_device = self.ocfs2_devices[0]
        self._mkfs(self.target_device)
        self._config_fs()

    def init_ocfs2(self):
        """
        OCFS2 configuration process on the init node
        """
        logger.info("Configuring OCFS2")
        self._dynamic_verify()
        self.exist_ra_id_list = utils.all_exist_id()

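        # OCFS2 sits on DLM, which requires quorum; on quorum loss the resources
        # must be frozen rather than stopped, hence no-quorum-policy=freeze.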
        no_quorum_policy_value = utils.get_property("no-quorum-policy")
        if no_quorum_policy_value != "freeze":
            utils.set_property("no-quorum-policy", "freeze")
            logger.info("  'no-quorum-policy' is set to \"freeze\"")

        if self.use_cluster_lvm2:
            self._config_resource_stack_lvm2()
        else:
            self._config_resource_stack_ocfs2_alone()
        logger.info("  OCFS2 device %s mounted on %s", self.target_device, self.mount_point)

    def _find_target_on_join(self, peer):
        """
        Find the device name from the OCF Filesystem resource parameters on the peer node
        """
        out = sh.cluster_shell().get_stdout_or_raise_error("crm configure show", peer)
        for line in out.splitlines():
            if "fstype=ocfs2" in line:
                res = re.search("device=\"(.*?)\"", line)
                if res:
                    return res.group(1)
                else:
                    raise ValueError("Filesystem require configure device")
        return None

    def join_ocfs2(self, peer):
        """
        Called during the join process to verify the OCFS2 environment
        """
        target = self._find_target_on_join(peer)
        if not target:
            return
        with logger_utils.status_long("Verify OCFS2 environment"):
            use_cluster_lvm2 = xmlutil.CrmMonXmlParser(peer).is_resource_configured(constants.LVMLOCKD_RA)
            self._verify_packages(use_cluster_lvm2)
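            # The UUID comparison only makes sense for a plain disk or partition;
            # an LV on a shared VG is already cluster-wide.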
            if utils.is_dev_a_plain_raw_disk_or_partition(target, peer):
                utils.compare_uuid_with_peer_dev([target], peer)

    @classmethod
    def verify_ocfs2(cls, ctx):
        """
        Verify OCFS2-related packages and environment
        """
        inst = cls(ctx)
        inst._static_verify()