summaryrefslogtreecommitdiffstats
path: root/crmsh/ui_cluster.py
blob: bff5b574388116c660623a3c8073e71b0685ae0e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
# Copyright (C) 2008-2011 Dejan Muhamedagic <dmuhamedagic@suse.de>
# Copyright (C) 2013 Kristoffer Gronlund <kgronlund@suse.com>
# See COPYING for license information.

import sys
import re
import argparse
import typing
from argparse import ArgumentParser, RawDescriptionHelpFormatter

import crmsh.parallax
from . import command, sh
from . import utils
from . import scripts
from . import completers as compl
from . import bootstrap
from . import corosync
from . import qdevice
from .cibconfig import cib_factory
from .prun import prun
from .service_manager import ServiceManager
from .sh import ShellUtils
from .ui_node import parse_option_for_nodes
from . import constants


from . import log
logger = log.setup_logger(__name__)


def parse_options(parser, args):
    """
    Parse ``args`` with ``parser`` and return ``(options, remaining_args)``.

    Unknown arguments are not an error: they are returned in
    ``remaining_args`` (``parse_known_args`` semantics).

    Returns ``(None, None)`` when parsing fails (argparse prints its own
    message and raises SystemExit, which is absorbed here) or when the parsed
    options have a truthy ``help`` attribute (the help text is printed first).
    Otherwise the options are passed to ``utils.check_empty_option_value``
    before being returned.
    """
    try:
        options, args = parser.parse_known_args(list(args))
    except (SystemExit, Exception):
        # argparse (and parser.error() calls from custom actions) report
        # failures via SystemExit; absorb it so the caller can simply return.
        # KeyboardInterrupt is deliberately not caught so Ctrl-C still works.
        return None, None
    if getattr(options, 'help', False):
        parser.print_help()
        return None, None
    utils.check_empty_option_value(options)
    return options, args


def _remove_completer(args):
    """
    Completion candidates for ``cluster remove``.

    Offers the script parameters for 'remove' plus every cluster node not
    already present in ``args[1:]`` (the first element is skipped, presumably
    the command word itself).

    Node discovery is best-effort: if ``utils.list_cluster_nodes()`` raises or
    returns a falsy value (e.g. None), only the parameter names are offered.
    """
    try:
        nodes = utils.list_cluster_nodes() or []
    except Exception:
        nodes = []
    already_given = set(args[1:])
    remaining = [node for node in nodes if node not in already_given]
    return scripts.param_completion_list('remove') + remaining


def script_printer():
    """Create and return a ``ui_script.ConsolePrinter``; the module is imported on first use."""
    from . import ui_script
    return ui_script.ConsolePrinter()


def script_args(args):
    """Delegate to ``ui_script._nvpairs2parameters`` to turn ``args`` into script parameters."""
    from . import ui_script
    return ui_script._nvpairs2parameters(args)


def get_cluster_name():
    """
    Return the cluster name, or None if it cannot be determined.

    When corosync.service is active locally, the 'cluster-name' CIB property
    is returned. Otherwise the first value of 'totem.cluster_name' from
    ``corosync.get_values`` is used (None if there is no such value).
    """
    local_services = ServiceManager(sh.ClusterShellAdaptorForLocalShell(sh.LocalShell()))
    if local_services.service_is_active("corosync.service"):
        return cib_factory.get_property('cluster-name')
    configured = corosync.get_values('totem.cluster_name')
    return configured[0] if configured else None


class ArgparseCustomizableAction(argparse.Action):
    """
    Base argparse action split into a parse step and a validate step.

    Every time the option occurs, ``parse`` combines the value currently
    stored on the namespace with the new raw string, ``validate`` checks the
    combined result, and the result is stored back under ``self.dest``.
    """
    def __call__(self, parser, namespace, value, option_string=None):
        previous_value = getattr(namespace, self.dest, None)
        # Pass through the option string that was actually used (e.g. '-s' vs
        # '--sbd-device'), exactly as validate() receives it.
        parsed_value = self.parse(parser, previous_value, value, option_string=option_string)
        self.validate(parser, parsed_value, option_string)
        setattr(namespace, self.dest, parsed_value)

    def parse(self, parser, previous_value, raw_value, option_string):
        """Parse one argument and return the parsed value.

        Arguments:
            parser: The ArgumentParser invoking this action.
            previous_value: The value currently held in the destination
                attribute (None if the namespace has none).
            raw_value: The string value to be parsed.
            option_string: The command-line option string associated with this action.

        Subclasses must override this; the base implementation raises
        NotImplementedError.
        """
        raise NotImplementedError

    def validate(self, parser, parsed_value, option_string):
        """Check ``parsed_value``; subclasses reject it via ``parser.error()``.

        The base implementation accepts every value.
        """
        pass


class ArgparseActionSplitAndAppendParseMixin(ArgparseCustomizableAction):
    """
    Parse `--foo a;b --foo "c d" --foo e` into ['a', 'b', 'c', 'd', 'e'].

    Each raw value is split on ';' and ' ' (empty pieces are dropped) and the
    pieces are appended to a copy of the list collected so far.
    """
    def parse(self, parser, previous_value, raw_value, option_string):
        # Copy instead of extending in place: on the first occurrence argparse
        # hands over the option's ``default`` object itself, so extending it
        # would mutate the parser's default list.
        items = [] if previous_value is None else list(previous_value)
        items.extend(piece for piece in re.split("[; ]", raw_value) if piece)
        return items


class ArgparseActionUniqueListItemValidateMixin(ArgparseCustomizableAction):
    """Validate the uniqueness of parsed list items: any repeated item is an error."""
    def validate(self, parser, parsed_value, option_string):
        option_names = '/'.join(self.option_strings)
        # A set can only be smaller than the list if some item occurs twice.
        if len(set(parsed_value)) < len(parsed_value):
            parser.error(f"Duplicated input for '{option_names}' option")


