summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/ssh
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/pybind/mgr/ssh
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/ssh')
-rw-r--r--src/pybind/mgr/ssh/.gitignore1
-rw-r--r--src/pybind/mgr/ssh/README.md93
-rw-r--r--src/pybind/mgr/ssh/Vagrantfile39
-rw-r--r--src/pybind/mgr/ssh/__init__.py1
-rw-r--r--src/pybind/mgr/ssh/ceph.repo23
-rw-r--r--src/pybind/mgr/ssh/module.py740
-rw-r--r--src/pybind/mgr/ssh/remotes.py81
7 files changed, 978 insertions, 0 deletions
diff --git a/src/pybind/mgr/ssh/.gitignore b/src/pybind/mgr/ssh/.gitignore
new file mode 100644
index 00000000..8000dd9d
--- /dev/null
+++ b/src/pybind/mgr/ssh/.gitignore
@@ -0,0 +1 @@
+.vagrant
diff --git a/src/pybind/mgr/ssh/README.md b/src/pybind/mgr/ssh/README.md
new file mode 100644
index 00000000..10f268cd
--- /dev/null
+++ b/src/pybind/mgr/ssh/README.md
@@ -0,0 +1,93 @@
+# dev environment setup
+
+1. start vms with _only_ the ceph packages installed
+
+In `src/pybind/mgr/ssh` run `vagrant up` to create a cluster with a monitor,
+manager, and osd nodes. The osd node will have two small extra disks attached.
+
+2. generate an `ssh_config` file for the vm hosts
+
+Execute `vagrant ssh-config > /path/to/ssh_config` to generate a ssh
+configuration file that contains hosts, usernames, and keys that will be used by
+the bootstrap cluster / ssh orchestrator to establish ssh connections to the
+vagrant vms.
+
+3. install ssh orchestrator dependencies
+
+The primary dependency is the `remoto` package that contains a Python SSH client
+for connecting to remote nodes and executing commands.
+
+Install with `dnf install python3-remoto`. The version must be >= 0.0.35. At the
+time of writing this version is being packaged and is not available. To install
+from source:
+
+```
+git clone https://github.com/ceph/remoto
+cd remoto
+python3 setup.py sdist
+pip3 install --prefix=/usr dist/remoto-0.0.35.tar.gz
+```
+
+4. start the bootstrap cluster (in this case a `vstart.sh` cluster)
+
+Start with a network binding to which the vms can route traffic:
+
+ `vstart.sh -n -i 192.168.121.1`
+
+The following is a manual method for finding this address. TODO: documenting an
+automated/deterministic method would be very helpful.
+
+First, ensure that your firewall settings permit each VM to communicate with the
+host. On Fedora, the `trusted` zone is sufficient (`firewall-cmd
+--set-default-zone trusted`); it also allows traffic on the Ceph ports. Then ssh
+into one of the vm nodes and look up the default gateway, which happens to be
+set up as the host machine.
+
+```
+[nwatkins@smash ssh]$ vagrant ssh mon0 -c "getent hosts gateway"
+192.168.121.1 gateway
+```
+
+5. setup the ssh orchestrator backend
+
+Enable and configure the ssh orchestrator as the active backend:
+
+```
+ceph mgr module enable ssh
+ceph orchestrator set backend ssh
+
+# optional: this document assumes the orchestrator CLI is enabled
+ceph mgr module enable orchestrator_cli
+```
+
+Configure the ssh orchestrator by setting the `ssh_config_file` option to point
+at the ssh configuration file generated above:
+
+```
+ceph config set mgr mgr/ssh/ssh_config_file /path/to/config
+```
+
+The setting can be confirmed by retrieving the configuration settings:
+
+```
+[nwatkins@smash build]$ ceph config get mgr.
+WHO MASK LEVEL OPTION VALUE RO
+mgr advanced mgr/orchestrator_cli/orchestrator ssh *
+mgr advanced mgr/ssh/ssh_config_file /home/nwatkins/src/ceph/src/pybind/mgr/ssh/config *
+```
+
+An SSH config file can also be provided through standard input, which avoids the
+need to have an accessible file path. Use the following command:
+
+
+```
+ceph ssh set-ssh-config -i <path to ssh_config>
+```
+
+TODO: the next set of instructions should be moved to the docs folder.
+
+```
+ceph orchestrator host add osd0
+ceph orchestrator host add mgr0
+ceph orchestrator host add mon0
+ceph orchestrator device ls
+ceph orchestrator mgr update 3 mgr0 mgr1
+```
diff --git a/src/pybind/mgr/ssh/Vagrantfile b/src/pybind/mgr/ssh/Vagrantfile
new file mode 100644
index 00000000..0a2a6389
--- /dev/null
+++ b/src/pybind/mgr/ssh/Vagrantfile
@@ -0,0 +1,39 @@
+# vi: set ft=ruby :
+
+NUM_DAEMONS = ENV["NUM_DAEMONS"] ? ENV["NUM_DAEMONS"].to_i : 1
+
+Vagrant.configure("2") do |config|
+ config.vm.synced_folder ".", "/vagrant", disabled: true
+ config.vm.network "private_network", type: "dhcp"
+ config.vm.box = "centos/7"
+
+ (0..NUM_DAEMONS - 1).each do |i|
+ config.vm.define "mon#{i}" do |mon|
+ mon.vm.hostname = "mon#{i}"
+ end
+ config.vm.define "mgr#{i}" do |mgr|
+ mgr.vm.hostname = "mgr#{i}"
+ end
+ config.vm.define "osd#{i}" do |osd|
+ osd.vm.hostname = "osd#{i}"
+ osd.vm.provider :libvirt do |libvirt|
+ libvirt.storage :file, :size => '5G'
+ libvirt.storage :file, :size => '5G'
+ end
+ end
+ end
+
+ config.vm.provision "shell" do |s|
+ ssh_pub_key = File.readlines("#{Dir.home}/.ssh/id_rsa.pub").first.strip
+ s.inline = "echo #{ssh_pub_key} >> /home/vagrant/.ssh/authorized_keys"
+ end
+
+ config.vm.provision "shell", inline: <<-SHELL
+ sudo yum install -y yum-utils
+ sudo yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
+ sudo rpm --import 'https://download.ceph.com/keys/release.asc'
+ curl -L https://shaman.ceph.com/api/repos/ceph/master/latest/centos/7/repo/ | sudo tee /etc/yum.repos.d/shaman.repo
+ sudo yum install -y ceph python36
+ sudo ln -s /usr/bin/python36 /usr/bin/python3
+ SHELL
+end
diff --git a/src/pybind/mgr/ssh/__init__.py b/src/pybind/mgr/ssh/__init__.py
new file mode 100644
index 00000000..3f41e016
--- /dev/null
+++ b/src/pybind/mgr/ssh/__init__.py
@@ -0,0 +1 @@
+from .module import SSHOrchestrator
diff --git a/src/pybind/mgr/ssh/ceph.repo b/src/pybind/mgr/ssh/ceph.repo
new file mode 100644
index 00000000..6f710e7c
--- /dev/null
+++ b/src/pybind/mgr/ssh/ceph.repo
@@ -0,0 +1,23 @@
+[ceph]
+name=Ceph packages for $basearch
+baseurl=https://download.ceph.com/rpm-mimic/el7/$basearch
+enabled=1
+priority=2
+gpgcheck=1
+gpgkey=https://download.ceph.com/keys/release.asc
+
+[ceph-noarch]
+name=Ceph noarch packages
+baseurl=https://download.ceph.com/rpm-mimic/el7/noarch
+enabled=1
+priority=2
+gpgcheck=1
+gpgkey=https://download.ceph.com/keys/release.asc
+
+[ceph-source]
+name=Ceph source packages
+baseurl=https://download.ceph.com/rpm-mimic/el7/SRPMS
+enabled=0
+priority=2
+gpgcheck=1
+gpgkey=https://download.ceph.com/keys/release.asc
diff --git a/src/pybind/mgr/ssh/module.py b/src/pybind/mgr/ssh/module.py
new file mode 100644
index 00000000..73675e52
--- /dev/null
+++ b/src/pybind/mgr/ssh/module.py
@@ -0,0 +1,740 @@
+import json
+import errno
+import logging
+from functools import wraps
+
+import six
+import os
+import tempfile
+import multiprocessing.pool
+
+from mgr_module import MgrModule
+import orchestrator
+
+from . import remotes
+
+try:
+ import remoto
+ import remoto.process
+except ImportError as e:
+ remoto = None
+ remoto_import_error = str(e)
+
+logger = logging.getLogger(__name__)
+
+# high-level TODO:
+# - bring over some of the protections from ceph-deploy that guard against
+# multiple bootstrapping / initialization
+
+class SSHCompletionmMixin(object):
+ def __init__(self, result):
+ if isinstance(result, multiprocessing.pool.AsyncResult):
+ self._result = [result]
+ else:
+ self._result = result
+ assert isinstance(self._result, list)
+
+ @property
+ def result(self):
+ return list(map(lambda r: r.get(), self._result))
+
+class SSHReadCompletion(SSHCompletionmMixin, orchestrator.ReadCompletion):
+ @property
+ def is_complete(self):
+ return all(map(lambda r: r.ready(), self._result))
+
+
+class SSHWriteCompletion(SSHCompletionmMixin, orchestrator.WriteCompletion):
+
+ @property
+ def is_persistent(self):
+ return all(map(lambda r: r.ready(), self._result))
+
+ @property
+ def is_effective(self):
+ return all(map(lambda r: r.ready(), self._result))
+
+ @property
+ def is_errored(self):
+ for r in self._result:
+ if not r.ready():
+ return False
+ if not r.successful():
+ return True
+ return False
+
+class SSHWriteCompletionReady(SSHWriteCompletion):
+ def __init__(self, result):
+ orchestrator.WriteCompletion.__init__(self)
+ self._result = result
+
+ @property
+ def result(self):
+ return self._result
+
+ @property
+ def is_persistent(self):
+ return True
+
+ @property
+ def is_effective(self):
+ return True
+
+ @property
+ def is_errored(self):
+ return False
+
+class SSHConnection(object):
+ """
+ Tie tempfile lifetime (e.g. ssh_config) to a remoto connection.
+ """
+ def __init__(self):
+ self.conn = None
+ self.temp_file = None
+
+ # proxy to the remoto connection
+ def __getattr__(self, name):
+ return getattr(self.conn, name)
+
+
+def log_exceptions(f):
+ if six.PY3:
+ return f
+ else:
+ # Python 2 does no exception chaining, thus the
+ # real exception is lost
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ try:
+ return f(*args, **kwargs)
+ except Exception:
+ logger.exception('something went wrong.')
+ raise
+ return wrapper
+
+
+class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
+
+ _STORE_HOST_PREFIX = "host"
+ _DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN = 10
+
+ MODULE_OPTIONS = [
+ {'name': 'ssh_config_file'},
+ {'name': 'inventory_cache_timeout_min'},
+ ]
+
+ COMMANDS = [
+ {
+ 'cmd': 'ssh set-ssh-config',
+ 'desc': 'Set the ssh_config file (use -i <ssh_config>)',
+ 'perm': 'rw'
+ },
+ {
+ 'cmd': 'ssh clear-ssh-config',
+ 'desc': 'Clear the ssh_config file',
+ 'perm': 'rw'
+ },
+ ]
+
+ def __init__(self, *args, **kwargs):
+ super(SSHOrchestrator, self).__init__(*args, **kwargs)
+ self._cluster_fsid = None
+ self._worker_pool = multiprocessing.pool.ThreadPool(1)
+
+ # the keys in inventory_cache are authoritative.
+ # You must not call remove_outdated()
+ # The values are cached by instance.
+ # cache is invalidated by
+ # 1. timeout
+ # 2. refresh parameter
+ self.inventory_cache = orchestrator.OutdatablePersistentDict(self, self._STORE_HOST_PREFIX)
+
+ def handle_command(self, inbuf, command):
+ if command["prefix"] == "ssh set-ssh-config":
+ return self._set_ssh_config(inbuf, command)
+ elif command["prefix"] == "ssh clear-ssh-config":
+ return self._clear_ssh_config(inbuf, command)
+ else:
+ raise NotImplementedError(command["prefix"])
+
+ @staticmethod
+ def can_run():
+ if remoto is not None:
+ return True, ""
+ else:
+ return False, "loading remoto library:{}".format(
+ remoto_import_error)
+
+ def available(self):
+ """
+ The SSH orchestrator is always available.
+ """
+ return self.can_run()
+
+ def wait(self, completions):
+ self.log.info("wait: completions={}".format(completions))
+
+ complete = True
+ for c in completions:
+ if c.is_complete:
+ continue
+
+ if not isinstance(c, SSHReadCompletion) and \
+ not isinstance(c, SSHWriteCompletion):
+ raise TypeError("unexpected completion: {}".format(c.__class__))
+
+ complete = False
+
+ return complete
+
+ def _get_cluster_fsid(self):
+ """
+ Fetch and cache the cluster fsid.
+ """
+ if not self._cluster_fsid:
+ self._cluster_fsid = self.get("mon_map")["fsid"]
+ assert isinstance(self._cluster_fsid, six.string_types)
+ return self._cluster_fsid
+
+ def _require_hosts(self, hosts):
+ """
+ Raise an error if any of the given hosts are unregistered.
+ """
+ if isinstance(hosts, six.string_types):
+ hosts = [hosts]
+ keys = self.inventory_cache.keys()
+ unregistered_hosts = set(hosts) - keys
+ if unregistered_hosts:
+ logger.warning('keys = {}'.format(keys))
+ raise RuntimeError("Host(s) {} not registered".format(
+ ", ".join(map(lambda h: "'{}'".format(h),
+ unregistered_hosts))))
+
+ def _set_ssh_config(self, inbuf, command):
+ """
+ Set an ssh_config file provided from stdin
+
+ TODO:
+ - validation
+ """
+ if len(inbuf) == 0:
+ return errno.EINVAL, "", "empty ssh config provided"
+ self.set_store("ssh_config", inbuf)
+ return 0, "", ""
+
+ def _clear_ssh_config(self, inbuf, command):
+ """
+ Clear the ssh_config file provided from stdin
+ """
+ self.set_store("ssh_config", None)
+ self.ssh_config_tmp = None
+ return 0, "", ""
+
+ def _get_connection(self, host):
+ """
+ Setup a connection for running commands on remote host.
+ """
+ ssh_options = None
+
+ conn = SSHConnection()
+
+ ssh_config = self.get_store("ssh_config")
+ if ssh_config is not None:
+ conn.temp_file = tempfile.NamedTemporaryFile()
+ conn.temp_file.write(ssh_config.encode('utf-8'))
+ conn.temp_file.flush() # make visible to other processes
+ ssh_config_fname = conn.temp_file.name
+ else:
+ ssh_config_fname = self.get_localized_module_option("ssh_config_file")
+
+ if ssh_config_fname:
+ if not os.path.isfile(ssh_config_fname):
+ raise Exception("ssh_config \"{}\" does not exist".format(ssh_config_fname))
+ ssh_options = "-F {}".format(ssh_config_fname)
+
+ self.log.info("opening connection to host '{}' with ssh "
+ "options '{}'".format(host, ssh_options))
+
+ conn.conn = remoto.Connection(host,
+ logger=self.log,
+ detect_sudo=True,
+ ssh_options=ssh_options)
+
+ conn.conn.import_module(remotes)
+
+ return conn
+
+ def _executable_path(self, conn, executable):
+ """
+ Remote validator that accepts a connection object to ensure that a certain
+ executable is available returning its full path if so.
+
+ Otherwise an exception with thorough details will be raised, informing the
+ user that the executable was not found.
+ """
+ executable_path = conn.remote_module.which(executable)
+ if not executable_path:
+ raise RuntimeError("Executable '{}' not found on host '{}'".format(
+ executable, conn.hostname))
+ self.log.info("Found executable '{}' at path '{}'".format(executable,
+ executable_path))
+ return executable_path
+
+ def _build_ceph_conf(self):
+ """
+ Build a minimal `ceph.conf` containing the current monitor hosts.
+
+ Notes:
+ - ceph-volume complains if no section header (e.g. global) exists
+ - other ceph cli tools complained about no EOF newline
+
+ TODO:
+ - messenger v2 syntax?
+ """
+ mon_map = self.get("mon_map")
+ mon_addrs = map(lambda m: m["addr"], mon_map["mons"])
+ mon_hosts = ", ".join(mon_addrs)
+ return "[global]\nmon host = {}\n".format(mon_hosts)
+
+ def _ensure_ceph_conf(self, conn, network=False):
+ """
+ Install ceph.conf on remote node if it doesn't exist.
+ """
+ conf = self._build_ceph_conf()
+ if network:
+ conf += "public_network = {}\n".format(network)
+ conn.remote_module.write_conf("/etc/ceph/ceph.conf", conf)
+
+ def _get_bootstrap_key(self, service_type):
+ """
+ Fetch a bootstrap key for a service type.
+
+ :param service_type: name (e.g. mds, osd, mon, ...)
+ """
+ identity_dict = {
+ 'admin' : 'client.admin',
+ 'mds' : 'client.bootstrap-mds',
+ 'mgr' : 'client.bootstrap-mgr',
+ 'osd' : 'client.bootstrap-osd',
+ 'rgw' : 'client.bootstrap-rgw',
+ 'mon' : 'mon.'
+ }
+
+ identity = identity_dict[service_type]
+
+ ret, out, err = self.mon_command({
+ "prefix": "auth get",
+ "entity": identity
+ })
+
+ if ret == -errno.ENOENT:
+ raise RuntimeError("Entity '{}' not found: '{}'".format(identity, err))
+ elif ret != 0:
+ raise RuntimeError("Error retrieving key for '{}' ret {}: '{}'".format(
+ identity, ret, err))
+
+ return out
+
+ def _bootstrap_mgr(self, conn):
+ """
+ Bootstrap a manager.
+
+ 1. install a copy of ceph.conf
+ 2. install the manager bootstrap key
+
+ :param conn: remote host connection
+ """
+ self._ensure_ceph_conf(conn)
+ keyring = self._get_bootstrap_key("mgr")
+ keyring_path = "/var/lib/ceph/bootstrap-mgr/ceph.keyring"
+ conn.remote_module.write_keyring(keyring_path, keyring)
+ return keyring_path
+
+ def _bootstrap_osd(self, conn):
+ """
+ Bootstrap an osd.
+
+ 1. install a copy of ceph.conf
+ 2. install the osd bootstrap key
+
+ :param conn: remote host connection
+ """
+ self._ensure_ceph_conf(conn)
+ keyring = self._get_bootstrap_key("osd")
+ keyring_path = "/var/lib/ceph/bootstrap-osd/ceph.keyring"
+ conn.remote_module.write_keyring(keyring_path, keyring)
+ return keyring_path
+
+ def _get_hosts(self, wanted=None):
+ return self.inventory_cache.items_filtered(wanted)
+
+ def add_host(self, host):
+ """
+ Add a host to be managed by the orchestrator.
+
+ :param host: host name
+ """
+ @log_exceptions
+ def run(host):
+ self.inventory_cache[host] = orchestrator.OutdatableData()
+ return "Added host '{}'".format(host)
+
+ return SSHWriteCompletion(
+ self._worker_pool.apply_async(run, (host,)))
+
+ def remove_host(self, host):
+ """
+ Remove a host from orchestrator management.
+
+ :param host: host name
+ """
+ @log_exceptions
+ def run(host):
+ del self.inventory_cache[host]
+ return "Removed host '{}'".format(host)
+
+ return SSHWriteCompletion(
+ self._worker_pool.apply_async(run, (host,)))
+
+ def get_hosts(self):
+ """
+ Return a list of hosts managed by the orchestrator.
+
+ Notes:
+ - skip async: manager reads from cache.
+
+ TODO:
+ - InventoryNode probably needs to be able to report labels
+ """
+ nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
+ return orchestrator.TrivialReadCompletion(nodes)
+
+ def _get_device_inventory(self, host):
+ """
+ Query storage devices on a remote node.
+
+ :return: list of InventoryDevice
+ """
+ conn = self._get_connection(host)
+
+ try:
+ ceph_volume_executable = self._executable_path(conn, 'ceph-volume')
+ command = [
+ ceph_volume_executable,
+ "inventory",
+ "--format=json"
+ ]
+
+ out, err, code = remoto.process.check(conn, command)
+ host_devices = json.loads(out[0])
+ return host_devices
+
+ except Exception as ex:
+ self.log.exception(ex)
+ raise
+
+ finally:
+ conn.exit()
+
+ def get_inventory(self, node_filter=None, refresh=False):
+ """
+ Return the storage inventory of nodes matching the given filter.
+
+ :param node_filter: node filter
+
+ TODO:
+ - add filtering by label
+ """
+ if node_filter:
+ hosts = node_filter.nodes
+ self._require_hosts(hosts)
+ hosts = self._get_hosts(hosts)
+ else:
+ # this implies the returned hosts are registered
+ hosts = self._get_hosts()
+
+ @log_exceptions
+ def run(host, host_info):
+ # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode
+
+ timeout_min = int(self.get_module_option(
+ "inventory_cache_timeout_min",
+ self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
+
+ if host_info.outdated(timeout_min) or refresh:
+ self.log.info("refresh stale inventory for '{}'".format(host))
+ data = self._get_device_inventory(host)
+ host_info = orchestrator.OutdatableData(data)
+ self.inventory_cache[host] = host_info
+ else:
+ self.log.debug("reading cached inventory for '{}'".format(host))
+
+ devices = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(host_info.data)
+ return orchestrator.InventoryNode(host, devices)
+
+ results = []
+ for key, host_info in hosts:
+ result = self._worker_pool.apply_async(run, (key, host_info))
+ results.append(result)
+
+ return SSHReadCompletion(results)
+
+ @log_exceptions
+ def _create_osd(self, host, drive_group):
+ conn = self._get_connection(host)
+ try:
+ devices = drive_group.data_devices.paths
+ self._bootstrap_osd(conn)
+
+ for device in devices:
+ ceph_volume_executable = self._executable_path(conn, "ceph-volume")
+ command = [
+ ceph_volume_executable,
+ "lvm",
+ "create",
+ "--cluster-fsid", self._get_cluster_fsid(),
+ "--{}".format(drive_group.objectstore),
+ "--data", device
+ ]
+ remoto.process.run(conn, command)
+
+ return "Created osd on host '{}'".format(host)
+
+ except:
+ raise
+
+ finally:
+ conn.exit()
+
+ def create_osds(self, drive_group, all_hosts=None):
+ """
+ Create a new osd.
+
+ The orchestrator CLI currently handles a narrow form of drive
+ specification defined by a single block device using bluestore.
+
+ :param drive_group: osd specification
+
+ TODO:
+ - support full drive_group specification
+ - support batch creation
+ """
+ assert len(drive_group.hosts(all_hosts)) == 1
+ assert len(drive_group.data_devices.paths) > 0
+ assert all(map(lambda p: isinstance(p, six.string_types),
+ drive_group.data_devices.paths))
+
+ host = drive_group.hosts(all_hosts)[0]
+ self._require_hosts(host)
+
+ result = self._worker_pool.apply_async(self._create_osd, (host,
+ drive_group))
+
+ return SSHWriteCompletion(result)
+
+ def _create_mon(self, host, network):
+ """
+ Create a new monitor on the given host.
+ """
+ self.log.info("create_mon({}:{}): starting".format(host, network))
+
+ conn = self._get_connection(host)
+
+ try:
+ self._ensure_ceph_conf(conn, network)
+
+ uid = conn.remote_module.path_getuid("/var/lib/ceph")
+ gid = conn.remote_module.path_getgid("/var/lib/ceph")
+
+ # install client admin key on target mon host
+ admin_keyring = self._get_bootstrap_key("admin")
+ admin_keyring_path = '/etc/ceph/ceph.client.admin.keyring'
+ conn.remote_module.write_keyring(admin_keyring_path, admin_keyring, uid, gid)
+
+ mon_path = "/var/lib/ceph/mon/ceph-{name}".format(name=host)
+ conn.remote_module.create_mon_path(mon_path, uid, gid)
+
+ # bootstrap key
+ conn.remote_module.safe_makedirs("/var/lib/ceph/tmp")
+ monitor_keyring = self._get_bootstrap_key("mon")
+ mon_keyring_path = "/var/lib/ceph/tmp/ceph-{name}.mon.keyring".format(name=host)
+ conn.remote_module.write_file(
+ mon_keyring_path,
+ monitor_keyring,
+ 0o600,
+ None,
+ uid,
+ gid
+ )
+
+ # monitor map
+ monmap_path = "/var/lib/ceph/tmp/ceph.{name}.monmap".format(name=host)
+ remoto.process.run(conn,
+ ['ceph', 'mon', 'getmap', '-o', monmap_path],
+ )
+
+ user_args = []
+ if uid != 0:
+ user_args = user_args + [ '--setuser', str(uid) ]
+ if gid != 0:
+ user_args = user_args + [ '--setgroup', str(gid) ]
+
+ remoto.process.run(conn,
+ ['ceph-mon', '--mkfs', '-i', host,
+ '--monmap', monmap_path, '--keyring', mon_keyring_path
+ ] + user_args
+ )
+
+ remoto.process.run(conn,
+ ['systemctl', 'enable', 'ceph.target'],
+ timeout=7,
+ )
+
+ remoto.process.run(conn,
+ ['systemctl', 'enable', 'ceph-mon@{name}'.format(name=host)],
+ timeout=7,
+ )
+
+ remoto.process.run(conn,
+ ['systemctl', 'start', 'ceph-mon@{name}'.format(name=host)],
+ timeout=7,
+ )
+
+ return "Created mon on host '{}'".format(host)
+
+ except Exception as e:
+ self.log.error("create_mon({}:{}): error: {}".format(host, network, e))
+ raise
+
+ finally:
+ self.log.info("create_mon({}:{}): finished".format(host, network))
+ conn.exit()
+
+ def update_mons(self, num, hosts):
+ """
+ Adjust the number of cluster monitors.
+ """
+ # current support limited to adding monitors.
+ mon_map = self.get("mon_map")
+ num_mons = len(mon_map["mons"])
+ if num == num_mons:
+ return SSHWriteCompletionReady("The requested number of monitors exist.")
+ if num < num_mons:
+ raise NotImplementedError("Removing monitors is not supported.")
+
+ # check that all the hostnames are registered
+ self._require_hosts(map(lambda h: h[0], hosts))
+
+ # current support requires a network to be specified
+ for host, network in hosts:
+ if not network:
+ raise RuntimeError("Host '{}' missing network "
+ "part".format(host))
+
+ # explicit placement: enough hosts provided?
+ num_new_mons = num - num_mons
+ if len(hosts) < num_new_mons:
+ raise RuntimeError("Error: {} hosts provided, expected {}".format(
+ len(hosts), num_new_mons))
+
+ self.log.info("creating {} monitors on hosts: '{}'".format(
+ num_new_mons, ",".join(map(lambda h: ":".join(h), hosts))))
+
+ # TODO: we may want to chain the creation of the monitors so they join
+ # the quroum one at a time.
+ results = []
+ for host, network in hosts:
+ result = self._worker_pool.apply_async(self._create_mon, (host,
+ network))
+ results.append(result)
+
+ return SSHWriteCompletion(results)
+
+ def _create_mgr(self, host):
+ """
+ Create a new manager instance on a host.
+ """
+ self.log.info("create_mgr({}): starting".format(host))
+
+ conn = self._get_connection(host)
+
+ try:
+ bootstrap_keyring_path = self._bootstrap_mgr(conn)
+
+ mgr_path = "/var/lib/ceph/mgr/ceph-{name}".format(name=host)
+ conn.remote_module.safe_makedirs(mgr_path)
+ keyring_path = os.path.join(mgr_path, "keyring")
+
+ command = [
+ 'ceph',
+ '--name', 'client.bootstrap-mgr',
+ '--keyring', bootstrap_keyring_path,
+ 'auth', 'get-or-create', 'mgr.{name}'.format(name=host),
+ 'mon', 'allow profile mgr',
+ 'osd', 'allow *',
+ 'mds', 'allow *',
+ '-o',
+ keyring_path
+ ]
+
+ out, err, ret = remoto.process.check(conn, command)
+ if ret != 0:
+ raise Exception("oops")
+
+ remoto.process.run(conn,
+ ['systemctl', 'enable', 'ceph-mgr@{name}'.format(name=host)],
+ timeout=7
+ )
+
+ remoto.process.run(conn,
+ ['systemctl', 'start', 'ceph-mgr@{name}'.format(name=host)],
+ timeout=7
+ )
+
+ remoto.process.run(conn,
+ ['systemctl', 'enable', 'ceph.target'],
+ timeout=7
+ )
+
+ return "Created mgr on host '{}'".format(host)
+
+ except Exception as e:
+ self.log.error("create_mgr({}): error: {}".format(host, e))
+ raise
+
+ finally:
+ self.log.info("create_mgr({}): finished".format(host))
+ conn.exit()
+
+ def update_mgrs(self, num, hosts):
+ """
+ Adjust the number of cluster managers.
+ """
+ # current support limited to adding managers.
+ mgr_map = self.get("mgr_map")
+ num_mgrs = 1 if mgr_map["active_name"] else 0
+ num_mgrs += len(mgr_map["standbys"])
+ if num == num_mgrs:
+ return SSHWriteCompletionReady("The requested number of managers exist.")
+ if num < num_mgrs:
+ raise NotImplementedError("Removing managers is not supported")
+
+ # check that all the hosts are registered
+ self._require_hosts(hosts)
+
+ # we assume explicit placement by which there are the same number of
+ # hosts specified as the size of increase in number of daemons.
+ num_new_mgrs = num - num_mgrs
+ if len(hosts) < num_new_mgrs:
+ raise RuntimeError("Error: {} hosts provided, expected {}".format(
+ len(hosts), num_new_mgrs))
+
+ self.log.info("creating {} managers on hosts: '{}'".format(
+ num_new_mgrs, ",".join(hosts)))
+
+ results = []
+ for i in range(num_new_mgrs):
+ result = self._worker_pool.apply_async(self._create_mgr, (hosts[i],))
+ results.append(result)
+
+ return SSHWriteCompletion(results)
diff --git a/src/pybind/mgr/ssh/remotes.py b/src/pybind/mgr/ssh/remotes.py
new file mode 100644
index 00000000..da057e83
--- /dev/null
+++ b/src/pybind/mgr/ssh/remotes.py
@@ -0,0 +1,81 @@
+# ceph-deploy ftw
+import os
+import errno
+import tempfile
+import shutil
+
+def safe_makedirs(path, uid=-1, gid=-1):
+ """ create path recursively if it doesn't exist """
+ try:
+ os.makedirs(path)
+ except OSError as e:
+ if e.errno == errno.EEXIST:
+ pass
+ else:
+ raise
+ else:
+ os.chown(path, uid, gid)
+
+def write_conf(path, conf):
+ if not os.path.exists(path):
+ dirpath = os.path.dirname(path)
+ if os.path.exists(dirpath):
+ with open(path, "w") as f:
+ f.write(conf)
+ os.chmod(path, 0o644)
+ else:
+ raise RuntimeError(
+ "{0} does not exist".format(dirpath))
+
+def write_keyring(path, key, overwrite=False, uid=-1, gid=-1):
+ dirname = os.path.dirname(path)
+ if not os.path.exists(dirname):
+ safe_makedirs(dirname, uid, gid)
+ if not overwrite and os.path.exists(path):
+ return
+ with open(path, "wb") as f:
+ f.write(key.encode('utf-8'))
+
+def create_mon_path(path, uid=-1, gid=-1):
+ """create the mon path if it does not exist"""
+ if not os.path.exists(path):
+ os.makedirs(path)
+ os.chown(path, uid, gid);
+
+def write_file(path, content, mode=0o644, directory=None, uid=-1, gid=-1):
+ if directory:
+ if path.startswith("/"):
+ path = path[1:]
+ path = os.path.join(directory, path)
+ if os.path.exists(path):
+ # Delete file in case we are changing its mode
+ os.unlink(path)
+ with os.fdopen(os.open(path, os.O_WRONLY | os.O_CREAT, mode), 'wb') as f:
+ f.write(content.encode('utf-8'))
+ os.chown(path, uid, gid)
+
+def path_getuid(path):
+ return os.stat(path).st_uid
+
+def path_getgid(path):
+ return os.stat(path).st_gid
+
+def which(executable):
+ """find the location of an executable"""
+ locations = (
+ '/usr/local/bin',
+ '/bin',
+ '/usr/bin',
+ '/usr/local/sbin',
+ '/usr/sbin',
+ '/sbin',
+ )
+
+ for location in locations:
+ executable_path = os.path.join(location, executable)
+ if os.path.exists(executable_path) and os.path.isfile(executable_path):
+ return executable_path
+
+# Dispatch loop: every item received on ``channel`` is evaluated as a Python
+# expression and the resulting value is sent back on the same channel.
+# NOTE(review): ``channel`` is not defined in this file; presumably it is
+# injected by the remote-execution layer (remoto/execnet) when the module
+# runs under the ``__channelexec__`` name -- confirm.
+# NOTE(security): eval() executes whatever the peer sends, so this is only
+# safe while the channel is fed exclusively by the trusted orchestrator side.
+if __name__ == '__channelexec__':
+ for item in channel:
+ channel.send(eval(item))