author     Daniel Baumann <daniel.baumann@progress-linux.org>   2024-04-27 18:24:20 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>   2024-04-27 18:24:20 +0000
commit     483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
tree       e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/pybind/mgr/ssh
parent     Initial commit. (diff)
Adding upstream version 14.2.21.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/ssh')
-rw-r--r--  src/pybind/mgr/ssh/.gitignore    1
-rw-r--r--  src/pybind/mgr/ssh/README.md    93
-rw-r--r--  src/pybind/mgr/ssh/Vagrantfile  39
-rw-r--r--  src/pybind/mgr/ssh/__init__.py   1
-rw-r--r--  src/pybind/mgr/ssh/ceph.repo    23
-rw-r--r--  src/pybind/mgr/ssh/module.py   740
-rw-r--r--  src/pybind/mgr/ssh/remotes.py   81
7 files changed, 978 insertions, 0 deletions
diff --git a/src/pybind/mgr/ssh/.gitignore b/src/pybind/mgr/ssh/.gitignore
new file mode 100644
index 00000000..8000dd9d
--- /dev/null
+++ b/src/pybind/mgr/ssh/.gitignore
@@ -0,0 +1 @@
+.vagrant
diff --git a/src/pybind/mgr/ssh/README.md b/src/pybind/mgr/ssh/README.md
new file mode 100644
index 00000000..10f268cd
--- /dev/null
+++ b/src/pybind/mgr/ssh/README.md
@@ -0,0 +1,93 @@
+# dev environment setup
+
+1. start vms with _only_ the ceph packages installed
+
+In `src/pybind/mgr/ssh` run `vagrant up` to create a cluster with monitor,
+manager, and osd nodes. The osd node will have two small extra disks attached.
+
+2. generate an `ssh_config` file for the vm hosts
+
+Execute `vagrant ssh-config > /path/to/ssh_config` to generate an SSH
+configuration file containing the hosts, usernames, and keys that the
+bootstrap cluster / ssh orchestrator will use to establish SSH connections
+to the vagrant vms.
+
+3. install ssh orchestrator dependencies
+
+The primary dependency is the `remoto` package, which contains a Python SSH
+client for connecting to remote nodes and executing commands.
+
+Install it with `dnf install python3-remoto`. The version must be >= 0.0.35.
+At the time of writing that version is still being packaged and is not yet
+available, so install it from source:
+
+```
+git clone https://github.com/ceph/remoto
+cd remoto
+python3 setup.py sdist
+pip3 install --prefix=/usr dist/remoto-0.0.35.tar.gz
+```
+
+4. start the bootstrap cluster (in this case a `vstart.sh` cluster)
+
+Start it with a network binding to which the vms can route traffic:
+
+    `vstart.sh -n -i 192.168.121.1`
+
+The following is a manual method for finding this address. TODO: documenting
+an automated/deterministic method would be very helpful.
+
+First, ensure that your firewall settings permit each VM to communicate with
+the host. On Fedora, the `trusted` zone is sufficient (`firewall-cmd
+--set-default-zone trusted`) and also allows traffic on Ceph ports. Then ssh
+into one of the vm nodes and ping the default gateway, which happens to be
+set up as the host machine:
+
+```
+[nwatkins@smash ssh]$ vagrant ssh mon0 -c "getent hosts gateway"
+192.168.121.1 gateway
+```
+
+5. setup the ssh orchestrator backend
+
+Enable and configure the ssh orchestrator as the active backend:
+
+```
+ceph mgr module enable ssh
+ceph orchestrator set backend ssh
+
+# optional: this document assumes the orchestrator CLI is enabled
+ceph mgr module enable orchestrator_cli
+```
+
+Configure the ssh orchestrator by setting the `ssh_config` option to point at
+the ssh configuration file generated above:
+
+```
+ceph config set mgr mgr/ssh/ssh_config_file /path/to/config
+```
+
+The setting can be confirmed by retrieving the configuration settings:
+
+```
+[nwatkins@smash build]$ ceph config get mgr.
+WHO MASK LEVEL    OPTION                            VALUE                                              RO
+mgr  advanced mgr/orchestrator_cli/orchestrator ssh                                                *
+mgr  advanced mgr/ssh/ssh_config_file           /home/nwatkins/src/ceph/src/pybind/mgr/ssh/config  *
+```
+
+An SSH config file can also be provided through standard input, which avoids
+the need for an accessible file path. Use the following command:
+
+```
+ceph ssh set-ssh-config -i <path to ssh_config>
+```
+
+The next set of instructions should eventually move to the docs folder:
+
+```
+ceph orchestrator host add osd0
+ceph orchestrator host add mgr0
+ceph orchestrator host add mon0
+ceph orchestrator device ls
+ceph orchestrator mgr update 3 mgr0 mgr1
+```
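Before moving on to the Vagrantfile, it can help to confirm that the `remoto`
requirement from step 3 of the README is importable and new enough. A minimal
sketch, assuming `pkg_resources` (shipped with setuptools) is available; on
Python 3.8+ `importlib.metadata` would do the same job:

```
# Sketch: confirm remoto is importable and >= 0.0.35 (see README step 3).
import pkg_resources

def remoto_ok(minimum=(0, 0, 35)):
    version = pkg_resources.get_distribution("remoto").version
    return tuple(int(part) for part in version.split(".")) >= minimum

print("remoto ok:", remoto_ok())
```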
diff --git a/src/pybind/mgr/ssh/Vagrantfile b/src/pybind/mgr/ssh/Vagrantfile
new file mode 100644
index 00000000..0a2a6389
--- /dev/null
+++ b/src/pybind/mgr/ssh/Vagrantfile
@@ -0,0 +1,39 @@
+# vi: set ft=ruby :
+
+NUM_DAEMONS = ENV["NUM_DAEMONS"] ? ENV["NUM_DAEMONS"].to_i : 1
+
+Vagrant.configure("2") do |config|
+  config.vm.synced_folder ".", "/vagrant", disabled: true
+  config.vm.network "private_network", type: "dhcp"
+  config.vm.box = "centos/7"
+
+  (0..NUM_DAEMONS - 1).each do |i|
+    config.vm.define "mon#{i}" do |mon|
+      mon.vm.hostname = "mon#{i}"
+    end
+    config.vm.define "mgr#{i}" do |mgr|
+      mgr.vm.hostname = "mgr#{i}"
+    end
+    config.vm.define "osd#{i}" do |osd|
+      osd.vm.hostname = "osd#{i}"
+      osd.vm.provider :libvirt do |libvirt|
+        libvirt.storage :file, :size => '5G'
+        libvirt.storage :file, :size => '5G'
+      end
+    end
+  end
+
+  config.vm.provision "shell" do |s|
+    ssh_pub_key = File.readlines("#{Dir.home}/.ssh/id_rsa.pub").first.strip
+    s.inline = "echo #{ssh_pub_key} >> /home/vagrant/.ssh/authorized_keys"
+  end
+
+  config.vm.provision "shell", inline: <<-SHELL
+    sudo yum install -y yum-utils
+    sudo yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
+    sudo rpm --import 'https://download.ceph.com/keys/release.asc'
+    curl -L https://shaman.ceph.com/api/repos/ceph/master/latest/centos/7/repo/ | sudo tee /etc/yum.repos.d/shaman.repo
+    sudo yum install -y ceph python36
+    sudo ln -s /usr/bin/python36 /usr/bin/python3
+  SHELL
+end
diff --git a/src/pybind/mgr/ssh/__init__.py b/src/pybind/mgr/ssh/__init__.py
new file mode 100644
index 00000000..3f41e016
--- /dev/null
+++ b/src/pybind/mgr/ssh/__init__.py
@@ -0,0 +1 @@
+from .module import SSHOrchestrator
diff --git a/src/pybind/mgr/ssh/ceph.repo b/src/pybind/mgr/ssh/ceph.repo
new file mode 100644
index 00000000..6f710e7c
--- /dev/null
+++ b/src/pybind/mgr/ssh/ceph.repo
@@ -0,0 +1,23 @@
+[ceph]
+name=Ceph packages for $basearch
+baseurl=https://download.ceph.com/rpm-mimic/el7/$basearch
+enabled=1
+priority=2
+gpgcheck=1
+gpgkey=https://download.ceph.com/keys/release.asc
+
+[ceph-noarch]
+name=Ceph noarch packages
+baseurl=https://download.ceph.com/rpm-mimic/el7/noarch
+enabled=1
+priority=2
+gpgcheck=1
+gpgkey=https://download.ceph.com/keys/release.asc
+
+[ceph-source]
+name=Ceph source packages
+baseurl=https://download.ceph.com/rpm-mimic/el7/SRPMS
+enabled=0
+priority=2
+gpgcheck=1
+gpgkey=https://download.ceph.com/keys/release.asc
diff --git a/src/pybind/mgr/ssh/module.py b/src/pybind/mgr/ssh/module.py
new file mode 100644
index 00000000..73675e52
--- /dev/null
+++ b/src/pybind/mgr/ssh/module.py
@@ -0,0 +1,740 @@
+import json
+import errno
+import logging
+from functools import wraps
+
+import six
+import os
+import tempfile
+import multiprocessing.pool
+
+from mgr_module import MgrModule
+import orchestrator
+
+from . import remotes
+
+try:
+    import remoto
+    import remoto.process
+except ImportError as e:
+    remoto = None
+    remoto_import_error = str(e)
+
+logger = logging.getLogger(__name__)
+
+# high-level TODO:
+#  - bring over some of the protections from ceph-deploy that guard against
+#    multiple bootstrapping / initialization
+
+class SSHCompletionMixin(object):
+    def __init__(self, result):
+        if isinstance(result, multiprocessing.pool.AsyncResult):
+            self._result = [result]
+        else:
+            self._result = result
+        assert isinstance(self._result, list)
+
+    @property
+    def result(self):
+        return list(map(lambda r: r.get(), self._result))
+
+class SSHReadCompletion(SSHCompletionMixin, orchestrator.ReadCompletion):
+    @property
+    def is_complete(self):
+        return all(map(lambda r: r.ready(), self._result))
+
+
+class SSHWriteCompletion(SSHCompletionMixin, orchestrator.WriteCompletion):
+
+    @property
+    def is_persistent(self):
+        return all(map(lambda r: r.ready(), self._result))
+
+    @property
+    def is_effective(self):
+        return all(map(lambda r: r.ready(), self._result))
+
+    @property
+    def is_errored(self):
+        for r in self._result:
+            if not r.ready():
+                return False
+            if not r.successful():
+                return True
+        return False
+
+class SSHWriteCompletionReady(SSHWriteCompletion):
+    def __init__(self, result):
+        orchestrator.WriteCompletion.__init__(self)
+        self._result = result
+
+    @property
+    def result(self):
+        return self._result
+
+    @property
+    def is_persistent(self):
+        return True
+
+    @property
+    def is_effective(self):
+        return True
+
+    @property
+    def is_errored(self):
+        return False
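The completion classes above are thin wrappers over `ThreadPool` async
results. A minimal standalone sketch of the pattern they implement, with no
mgr context assumed:

```
# Sketch of the SSHReadCompletion/SSHWriteCompletion pattern: wrap one or
# more AsyncResults and reduce readiness across all of them.
import multiprocessing.pool

pool = multiprocessing.pool.ThreadPool(1)
async_result = pool.apply_async(lambda: "Added host 'osd0'")

# SSHWriteCompletion(async_result) stores [async_result]; is_persistent and
# is_effective reduce to all(r.ready() ...), and .result maps r.get() over it.
async_result.wait()
print(async_result.ready(), async_result.get())
```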
+class SSHConnection(object):
+    """
+    Tie tempfile lifetime (e.g. ssh_config) to a remoto connection.
+    """
+    def __init__(self):
+        self.conn = None
+        self.temp_file = None
+
+    # proxy to the remoto connection
+    def __getattr__(self, name):
+        return getattr(self.conn, name)
+
+
+def log_exceptions(f):
+    if six.PY3:
+        return f
+    else:
+        # Python 2 does not chain exceptions, so the real exception would
+        # otherwise be lost
+        @wraps(f)
+        def wrapper(*args, **kwargs):
+            try:
+                return f(*args, **kwargs)
+            except Exception:
+                logger.exception('something went wrong.')
+                raise
+        return wrapper
+
+
+class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
+
+    _STORE_HOST_PREFIX = "host"
+    _DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN = 10
+
+    MODULE_OPTIONS = [
+        {'name': 'ssh_config_file'},
+        {'name': 'inventory_cache_timeout_min'},
+    ]
+
+    COMMANDS = [
+        {
+            'cmd': 'ssh set-ssh-config',
+            'desc': 'Set the ssh_config file (use -i <ssh_config>)',
+            'perm': 'rw'
+        },
+        {
+            'cmd': 'ssh clear-ssh-config',
+            'desc': 'Clear the ssh_config file',
+            'perm': 'rw'
+        },
+    ]
+
+    def __init__(self, *args, **kwargs):
+        super(SSHOrchestrator, self).__init__(*args, **kwargs)
+        self._cluster_fsid = None
+        self._worker_pool = multiprocessing.pool.ThreadPool(1)
+
+        # the keys in inventory_cache are authoritative.
+        # You must not call remove_outdated().
+        # The values are cached by instance.
+        # The cache is invalidated by
+        #   1. timeout
+        #   2. the refresh parameter
+        self.inventory_cache = orchestrator.OutdatablePersistentDict(self, self._STORE_HOST_PREFIX)
+
+    def handle_command(self, inbuf, command):
+        if command["prefix"] == "ssh set-ssh-config":
+            return self._set_ssh_config(inbuf, command)
+        elif command["prefix"] == "ssh clear-ssh-config":
+            return self._clear_ssh_config(inbuf, command)
+        else:
+            raise NotImplementedError(command["prefix"])
+
+    @staticmethod
+    def can_run():
+        if remoto is not None:
+            return True, ""
+        else:
+            return False, "loading remoto library: {}".format(
+                    remoto_import_error)
+
+    def available(self):
+        """
+        The SSH orchestrator is available whenever its dependencies load.
+        """
+        return self.can_run()
+
+    def wait(self, completions):
+        self.log.info("wait: completions={}".format(completions))
+
+        complete = True
+        for c in completions:
+            if c.is_complete:
+                continue
+
+            if not isinstance(c, SSHReadCompletion) and \
+                    not isinstance(c, SSHWriteCompletion):
+                raise TypeError("unexpected completion: {}".format(c.__class__))
+
+            complete = False
+
+        return complete
+
+    def _get_cluster_fsid(self):
+        """
+        Fetch and cache the cluster fsid.
+        """
+        if not self._cluster_fsid:
+            self._cluster_fsid = self.get("mon_map")["fsid"]
+        assert isinstance(self._cluster_fsid, six.string_types)
+        return self._cluster_fsid
+
+    def _require_hosts(self, hosts):
+        """
+        Raise an error if any of the given hosts are unregistered.
+        """
+        if isinstance(hosts, six.string_types):
+            hosts = [hosts]
+        keys = self.inventory_cache.keys()
+        unregistered_hosts = set(hosts) - keys
+        if unregistered_hosts:
+            logger.warning('keys = {}'.format(keys))
+            raise RuntimeError("Host(s) {} not registered".format(
+                ", ".join(map(lambda h: "'{}'".format(h),
+                    unregistered_hosts))))
+
+    def _set_ssh_config(self, inbuf, command):
+        """
+        Set an ssh_config file provided via stdin
+
+        TODO:
+          - validation
+        """
+        if len(inbuf) == 0:
+            return errno.EINVAL, "", "empty ssh config provided"
+        self.set_store("ssh_config", inbuf)
+        return 0, "", ""
+
+    def _clear_ssh_config(self, inbuf, command):
+        """
+        Clear the stored ssh_config file
+        """
+        self.set_store("ssh_config", None)
+        self.ssh_config_tmp = None
+        return 0, "", ""
+
+    def _get_connection(self, host):
+        """
+        Set up a connection for running commands on a remote host.
+        """
+        ssh_options = None
+
+        conn = SSHConnection()
+
+        ssh_config = self.get_store("ssh_config")
+        if ssh_config is not None:
+            conn.temp_file = tempfile.NamedTemporaryFile()
+            conn.temp_file.write(ssh_config.encode('utf-8'))
+            conn.temp_file.flush()  # make visible to other processes
+            ssh_config_fname = conn.temp_file.name
+        else:
+            ssh_config_fname = self.get_localized_module_option("ssh_config_file")
+
+        if ssh_config_fname:
+            if not os.path.isfile(ssh_config_fname):
+                raise Exception("ssh_config \"{}\" does not exist".format(
+                    ssh_config_fname))
+            ssh_options = "-F {}".format(ssh_config_fname)
+
+        self.log.info("opening connection to host '{}' with ssh "
+                "options '{}'".format(host, ssh_options))
+
+        conn.conn = remoto.Connection(host,
+                logger=self.log,
+                detect_sudo=True,
+                ssh_options=ssh_options)
+
+        conn.conn.import_module(remotes)
+
+        return conn
+
+    def _executable_path(self, conn, executable):
+        """
+        Remote validator that accepts a connection object to ensure that a
+        certain executable is available, returning its full path if so.
+
+        Otherwise an exception with thorough details is raised, informing the
+        user that the executable was not found.
+        """
+        executable_path = conn.remote_module.which(executable)
+        if not executable_path:
+            raise RuntimeError("Executable '{}' not found on host '{}'".format(
+                executable, conn.hostname))
+        self.log.info("Found executable '{}' at path '{}'".format(executable,
+            executable_path))
+        return executable_path
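A hedged usage sketch of the two helpers above, as they would be called from
inside another `SSHOrchestrator` method (the host name `osd0` is
illustrative):

```
# Inside an SSHOrchestrator method: open a connection, validate a binary,
# and always close the connection. This mirrors how _get_device_inventory
# and _create_osd below use these helpers.
conn = self._get_connection("osd0")          # "osd0" is an example host
try:
    path = self._executable_path(conn, "ceph-volume")  # raises if missing
    out, err, code = remoto.process.check(conn, [path, "--help"])
finally:
    conn.exit()
```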
+ """ + executable_path = conn.remote_module.which(executable) + if not executable_path: + raise RuntimeError("Executable '{}' not found on host '{}'".format( + executable, conn.hostname)) + self.log.info("Found executable '{}' at path '{}'".format(executable, + executable_path)) + return executable_path + + def _build_ceph_conf(self): + """ + Build a minimal `ceph.conf` containing the current monitor hosts. + + Notes: + - ceph-volume complains if no section header (e.g. global) exists + - other ceph cli tools complained about no EOF newline + + TODO: + - messenger v2 syntax? + """ + mon_map = self.get("mon_map") + mon_addrs = map(lambda m: m["addr"], mon_map["mons"]) + mon_hosts = ", ".join(mon_addrs) + return "[global]\nmon host = {}\n".format(mon_hosts) + + def _ensure_ceph_conf(self, conn, network=False): + """ + Install ceph.conf on remote node if it doesn't exist. + """ + conf = self._build_ceph_conf() + if network: + conf += "public_network = {}\n".format(network) + conn.remote_module.write_conf("/etc/ceph/ceph.conf", conf) + + def _get_bootstrap_key(self, service_type): + """ + Fetch a bootstrap key for a service type. + + :param service_type: name (e.g. mds, osd, mon, ...) + """ + identity_dict = { + 'admin' : 'client.admin', + 'mds' : 'client.bootstrap-mds', + 'mgr' : 'client.bootstrap-mgr', + 'osd' : 'client.bootstrap-osd', + 'rgw' : 'client.bootstrap-rgw', + 'mon' : 'mon.' + } + + identity = identity_dict[service_type] + + ret, out, err = self.mon_command({ + "prefix": "auth get", + "entity": identity + }) + + if ret == -errno.ENOENT: + raise RuntimeError("Entity '{}' not found: '{}'".format(identity, err)) + elif ret != 0: + raise RuntimeError("Error retrieving key for '{}' ret {}: '{}'".format( + identity, ret, err)) + + return out + + def _bootstrap_mgr(self, conn): + """ + Bootstrap a manager. + + 1. install a copy of ceph.conf + 2. install the manager bootstrap key + + :param conn: remote host connection + """ + self._ensure_ceph_conf(conn) + keyring = self._get_bootstrap_key("mgr") + keyring_path = "/var/lib/ceph/bootstrap-mgr/ceph.keyring" + conn.remote_module.write_keyring(keyring_path, keyring) + return keyring_path + + def _bootstrap_osd(self, conn): + """ + Bootstrap an osd. + + 1. install a copy of ceph.conf + 2. install the osd bootstrap key + + :param conn: remote host connection + """ + self._ensure_ceph_conf(conn) + keyring = self._get_bootstrap_key("osd") + keyring_path = "/var/lib/ceph/bootstrap-osd/ceph.keyring" + conn.remote_module.write_keyring(keyring_path, keyring) + return keyring_path + + def _get_hosts(self, wanted=None): + return self.inventory_cache.items_filtered(wanted) + + def add_host(self, host): + """ + Add a host to be managed by the orchestrator. + + :param host: host name + """ + @log_exceptions + def run(host): + self.inventory_cache[host] = orchestrator.OutdatableData() + return "Added host '{}'".format(host) + + return SSHWriteCompletion( + self._worker_pool.apply_async(run, (host,))) + + def remove_host(self, host): + """ + Remove a host from orchestrator management. + + :param host: host name + """ + @log_exceptions + def run(host): + del self.inventory_cache[host] + return "Removed host '{}'".format(host) + + return SSHWriteCompletion( + self._worker_pool.apply_async(run, (host,))) + + def get_hosts(self): + """ + Return a list of hosts managed by the orchestrator. + + Notes: + - skip async: manager reads from cache. 
+    def _get_device_inventory(self, host):
+        """
+        Query storage devices on a remote node.
+
+        :return: list of InventoryDevice
+        """
+        conn = self._get_connection(host)
+
+        try:
+            ceph_volume_executable = self._executable_path(conn, 'ceph-volume')
+            command = [
+                ceph_volume_executable,
+                "inventory",
+                "--format=json"
+            ]
+
+            out, err, code = remoto.process.check(conn, command)
+            host_devices = json.loads(out[0])
+            return host_devices
+
+        except Exception as ex:
+            self.log.exception(ex)
+            raise
+
+        finally:
+            conn.exit()
+
+    def get_inventory(self, node_filter=None, refresh=False):
+        """
+        Return the storage inventory of nodes matching the given filter.
+
+        :param node_filter: node filter
+
+        TODO:
+          - add filtering by label
+        """
+        if node_filter:
+            hosts = node_filter.nodes
+            self._require_hosts(hosts)
+            hosts = self._get_hosts(hosts)
+        else:
+            # this implies the returned hosts are registered
+            hosts = self._get_hosts()
+
+        @log_exceptions
+        def run(host, host_info):
+            # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode
+
+            timeout_min = int(self.get_module_option(
+                "inventory_cache_timeout_min",
+                self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
+
+            if host_info.outdated(timeout_min) or refresh:
+                self.log.info("refresh stale inventory for '{}'".format(host))
+                data = self._get_device_inventory(host)
+                host_info = orchestrator.OutdatableData(data)
+                self.inventory_cache[host] = host_info
+            else:
+                self.log.debug("reading cached inventory for '{}'".format(host))
+
+            devices = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(host_info.data)
+            return orchestrator.InventoryNode(host, devices)
+
+        results = []
+        for key, host_info in hosts:
+            result = self._worker_pool.apply_async(run, (key, host_info))
+            results.append(result)
+
+        return SSHReadCompletion(results)
+
+    @log_exceptions
+    def _create_osd(self, host, drive_group):
+        conn = self._get_connection(host)
+        try:
+            devices = drive_group.data_devices.paths
+            self._bootstrap_osd(conn)
+
+            for device in devices:
+                ceph_volume_executable = self._executable_path(conn, "ceph-volume")
+                command = [
+                    ceph_volume_executable,
+                    "lvm",
+                    "create",
+                    "--cluster-fsid", self._get_cluster_fsid(),
+                    "--{}".format(drive_group.objectstore),
+                    "--data", device
+                ]
+                remoto.process.run(conn, command)
+
+            return "Created osd on host '{}'".format(host)
+
+        finally:
+            conn.exit()
+
+    def create_osds(self, drive_group, all_hosts=None):
+        """
+        Create a new osd.
+
+        The orchestrator CLI currently handles a narrow form of drive
+        specification defined by a single block device using bluestore.
+
+        :param drive_group: osd specification
+
+        TODO:
+          - support full drive_group specification
+          - support batch creation
+        """
+        assert len(drive_group.hosts(all_hosts)) == 1
+        assert len(drive_group.data_devices.paths) > 0
+        assert all(map(lambda p: isinstance(p, six.string_types),
+            drive_group.data_devices.paths))
+
+        host = drive_group.hosts(all_hosts)[0]
+        self._require_hosts(host)
+
+        result = self._worker_pool.apply_async(self._create_osd, (host,
+                drive_group))
+
+        return SSHWriteCompletion(result)
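A hedged sketch of driving `create_osds` directly. The `DriveGroupSpec` and
`DeviceSelection` constructor arguments are assumptions about the
orchestrator module of this era, not verified against this tree; only the
attributes the method actually reads (`hosts()`, `data_devices.paths`,
`objectstore`) are grounded in the code above:

```
# Hedged sketch: the narrow drive-group form create_osds() accepts today --
# one host, explicit data device paths, bluestore by default.
import orchestrator

dg = orchestrator.DriveGroupSpec(                      # assumed constructor
    host_pattern="osd0",                               # single host
    data_devices=orchestrator.DeviceSelection(paths=["/dev/vdb", "/dev/vdc"]),
)
completion = ssh_orch.create_osds(dg, all_hosts=["osd0"])  # ssh_orch: module instance
```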
+ """ + self.log.info("create_mon({}:{}): starting".format(host, network)) + + conn = self._get_connection(host) + + try: + self._ensure_ceph_conf(conn, network) + + uid = conn.remote_module.path_getuid("/var/lib/ceph") + gid = conn.remote_module.path_getgid("/var/lib/ceph") + + # install client admin key on target mon host + admin_keyring = self._get_bootstrap_key("admin") + admin_keyring_path = '/etc/ceph/ceph.client.admin.keyring' + conn.remote_module.write_keyring(admin_keyring_path, admin_keyring, uid, gid) + + mon_path = "/var/lib/ceph/mon/ceph-{name}".format(name=host) + conn.remote_module.create_mon_path(mon_path, uid, gid) + + # bootstrap key + conn.remote_module.safe_makedirs("/var/lib/ceph/tmp") + monitor_keyring = self._get_bootstrap_key("mon") + mon_keyring_path = "/var/lib/ceph/tmp/ceph-{name}.mon.keyring".format(name=host) + conn.remote_module.write_file( + mon_keyring_path, + monitor_keyring, + 0o600, + None, + uid, + gid + ) + + # monitor map + monmap_path = "/var/lib/ceph/tmp/ceph.{name}.monmap".format(name=host) + remoto.process.run(conn, + ['ceph', 'mon', 'getmap', '-o', monmap_path], + ) + + user_args = [] + if uid != 0: + user_args = user_args + [ '--setuser', str(uid) ] + if gid != 0: + user_args = user_args + [ '--setgroup', str(gid) ] + + remoto.process.run(conn, + ['ceph-mon', '--mkfs', '-i', host, + '--monmap', monmap_path, '--keyring', mon_keyring_path + ] + user_args + ) + + remoto.process.run(conn, + ['systemctl', 'enable', 'ceph.target'], + timeout=7, + ) + + remoto.process.run(conn, + ['systemctl', 'enable', 'ceph-mon@{name}'.format(name=host)], + timeout=7, + ) + + remoto.process.run(conn, + ['systemctl', 'start', 'ceph-mon@{name}'.format(name=host)], + timeout=7, + ) + + return "Created mon on host '{}'".format(host) + + except Exception as e: + self.log.error("create_mon({}:{}): error: {}".format(host, network, e)) + raise + + finally: + self.log.info("create_mon({}:{}): finished".format(host, network)) + conn.exit() + + def update_mons(self, num, hosts): + """ + Adjust the number of cluster monitors. + """ + # current support limited to adding monitors. + mon_map = self.get("mon_map") + num_mons = len(mon_map["mons"]) + if num == num_mons: + return SSHWriteCompletionReady("The requested number of monitors exist.") + if num < num_mons: + raise NotImplementedError("Removing monitors is not supported.") + + # check that all the hostnames are registered + self._require_hosts(map(lambda h: h[0], hosts)) + + # current support requires a network to be specified + for host, network in hosts: + if not network: + raise RuntimeError("Host '{}' missing network " + "part".format(host)) + + # explicit placement: enough hosts provided? + num_new_mons = num - num_mons + if len(hosts) < num_new_mons: + raise RuntimeError("Error: {} hosts provided, expected {}".format( + len(hosts), num_new_mons)) + + self.log.info("creating {} monitors on hosts: '{}'".format( + num_new_mons, ",".join(map(lambda h: ":".join(h), hosts)))) + + # TODO: we may want to chain the creation of the monitors so they join + # the quroum one at a time. + results = [] + for host, network in hosts: + result = self._worker_pool.apply_async(self._create_mon, (host, + network)) + results.append(result) + + return SSHWriteCompletion(results) + + def _create_mgr(self, host): + """ + Create a new manager instance on a host. 
+ """ + self.log.info("create_mgr({}): starting".format(host)) + + conn = self._get_connection(host) + + try: + bootstrap_keyring_path = self._bootstrap_mgr(conn) + + mgr_path = "/var/lib/ceph/mgr/ceph-{name}".format(name=host) + conn.remote_module.safe_makedirs(mgr_path) + keyring_path = os.path.join(mgr_path, "keyring") + + command = [ + 'ceph', + '--name', 'client.bootstrap-mgr', + '--keyring', bootstrap_keyring_path, + 'auth', 'get-or-create', 'mgr.{name}'.format(name=host), + 'mon', 'allow profile mgr', + 'osd', 'allow *', + 'mds', 'allow *', + '-o', + keyring_path + ] + + out, err, ret = remoto.process.check(conn, command) + if ret != 0: + raise Exception("oops") + + remoto.process.run(conn, + ['systemctl', 'enable', 'ceph-mgr@{name}'.format(name=host)], + timeout=7 + ) + + remoto.process.run(conn, + ['systemctl', 'start', 'ceph-mgr@{name}'.format(name=host)], + timeout=7 + ) + + remoto.process.run(conn, + ['systemctl', 'enable', 'ceph.target'], + timeout=7 + ) + + return "Created mgr on host '{}'".format(host) + + except Exception as e: + self.log.error("create_mgr({}): error: {}".format(host, e)) + raise + + finally: + self.log.info("create_mgr({}): finished".format(host)) + conn.exit() + + def update_mgrs(self, num, hosts): + """ + Adjust the number of cluster managers. + """ + # current support limited to adding managers. + mgr_map = self.get("mgr_map") + num_mgrs = 1 if mgr_map["active_name"] else 0 + num_mgrs += len(mgr_map["standbys"]) + if num == num_mgrs: + return SSHWriteCompletionReady("The requested number of managers exist.") + if num < num_mgrs: + raise NotImplementedError("Removing managers is not supported") + + # check that all the hosts are registered + self._require_hosts(hosts) + + # we assume explicit placement by which there are the same number of + # hosts specified as the size of increase in number of daemons. 
diff --git a/src/pybind/mgr/ssh/remotes.py b/src/pybind/mgr/ssh/remotes.py
new file mode 100644
index 00000000..da057e83
--- /dev/null
+++ b/src/pybind/mgr/ssh/remotes.py
@@ -0,0 +1,81 @@
+# ceph-deploy ftw
+import os
+import errno
+
+def safe_makedirs(path, uid=-1, gid=-1):
+    """ create path recursively if it doesn't exist """
+    try:
+        os.makedirs(path)
+    except OSError as e:
+        if e.errno == errno.EEXIST:
+            pass
+        else:
+            raise
+    else:
+        os.chown(path, uid, gid)
+
+def write_conf(path, conf):
+    if not os.path.exists(path):
+        dirpath = os.path.dirname(path)
+        if os.path.exists(dirpath):
+            with open(path, "w") as f:
+                f.write(conf)
+            os.chmod(path, 0o644)
+        else:
+            raise RuntimeError(
+                "{0} does not exist".format(dirpath))
+
+def write_keyring(path, key, overwrite=False, uid=-1, gid=-1):
+    dirname = os.path.dirname(path)
+    if not os.path.exists(dirname):
+        safe_makedirs(dirname, uid, gid)
+    if not overwrite and os.path.exists(path):
+        return
+    with open(path, "wb") as f:
+        f.write(key.encode('utf-8'))
+
+def create_mon_path(path, uid=-1, gid=-1):
+    """create the mon path if it does not exist"""
+    if not os.path.exists(path):
+        os.makedirs(path)
+        os.chown(path, uid, gid)
+
+def write_file(path, content, mode=0o644, directory=None, uid=-1, gid=-1):
+    if directory:
+        if path.startswith("/"):
+            path = path[1:]
+        path = os.path.join(directory, path)
+    if os.path.exists(path):
+        # delete the file first in case we are changing its mode
+        os.unlink(path)
+    with os.fdopen(os.open(path, os.O_WRONLY | os.O_CREAT, mode), 'wb') as f:
+        f.write(content.encode('utf-8'))
+    os.chown(path, uid, gid)
+
+def path_getuid(path):
+    return os.stat(path).st_uid
+
+def path_getgid(path):
+    return os.stat(path).st_gid
+
+def which(executable):
+    """find the location of an executable"""
+    locations = (
+        '/usr/local/bin',
+        '/bin',
+        '/usr/bin',
+        '/usr/local/sbin',
+        '/usr/sbin',
+        '/sbin',
+    )
+
+    for location in locations:
+        executable_path = os.path.join(location, executable)
+        if os.path.exists(executable_path) and os.path.isfile(executable_path):
+            return executable_path
+
+if __name__ == '__channelexec__':
+    for item in channel:
+        channel.send(eval(item))
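For orientation, a hedged sketch of how the `__channelexec__` hook at the
bottom of `remotes.py` is driven: remoto ships the module to the remote
interpreter over an execnet-style channel, then sends call expressions as
strings that the loop evaluates and answers. The channel class below is a
local stand-in, not remoto's actual wire format:

```
# Conceptual only -- mimics the request/response loop remotes.py implements.
class FakeChannel:
    def __init__(self, requests):
        self.requests = requests
        self.replies = []
    def __iter__(self):
        return iter(self.requests)
    def send(self, value):
        self.replies.append(value)

channel = FakeChannel(['which("ceph-volume")'])

# the loop at the bottom of remotes.py, inlined:
from remotes import which  # assumes remotes.py is importable locally
for item in channel:
    channel.send(eval(item))
print(channel.replies)  # e.g. ['/usr/sbin/ceph-volume'] or [None]
```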