class CustomAppendAction(ArgparseActionSplitAndAppendParseMixin, ArgparseActionUniqueListItemValidateMixin):
    """
    Append action for list-valued options (used by -i, -s and -o):

    - each value is split on ';' and ' ', so '-s "/dev/sda1;/dev/sda2"' adds
      two items (parse step from ArgparseActionSplitAndAppendParseMixin);
    - an item given more than once is rejected (validate step from
      ArgparseActionUniqueListItemValidateMixin).
    """


class ArgparseActionUniqueHostInListItemValidateMixin(ArgparseCustomizableAction):
    """
    Validate a parsed list of '[user@]host' items.

    Every item must contain at most one '@' (with non-empty parts), and no host
    may appear twice; the optional user part is ignored when comparing hosts.
    """
    def validate(self, parser, parsed_value, option_string):
        option_names = '/'.join(self.option_strings)
        seen_hosts = set()
        for entry in parsed_value:
            matched = re.match("^(?:[^@]+@)?([^@]+)$", entry)
            if not matched:
                parser.error("Malformed value for option {} [<user>@]<host>: {}.".format(option_names, parsed_value))
            hostname = matched.group(1)
            if hostname in seen_hosts:
                parser.error("Duplicated host in option {}: {}".format(option_names, parsed_value))
            seen_hosts.add(hostname)


class ArgparseUserAtHostAppendAction(
    ArgparseActionSplitAndAppendParseMixin,
    ArgparseActionUniqueHostInListItemValidateMixin,
):
    """
    Append action for '[user@]host' options (e.g. -N/--node): values are split
    on ';' and ' ' and accumulated, and the same host may not be given twice.
    """



class Cluster(command.UI):
    '''
    Whole cluster management.

    - Package installation
    - System configuration
    - Network troubleshooting
    - Perform other callouts/cluster-wide devops operations
    '''
    name = "cluster"

    def requires(self):
        # Always reports True: no extra requirements are checked for this level.
        return True

    def __init__(self):
        command.UI.__init__(self)
        # Hooks for overriding the node list/target, so cluster commands can
        # operate before an actual cluster exists; unset by default.
        self._inventory_nodes = self._inventory_target = None

    @command.skill_level('administrator')
    def do_start(self, context, *args):
        '''
        Starts the cluster stack on all nodes or specific node(s)

        A node counts as already started when pacemaker.service (and, if
        QDevice is configured, corosync-qdevice.service) is active on it; such
        nodes are only reported. On the rest, corosync-qdevice is started first
        when configured, then pacemaker via bootstrap.start_pacemaker(), after
        which the QDevice vote is checked.
        '''
        services_to_check = ["pacemaker.service"]
        qdevice_in_use = utils.is_qdevice_configured()
        if qdevice_in_use:
            services_to_check.append("corosync-qdevice.service")

        service_manager = ServiceManager()
        pending = []
        for node in parse_option_for_nodes(context, *args):
            # Query every service (no short-circuit), as before.
            statuses = [service_manager.service_is_active(srv, remote_addr=node) for srv in services_to_check]
            if all(statuses):
                logger.info("The cluster stack already started on {}".format(node))
            else:
                pending.append(node)
        if not pending:
            return

        if qdevice_in_use:
            service_manager.start_service("corosync-qdevice", node_list=pending)
        started = bootstrap.start_pacemaker(pending)
        if qdevice_in_use:
            qdevice.QDevice.check_qdevice_vote()
        for node in started:
            logger.info("The cluster stack started on {}".format(node))

    @staticmethod
    def _node_ready_to_stop_cluster_service(node):
        """
        Decide whether the normal stop sequence should run on ``node``.

        Returns True only when both corosync.service and pacemaker.service are
        active. Otherwise returns False; before that, if corosync.service or
        sbd.service is active, corosync is stopped on the node and reported as
        stopped, else the stack is reported as already stopped.
        """
        manager = ServiceManager()

        # All three states are queried up front, in this order.
        corosync_up = manager.service_is_active("corosync.service", remote_addr=node)
        sbd_up = manager.service_is_active("sbd.service", remote_addr=node)
        pacemaker_up = manager.service_is_active("pacemaker.service", remote_addr=node)

        if corosync_up and pacemaker_up:
            return True

        if corosync_up or sbd_up:
            manager.stop_service("corosync", remote_addr=node)
            logger.info(f"The cluster stack stopped on {node}")
        else:
            logger.info(f"The cluster stack already stopped on {node}")
        return False

    @staticmethod
    def _wait_for_dc(node=None):
        """
        Wait for the cluster's DC to become available.

        Returns immediately when pacemaker.service is not active on ``node``.
        Otherwise waits, via ``utils.check_function_with_timeout``, up to
        (dc-deadtime + 5) seconds for ``utils.get_dc`` to succeed.

        dc-deadtime may carry a Pacemaker time unit ("20s", "1min", "500ms",
        ...); a bare number means seconds and the result is rounded up to
        whole seconds. An unset or unrecognised value falls back to
        constants.DC_DEADTIME_DEFAULT.

        Raises:
            utils.TerminateSubCommand: if no DC is found before the timeout.
        """
        # Microseconds per Pacemaker time unit; an empty unit means seconds.
        usec_per_unit = {
            "": 1000000, "s": 1000000, "sec": 1000000,
            "ms": 1000, "msec": 1000,
            "us": 1, "usec": 1,
            "m": 60000000, "min": 60000000,
            "h": 3600000000, "hr": 3600000000,
        }

        def to_seconds(value):
            """Return ``value`` as whole seconds (rounded up), or None if unparsable."""
            match = re.fullmatch(r"\s*(\d+)\s*([a-z]*)\s*", str(value).lower())
            if match is None or match.group(2) not in usec_per_unit:
                return None
            total_usec = int(match.group(1)) * usec_per_unit[match.group(2)]
            return -(-total_usec // 1000000)

        if not ServiceManager().service_is_active("pacemaker.service", remote_addr=node):
            return

        dc_deadtime = utils.get_property("dc-deadtime", peer=node)
        deadtime_seconds = to_seconds(dc_deadtime) if dc_deadtime else None
        if deadtime_seconds is None:
            if dc_deadtime:
                logger.warning("Unrecognized dc-deadtime value %r; using the default", dc_deadtime)
            deadtime_seconds = int(str(constants.DC_DEADTIME_DEFAULT).strip('s'))
        dc_timeout = deadtime_seconds + 5
        try:
            utils.check_function_with_timeout(utils.get_dc, wait_timeout=dc_timeout, peer=node)
        except TimeoutError:
            logger.error("No DC found currently, please wait if the cluster is still starting")
            raise utils.TerminateSubCommand

    @staticmethod
    def _set_dlm(node=None):
        """
        Relax DLM quorum handling before the cluster stack is stopped.

        Only when DLM is running on ``node`` and the node is not quorate, set
        the DLM options enable_quorum_fencing=0 and enable_quorum_lockspace=0
        through utils.set_dlm_option; otherwise do nothing.
        """
        if not utils.is_dlm_running(node) or utils.is_quorate(node):
            return
        logger.debug("Quorum is lost; Set enable_quorum_fencing=0 and enable_quorum_lockspace=0 for dlm")
        utils.set_dlm_option(peer=node, enable_quorum_fencing=0, enable_quorum_lockspace=0)

    @command.skill_level('administrator')
    def do_stop(self, context, *args):
        '''
        Stops the cluster stack on all nodes or specific node(s)

        Nodes for which _node_ready_to_stop_cluster_service() returns False are
        dropped. For the rest: wait for a DC and adjust DLM options (both
        checked against the first node), then stop pacemaker, corosync-qdevice
        (if active) and finally corosync.
        '''
        targets = [node for node in parse_option_for_nodes(context, *args)
                   if self._node_ready_to_stop_cluster_service(node)]
        if not targets:
            return
        logger.debug(f"stop node list: {targets}")

        first_node = targets[0]
        self._wait_for_dc(first_node)
        self._set_dlm(first_node)

        manager = ServiceManager()
        # pacemaker goes first: it can make sure the cluster keeps quorum
        # until corosync is stopped.
        targets = manager.stop_service("pacemaker", node_list=targets)
        # qdevice next, if active (queried without remote_addr, presumably
        # meaning the local node).
        if manager.service_is_active("corosync-qdevice.service"):
            manager.stop_service("corosync-qdevice.service", node_list=targets)
        # corosync last.
        targets = manager.stop_service("corosync", node_list=targets)

        for node in targets:
            logger.info("The cluster stack stopped on {}".format(node))

    @command.skill_level('administrator')
    def do_restart(self, context, *args):
        '''
        Restarts the cluster stack on all nodes or specific node(s)

        Equivalent to do_stop() followed by do_start() with the same arguments.
        '''
        # Result unused; called first, presumably so bad arguments are rejected
        # before anything gets stopped.
        parse_option_for_nodes(context, *args)
        for step in (self.do_stop, self.do_start):
            step(context, *args)

    @command.skill_level('administrator')
    def do_enable(self, context, *args):
        '''
        Enable the cluster services on the selected node(s)

        Enables pacemaker.service, plus corosync-qdevice.service when that
        unit is available and QDevice is configured.
        '''
        nodes = parse_option_for_nodes(context, *args)
        manager = ServiceManager()
        nodes = manager.enable_service("pacemaker.service", node_list=nodes)
        qdevice_wanted = manager.service_is_available("corosync-qdevice.service") and utils.is_qdevice_configured()
        if qdevice_wanted:
            manager.enable_service("corosync-qdevice.service", node_list=nodes)
        for node in nodes:
            logger.info("Cluster services enabled on %s", node)

    @command.skill_level('administrator')
    def do_disable(self, context, *args):
        '''
        Disable the cluster services on the selected node(s)

        Disables pacemaker.service and then corosync-qdevice.service on the
        nodes returned by the pacemaker step.
        '''
        manager = ServiceManager()
        nodes = manager.disable_service("pacemaker.service", node_list=parse_option_for_nodes(context, *args))
        manager.disable_service("corosync-qdevice.service", node_list=nodes)
        for node in nodes:
            logger.info("Cluster services disabled on %s", node)

    def _args_implicit(self, context, args, name):
        '''
        Fold leading non-nvpair arguments into one implicit nvpair.

        The leading run of arguments without '=' is joined with ',' and
        appended to the remaining arguments as '<name>=<v1,v2,...>'. Without
        such a run the arguments are returned unchanged, as a list.
        '''
        remaining = list(args)
        split_at = 0
        while split_at < len(remaining) and '=' not in remaining[split_at]:
            split_at += 1
        implicit, rest = remaining[:split_at], remaining[split_at:]
        if not implicit:
            return rest
        return rest + ['%s=%s' % (name, ','.join(implicit))]

    # @command.completers_repeating(compl.call(scripts.param_completion_list, 'init'))
    @command.skill_level('administrator')
    def do_init(self, context, *args):
        '''
        Initialize a cluster.
        '''
        def looks_like_hostnames(lst):
            sectionlist = bootstrap.INIT_STAGES
            return all(not (l.startswith('-') or l in sectionlist) for l in lst)
        if len(args) > 0:
            if '--dry-run' in args or looks_like_hostnames(args):
                args = ['--yes', '--nodes'] + [arg for arg in args if arg != '--dry-run']
        parser = ArgumentParser(description="""
Initialize a cluster from scratch. This command configures
a complete cluster, and can also add additional cluster
nodes to the initial one-node cluster using the --nodes
option.""", usage="init [options] [STAGE]", epilog="""

Stage can be one of:
    ssh         Create SSH keys for passwordless SSH between cluster nodes
    csync2      Configure csync2
    corosync    Configure corosync
    sbd         Configure SBD (requires -s <dev>)
    cluster     Bring the cluster online
    ocfs2       Configure OCFS2 (requires -o <dev>) NOTE: this is a Technical Preview
    vgfs        Create volume group and filesystem (ocfs2 template only,
                    requires -o <dev>) NOTE: this stage is an alias of ocfs2 stage
    admin       Create administration virtual IP (optional)
    qdevice     Configure qdevice and qnetd

Note:
  - If stage is not specified, the script will run through each stage
    in sequence, with prompts for required information.

Examples:
  # Setup the cluster on the current node
  crm cluster init -y

  # Setup the cluster with multiple nodes
  (NOTE: the current node will be part of the cluster even not listed in the -N option as below)
  crm cluster init -N node1 -N node2 -N node3 -y

  # Setup the cluster on the current node, with two network interfaces
  crm cluster init -i eth1 -i eth2 -y

  # Setup the cluster on the current node, with disk-based SBD
  crm cluster init -s <share disk> -y

  # Setup the cluster on the current node, with diskless SBD
  crm cluster init -S  -y

  # Setup the cluster on the current node, with QDevice
  crm cluster init --qnetd-hostname <qnetd addr> -y

  # Setup the cluster on the current node, with SBD+OCFS2
  crm cluster init -s <share disk1> -o <share disk2> -y

  # Setup the cluster on the current node, with SBD+OCFS2+Cluster LVM
  crm cluster init -s <share disk1> -o <share disk2> -o <share disk3> -C -y

  # Add SBD on a running cluster
  crm cluster init sbd -s <share disk> -y

  # Replace SBD device on a running cluster which already configured SBD
  crm -F cluster init sbd -s <share disk> -y

  # Add diskless SBD on a running cluster
  crm cluster init sbd -S -y

  # Add QDevice on a running cluster
  crm cluster init qdevice --qnetd-hostname <qnetd addr> -y

  # Add OCFS2+Cluster LVM on a running cluster
  crm cluster init ocfs2 -o <share disk1> -o <share disk2> -C -y
""", add_help=False, formatter_class=RawDescriptionHelpFormatter)

        parser.add_argument("-h", "--help", action="store_true", dest="help", help="Show this help message")
        parser.add_argument("-q", "--quiet", action="store_true", dest="quiet",
                            help="Be quiet (don't describe what's happening, just do it)")
        parser.add_argument("-y", "--yes", action="store_true", dest="yes_to_all",
                            help='Answer "yes" to all prompts (use with caution, this is destructive, especially those storage related configurations and stages.)')
        parser.add_argument("-n", "--name", metavar="NAME", dest="cluster_name", default="hacluster",
                            help='Set the name of the configured cluster.')
        parser.add_argument("-N", "--node", metavar="[USER@]HOST", dest="user_at_node_list", action=ArgparseUserAtHostAppendAction, default=[],
                            help='The member node of the cluster. Note: the current node is always get initialized during bootstrap in the beginning.')
        parser.add_argument("-S", "--enable-sbd", dest="diskless_sbd", action="store_true",
                            help="Enable SBD even if no SBD device is configured (diskless mode)")
        parser.add_argument("-w", "--watchdog", dest="watchdog", metavar="WATCHDOG",
                            help="Use the given watchdog device or driver name")
        parser.add_argument("-x", "--skip-csync2-sync", dest="skip_csync2", action="store_true",
                            help="Skip csync2 initialization (an experimental option)")
        parser.add_argument("--no-overwrite-sshkey", action="store_true", dest="no_overwrite_sshkey",
                            help='Avoid "/root/.ssh/id_rsa" overwrite if "-y" option is used (False by default; Deprecated)')
        parser.add_argument('--use-ssh-agent', action='store_true', dest='use_ssh_agent',
                            help="Use an existing key from ssh-agent instead of creating new key pairs")

        network_group = parser.add_argument_group("Network configuration", "Options for configuring the network and messaging layer.")
        network_group.add_argument("-i", "--interface", dest="nic_list", metavar="IF", action=CustomAppendAction, choices=utils.interface_choice(), default=[],
                                   help="Bind to IP address on interface IF. Use -i second time for second interface")
        network_group.add_argument("-u", "--unicast", action="store_true", dest="unicast",
                                   help="Configure corosync to communicate over unicast(udpu). This is the default transport type")
        network_group.add_argument("-U", "--multicast", action="store_true", dest="multicast",
                                   help="Configure corosync to communicate over multicast. Default is unicast")
        network_group.add_argument("-A", "--admin-ip", dest="admin_ip", metavar="IP",
                                   help="Configure IP address as an administration virtual IP")
        network_group.add_argument("-M", "--multi-heartbeats", action="store_true", dest="second_heartbeat",
                                   help="Configure corosync with second heartbeat line")
        network_group.add_argument("-I", "--ipv6", action="store_true", dest="ipv6",
                                   help="Configure corosync use IPv6")

        qdevice_group = parser.add_argument_group("QDevice configuration", re.sub('  ', '', constants.QDEVICE_HELP_INFO) + "\n\nOptions for configuring QDevice and QNetd.")
        qdevice_group.add_argument("--qnetd-hostname", dest="qnetd_addr", metavar="[USER@]HOST",
                                   help="User and host of the QNetd server. The host can be specified in either hostname or IP address.")
        qdevice_group.add_argument("--qdevice-port", dest="qdevice_port", metavar="PORT", type=int, default=5403,
                                   help="TCP PORT of QNetd server (default:5403)")
        qdevice_group.add_argument("--qdevice-algo", dest="qdevice_algo", metavar="ALGORITHM", default="ffsplit", choices=['ffsplit', 'lms'],
                                   help="QNetd decision ALGORITHM (ffsplit/lms, default:ffsplit)")
        qdevice_group.add_argument("--qdevice-tie-breaker", dest="qdevice_tie_breaker", metavar="TIE_BREAKER", default="lowest",
                                   help="QNetd TIE_BREAKER (lowest/highest/valid_node_id, default:lowest)")
        qdevice_group.add_argument("--qdevice-tls", dest="qdevice_tls", metavar="TLS", default="on", choices=['on', 'off', 'required'],
                                   help="Whether using TLS on QDevice/QNetd (on/off/required, default:on)")
        qdevice_group.add_argument("--qdevice-heuristics", dest="qdevice_heuristics", metavar="COMMAND",
                                   help="COMMAND to run with absolute path. For multiple commands, use \";\" to separate (details about heuristics can see man 8 corosync-qdevice)")
        qdevice_group.add_argument("--qdevice-heuristics-mode", dest="qdevice_heuristics_mode", metavar="MODE", choices=['on', 'sync', 'off'],
                                   help="MODE of operation of heuristics (on/sync/off, default:sync)")

        storage_group = parser.add_argument_group("Storage configuration", "Options for configuring shared storage.")
        storage_group.add_argument("-s", "--sbd-device", dest="sbd_devices", metavar="DEVICE", action=CustomAppendAction, default=[],
                                   help="Block device to use for SBD fencing, use \";\" as separator or -s multiple times for multi path (up to 3 devices)")
        storage_group.add_argument("-o", "--ocfs2-device", dest="ocfs2_devices", metavar="DEVICE", action=CustomAppendAction, default=[],
                help="Block device to use for OCFS2; When using Cluster LVM2 to manage the shared storage, user can specify one or multiple raw disks, use \";\" as separator or -o multiple times for multi path (must specify -C option) NOTE: this is a Technical Preview")
        storage_group.add_argument("-C", "--cluster-lvm2", action="store_true", dest="use_cluster_lvm2",
                help="Use Cluster LVM2 (only valid together with -o option) NOTE: this is a Technical Preview")
        storage_group.add_argument("-m", "--mount-point", dest="mount_point", metavar="MOUNT", default="/srv/clusterfs",
                help="Mount point for OCFS2 device (default is /srv/clusterfs, only valid together with -o option) NOTE: this is a Technical Preview")

        options, args = parse_options(parser, args)
        if options is None or args is None:
            return

        stage = ""
        if len(args):
            stage = args[0]
        if stage == "vgfs":
            stage = "ocfs2"
            logger.warning("vgfs stage was deprecated and is an alias of ocfs2 stage now")
        if stage not in bootstrap.INIT_STAGES and stage != "":
            parser.error("Invalid stage (%s)" % (stage))

        if options.qnetd_addr:
            if not ServiceManager().service_is_available("corosync-qdevice.service"):
                utils.fatal("corosync-qdevice.service is not available")
            if options.qdevice_heuristics_mode and not options.qdevice_heuristics:
                parser.error("Option --qdevice-heuristics is required if want to configure heuristics mode")
            options.qdevice_heuristics_mode = options.qdevice_heuristics_mode or "sync"
        elif re.search("--qdevice-.*", ' '.join(sys.argv)) or (stage == "qdevice" and options.yes_to_all):
            parser.error("Option --qnetd-hostname is required if want to configure qdevice")

        # if options.geo and options.name == "hacluster":
        #    parser.error("For a geo cluster, each cluster must have a unique name (use --name to set)")
        boot_context = bootstrap.Context.set_context(options)
        boot_context.ui_context = context
        boot_context.stage = stage
        boot_context.args = args
        boot_context.cluster_is_running = ServiceManager(sh.ClusterShellAdaptorForLocalShell(sh.LocalShell())).service_is_active("pacemaker.service")
        boot_context.type = "init"
        boot_context.initialize_qdevice()
        boot_context.validate_option()

        bootstrap.bootstrap_init(boot_context)
        bootstrap.bootstrap_add(boot_context)

        return True

    @command.skill_level('administrator')
    def do_join(self, context, *args):
        '''
        Join this node to an existing cluster
        '''
        parser = ArgumentParser(description="""
Join the current node to an existing cluster. The
current node cannot be a member of a cluster already.
Pass any node in the existing cluster as the argument
to the -c option.""",usage="join [options] [STAGE]", epilog="""

Stage can be one of:
    ssh         Obtain SSH keys from existing cluster node (requires -c <host>)
    csync2      Configure csync2 (requires -c <host>)
    ssh_merge   Merge root's SSH known_hosts across all nodes (csync2 must
                already be configured).
    cluster     Start the cluster on this node

If stage is not specified, each stage will be invoked in sequence.

Examples:
  # Join with a cluster node
  crm cluster join -c <node> -y

  # Join with a cluster node, with the same network interface used by that node
  crm cluster join -c <node> -i eth1 -i eth2 -y
""", add_help=False, formatter_class=RawDescriptionHelpFormatter)
        parser.add_argument("-h", "--help", action="store_true", dest="help", help="Show this help message")
        parser.add_argument("-q", "--quiet", help="Be quiet (don't describe what's happening, just do it)", action="store_true", dest="quiet")
        parser.add_argument("-y", "--yes", help='Answer "yes" to all prompts (use with caution)', action="store_true", dest="yes_to_all")
        parser.add_argument("-w", "--watchdog", dest="watchdog", metavar="WATCHDOG", help="Use the given watchdog device")
        parser.add_argument('--use-ssh-agent', action='store_true', dest='use_ssh_agent',
                            help="Use an existing key from ssh-agent instead of creating new key pairs")

        network_group = parser.add_argument_group("Network configuration", "Options for configuring the network and messaging layer.")
        network_group.add_argument(
            "-c", "--cluster-node", metavar="[USER@]HOST", dest="cluster_node",
            help="User and host to login to an existing cluster node. The host can be specified with either a hostname or an IP.",
        )
        network_group.add_argument("-i", "--interface", dest="nic_list", metavar="IF", action=CustomAppendAction, choices=utils.interface_choice(), default=[],
                help="Bind to IP address on interface IF. Use -i second time for second interface")
        options, args = parse_options(parser, args)
        if options is None or args is None:
            return

        stage = ""
        if len(args) == 1:
            stage = args[0]
        if stage not in ("ssh", "csync2", "ssh_merge", "cluster", ""):
            parser.error("Invalid stage (%s)" % (stage))

        join_context = bootstrap.Context.set_context(options)
        join_context.ui_context = context
        join_context.stage = stage
        join_context.type = "join"
        join_context.validate_option()

        bootstrap.bootstrap_join(join_context)

        return True

    @command.alias("delete")
    @command.completers_repeating(_remove_completer)
    @command.skill_level('administrator')
    def do_remove(self, context, *args):
        '''
        Remove the given node(s) from the cluster.
        '''
        parser = ArgumentParser(description="""
Remove one or more nodes from the cluster.

This command can remove the last node in the cluster,
thus effectively removing the whole cluster. To remove
the last node, pass --force argument to crm or set
the config.core.force option.""",
                usage="remove [options] [<node> ...]", add_help=False, formatter_class=RawDescriptionHelpFormatter)
        parser.add_argument("-h", "--help", action="store_true", dest="help", help="Show this help message")
        parser.add_argument("-q", "--quiet", help="Be quiet (don't describe what's happening, just do it)", action="store_true", dest="quiet")
        parser.add_argument("-y", "--yes", help='Answer "yes" to all prompts (use with caution)', action="store_true", dest="yes_to_all")
        parser.add_argument("-c", "--cluster-node", dest="cluster_node", help="IP address or hostname of cluster node which will be deleted", metavar="HOST")
        parser.add_argument("-F", "--force", dest="force", help="Remove current node", action="store_true")
        parser.add_argument("--qdevice", dest="qdevice_rm_flag", help="Remove QDevice configuration and service from cluster", action="store_true")
        options, args = parse_options(parser, args)
        if options is None or args is None:
            return

        if options.cluster_node is not None and options.cluster_node not in args:
            args = list(args) + [options.cluster_node]

        rm_context = bootstrap.Context.set_context(options)
        rm_context.ui_context = context

        if len(args) == 0:
            bootstrap.bootstrap_remove(rm_context)
        else:
            for node in args:
                rm_context.cluster_node = node
                bootstrap.bootstrap_remove(rm_context)
        return True

    @command.skill_level('administrator')
    def do_rename(self, context, new_name):
        '''
        Rename the cluster.

        Calls context.fatal_error() if corosync.service is not active locally,
        or if new_name equals the current 'cluster-name' property. Otherwise it
        sets totem.cluster_name in the corosync configuration, pushes that
        configuration to the other cluster nodes, and commits the new
        cluster-name property to the CIB. The change only applies after the
        cluster service is restarted.
        '''
        if not ServiceManager(sh.ClusterShellAdaptorForLocalShell(sh.LocalShell())).service_is_active("corosync.service"):
            context.fatal_error("Can't rename cluster when cluster service is stopped")
        old_name = cib_factory.get_property('cluster-name')
        if old_name and new_name == old_name:
            context.fatal_error("Expected a different name")

        # Update config file with the new name on all nodes.
        # Tolerate a missing node list and a local node name that is not in the
        # list (list.remove() would raise ValueError in that case).
        nodes = utils.list_cluster_nodes() or []
        corosync.set_value('totem.cluster_name', new_name)
        local_node = utils.this_node()
        peers = [node for node in nodes if node != local_node]
        if peers:
            context.info("Copy cluster config file to \"{}\"".format(' '.join(peers)))
            corosync.push_configuration(peers)

        # Change the cluster-name property in the CIB
        cib_factory.create_object("property", "cluster-name={}".format(new_name))
        if not cib_factory.commit():
            context.fatal_error("Change property cluster-name failed!")

        # it's a safe way to give user a hints that need to restart service
        context.info("To apply the change, restart the cluster service at convenient time")

    def _parse_clustermap(self, clusters):
        '''
        Helper function to parse the cluster map into a dictionary:

        name=ip; name2=ip2 -> { name: ip, name2: ip2 }
        '''
        if clusters is None:
            return None
        try:
            return dict([re.split('[=:]+', o) for o in re.split('[ ,;]+', clusters)])
        except TypeError:
            return None
        except ValueError:
            return None

    @command.name("geo_init")
    @command.alias("geo-init")
    @command.skill_level('administrator')
    def do_geo_init(self, context, *args):
        '''
        Make this cluster a geo cluster.
        Needs some information to set up.

        * cluster map: "cluster-name=ip cluster-name=ip"
        * arbitrator IP / hostname (optional)
        * list of tickets (can be empty)
        '''
        parser = ArgumentParser(description="""
Create a new geo cluster with the current cluster as the
first member. Pass the complete geo cluster topology as
arguments to this command, and then use geo-join and
geo-init-arbitrator to add the remaining members to
the geo cluster.""",
        usage="geo-init [options]", epilog="""

Cluster Description

  This is a map of cluster names to IP addresses.
  Each IP address will be configured as a virtual IP
  representing that cluster in the geo cluster
  configuration.

  Example with two clusters named paris and amsterdam:

  --clusters "paris=192.168.10.10 amsterdam=192.168.10.11"

  Name clusters using the --name parameter to
  crm bootstrap init.
""", add_help=False, formatter_class=RawDescriptionHelpFormatter)
        parser.add_argument("-h", "--help", action="store_true", dest="help", help="Show this help message")
        parser.add_argument("-q", "--quiet", help="Be quiet (don't describe what's happening, just do it)", action="store_true", dest="quiet")
        parser.add_argument("-y", "--yes", help='Answer "yes" to all prompts (use with caution)', action="store_true", dest="yes_to_all")
        parser.add_argument(
            "-a", "--arbitrator", dest="arbitrator", metavar="[USER@]HOST",
            help="Geo cluster arbitrator",
        )
        parser.add_argument("-s", "--clusters", help="Geo cluster description (see details below)", dest="clusters", metavar="DESC")
        parser.add_argument("-t", "--tickets", help="Tickets to create (space-separated)", dest="tickets", metavar="LIST")
        options, args = parse_options(parser, args)
        if options is None or args is None:
            return

        if options.clusters is None:
            errs = []
            if options.clusters is None:
                errs.append("The --clusters argument is required.")
            parser.error(" ".join(errs))

        clustermap = self._parse_clustermap(options.clusters)
        if clustermap is None:
            parser.error("Invalid cluster description format")
        ticketlist = []
        if options.tickets is not None:
            try:
                ticketlist = [t for t in re.split('[ ,;]+', options.tickets)]
            except ValueError:
                parser.error("Invalid ticket list")

        geo_context = bootstrap.Context.set_context(options)
        geo_context.clusters = clustermap
        geo_context.tickets = ticketlist
        geo_context.ui_context = context

        bootstrap.bootstrap_init_geo(geo_context)
        return True

    @command.name("geo_join")
    @command.alias("geo-join")
    @command.skill_level('administrator')
    def do_geo_join(self, context, *args):
        '''
        Join this cluster to a geo configuration.
        '''
        parser = ArgumentParser(description="""
This command should be run from one of the nodes in a cluster
which is currently not a member of a geo cluster. The geo
cluster configuration will be fetched from the provided node,
and the cluster will be added to the geo cluster.

Note that each cluster in a geo cluster needs to have a unique
name set. The cluster name can be set using the --name argument
to init, or by configuring corosync with the cluster name in
an existing cluster.""",
                usage="geo-join [options]", add_help=False, formatter_class=RawDescriptionHelpFormatter)
        parser.add_argument("-h", "--help", action="store_true", dest="help", help="Show this help message")
        parser.add_argument("-q", "--quiet", help="Be quiet (don't describe what's happening, just do it)", action="store_true", dest="quiet")
        parser.add_argument("-y", "--yes", help='Answer "yes" to all prompts (use with caution)', action="store_true", dest="yes_to_all")
        parser.add_argument("-c", "--cluster-node", metavar="[USER@]HOST", help="An already-configured geo cluster or arbitrator", dest="cluster_node")
        parser.add_argument("-s", "--clusters", help="Geo cluster description (see geo-init for details)", dest="clusters", metavar="DESC")
        options, args = parse_options(parser, args)
        if options is None or args is None:
            return
        errs = []
        if options.cluster_node is None:
            errs.append("The --cluster-node argument is required.")
        if options.clusters is None:
            errs.append("The --clusters argument is required.")
        if len(errs) > 0:
            parser.error(" ".join(errs))
        clustermap = self._parse_clustermap(options.clusters)
        if clustermap is None:
            parser.error("Invalid cluster description format")

        geo_context = bootstrap.Context.set_context(options)
        geo_context.clusters = clustermap
        geo_context.ui_context = context

        bootstrap.bootstrap_join_geo(geo_context)
        return True

    @command.name("geo_init_arbitrator")
    @command.alias("geo-init-arbitrator")
    @command.skill_level('administrator')
    def do_geo_init_arbitrator(self, context, *args):
        '''
        Make this node a geo arbitrator.
        '''
        parser = ArgumentParser(description="""
Configure the current node as a geo arbitrator. The command
requires an existing geo cluster or geo arbitrator from which
to get the geo cluster configuration.""",
                usage="geo-init-arbitrator [options]", add_help=False, formatter_class=RawDescriptionHelpFormatter)
        parser.add_argument("-h", "--help", action="store_true", dest="help", help="Show this help message")
        parser.add_argument("-q", "--quiet", help="Be quiet (don't describe what's happening, just do it)", action="store_true", dest="quiet")
        parser.add_argument("-y", "--yes", help='Answer "yes" to all prompts (use with caution)', action="store_true", dest="yes_to_all")
        parser.add_argument("-c", "--cluster-node", metavar="[USER@]HOST", help="An already-configured geo cluster", dest="cluster_node")
        parser.add_argument('--use-ssh-agent', action='store_true', dest='use_ssh_agent',
                            help="Use an existing key from ssh-agent instead of creating new key pairs")
        options, args = parse_options(parser, args)
        if options is None or args is None:
            return

        geo_context = bootstrap.Context.set_context(options)
        geo_context.ui_context = context

        bootstrap.bootstrap_arbitrator(geo_context)
        return True

    @command.completers_repeating(compl.call(scripts.param_completion_list, 'health'))
    def do_health(self, context, *args):
        '''
        Extensive health check: run the bundled 'health' script.

        Command arguments are converted to script parameters by
        self._args_implicit() with 'nodes' as the implicit parameter name
        (presumably bare words become node names; confirm against the helper).

        Raises ValueError if the 'health' script cannot be loaded; otherwise
        returns whatever scripts.run() returns.
        '''
        params = self._args_implicit(context, args, 'nodes')
        health_script = scripts.load_script('health')
        if health_script is None:
            raise ValueError("health script failed to load")
        # Evaluate in the same order as a single nested call would.
        run_args = script_args(params)
        printer = script_printer()
        return scripts.run(health_script, run_args, printer)

    def _node_in_cluster(self, node):
        '''
        Return True if *node* is in the cluster's node list.

        utils.list_cluster_nodes() may return None when the node list cannot
        be determined. That case is reported as "not in the cluster" (False)
        instead of raising TypeError from `node in None`.
        '''
        nodes = utils.list_cluster_nodes()
        return nodes is not None and node in nodes

    def do_status(self, context):
        '''
        Print a quick cluster status summary.

        Shows the cluster name, one line per service (corosync, pacemaker)
        with utils.service_info() output or "unknown", and then the output of
        `corosync-cfgtool -s`, or a failure notice if that command fails.
        '''
        print("Name: {}\n".format(get_cluster_name()))
        print("Services:")
        for service in ("corosync", "pacemaker"):
            state = utils.service_info(service)
            print("{:<16} {}".format(service, state if state else "unknown"))

        rc, output = ShellUtils().get_stdout(['corosync-cfgtool', '-s'], shell=False)
        if rc != 0:
            print("Failed to get corosync status")
            return
        print("")
        print(output)

    @command.completers_repeating(compl.choice(['10', '60', '600']))
    def do_wait_for_startup(self, context, timeout='10'):
        "usage: wait_for_startup [<timeout>]"
        # Poll `crm_mon -bD1` once per second until it stops returning 107 or
        # 64 (presumably "cluster not reachable yet" codes - confirm) or until
        # *timeout* seconds have elapsed. Any final non-zero rc is fatal.
        import time
        started_at = time.time()
        deadline = started_at + float(timeout)
        probe = 'crm_mon -bD1 >/dev/null 2>&1'
        rc = utils.ext_cmd(probe)
        while rc in (107, 64) and time.time() < deadline:
            time.sleep(1)
            rc = utils.ext_cmd(probe)
        if rc != 0:
            context.fatal_error("Timed out waiting for cluster (rc = %s)" % (rc))

    @command.skill_level('expert')
    def do_run(self, context, cmd, *nodes):
        '''
        Run *cmd* on the given nodes, or on every cluster node when none are
        given, and log each host's outcome.

        Transport failures (prun.PRunError) and non-zero exit codes are logged
        as errors, including stderr for the latter. Successful runs are logged
        at info level, with stdout when there is any.
        '''
        if not nodes:
            hosts = utils.list_cluster_nodes()
            if hosts is None:
                context.fatal_error("failed to get node list from cluster")
        else:
            hosts = list(nodes)

        commands = {host: cmd for host in hosts}
        for host, result in prun.prun(commands).items():
            if isinstance(result, prun.PRunError):
                logger.error("[%s]: %s", host, result)
            elif result.returncode != 0:
                logger.error(
                    "[%s]: Exited with error code %s. Error output: %s",
                    host, result.returncode, utils.to_ascii(result.stderr),
                )
            elif result.stdout:
                logger.info("[%s]\n%s", host, utils.to_ascii(result.stdout))
            else:
                logger.info("[%s]", host)

    def do_copy(self, context, local_file, *nodes):
        '''
        usage: copy <filename> [nodes ...]
        Copy a local file to other cluster nodes.
        With no nodes given, the file goes to all other cluster nodes.
        '''
        # The (possibly empty) tuple of node names is handed straight to the
        # helper, which does the actual transfer and supplies the return value.
        targets = nodes
        outcome = utils.cluster_copy_file(local_file, targets)
        return outcome

    def do_diff(self, context, filename, *nodes):
        "usage: diff <filename> [--checksum] [nodes...]. Diff file across cluster."
        # A leading --checksum switches to checksum comparison. Without
        # explicit nodes, the whole cluster node list is used. Diffing "this"
        # node against the others is preferred whenever it is included.
        nodes = list(nodes)
        this_node = utils.this_node()
        checksum = False
        if nodes and nodes[0] == '--checksum':
            nodes = nodes[1:]
            checksum = True
        if not nodes:
            nodes = utils.list_cluster_nodes()
            # list_cluster_nodes() may return None; fail cleanly instead of
            # hitting a TypeError in len()/`in` below (same message as do_run).
            if nodes is None:
                context.fatal_error("failed to get node list from cluster")
        if checksum:
            utils.remote_checksum(filename, nodes, this_node)
        elif len(nodes) == 1:
            utils.remote_diff_this(filename, nodes, this_node)
        elif this_node in nodes:
            nodes.remove(this_node)
            utils.remote_diff_this(filename, nodes, this_node)
        elif nodes:
            utils.remote_diff(filename, nodes)

    def do_crash_test(self, context, *args):
        """
        Run the crash_test tool with *args* as its command-line arguments.

        The arguments are spliced into sys.argv[1:] before calling
        crash_test's main.run() (presumably it parses sys.argv itself). The
        previous sys.argv is restored afterwards, even if the run raises, so
        the substitution does not leak into the rest of the crm session.
        """
        from .crash_test import main
        saved_argv = list(sys.argv)
        sys.argv[1:] = args
        try:
            main.ctx.process_name = context.command_name
            main.run(main.ctx)
        finally:
            sys.argv[:] = saved_argv
        return True