Diffstat (limited to 'src/pybind/mgr/ssh/module.py')
-rw-r--r--  src/pybind/mgr/ssh/module.py  740
1 file changed, 740 insertions(+), 0 deletions(-)
diff --git a/src/pybind/mgr/ssh/module.py b/src/pybind/mgr/ssh/module.py
new file mode 100644
index 00000000..73675e52
--- /dev/null
+++ b/src/pybind/mgr/ssh/module.py
@@ -0,0 +1,740 @@
+import json
+import errno
+import logging
+from functools import wraps
+
+import six
+import os
+import tempfile
+import multiprocessing.pool
+
+from mgr_module import MgrModule
+import orchestrator
+
+from . import remotes
+
+try:
+ import remoto
+ import remoto.process
+except ImportError as e:
+ remoto = None
+ remoto_import_error = str(e)
+
+logger = logging.getLogger(__name__)
+
+# high-level TODO:
+# - bring over some of the protections from ceph-deploy that guard against
+# multiple bootstrapping / initialization
+
+class SSHCompletionMixin(object):
+ def __init__(self, result):
+ if isinstance(result, multiprocessing.pool.AsyncResult):
+ self._result = [result]
+ else:
+ self._result = result
+ assert isinstance(self._result, list)
+
+ @property
+ def result(self):
+ return list(map(lambda r: r.get(), self._result))
+
+class SSHReadCompletion(SSHCompletionMixin, orchestrator.ReadCompletion):
+ @property
+ def is_complete(self):
+ return all(map(lambda r: r.ready(), self._result))
+
+
+class SSHWriteCompletion(SSHCompletionMixin, orchestrator.WriteCompletion):
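+
+ # is_persistent and is_effective are collapsed here: once the worker
+ # thread finishes, the change is assumed to be both applied and durable.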
+
+ @property
+ def is_persistent(self):
+ return all(map(lambda r: r.ready(), self._result))
+
+ @property
+ def is_effective(self):
+ return all(map(lambda r: r.ready(), self._result))
+
+ @property
+ def is_errored(self):
+ for r in self._result:
+ if not r.ready():
+ return False
+ if not r.successful():
+ return True
+ return False
+
+class SSHWriteCompletionReady(SSHWriteCompletion):
+ def __init__(self, result):
+ orchestrator.WriteCompletion.__init__(self)
+ self._result = result
+
+ @property
+ def result(self):
+ return self._result
+
+ @property
+ def is_persistent(self):
+ return True
+
+ @property
+ def is_effective(self):
+ return True
+
+ @property
+ def is_errored(self):
+ return False
+
+class SSHConnection(object):
+ """
+ Tie tempfile lifetime (e.g. ssh_config) to a remoto connection.
+ """
+ def __init__(self):
+ self.conn = None
+ self.temp_file = None
+
+ # proxy to the remoto connection
+ def __getattr__(self, name):
+ return getattr(self.conn, name)
+
+
+def log_exceptions(f):
+ if six.PY3:
+ return f
+ else:
+ # Python 2 does not chain exceptions, so without logging it
+ # here the real exception would be lost
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ try:
+ return f(*args, **kwargs)
+ except Exception:
+ logger.exception('%s failed', f.__name__)
+ raise
+ return wrapper
+
+
+class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
+
+ _STORE_HOST_PREFIX = "host"
+ _DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN = 10
+
+ MODULE_OPTIONS = [
+ {'name': 'ssh_config_file'},
+ {'name': 'inventory_cache_timeout_min'},
+ ]
+
+ COMMANDS = [
+ {
+ 'cmd': 'ssh set-ssh-config',
+ 'desc': 'Set the ssh_config file (use -i <ssh_config>)',
+ 'perm': 'rw'
+ },
+ {
+ 'cmd': 'ssh clear-ssh-config',
+ 'desc': 'Clear the ssh_config file',
+ 'perm': 'rw'
+ },
+ ]
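+
+ # Example CLI usage (a sketch; assumes this module is enabled as "ssh"
+ # and that "-i" feeds the named file to the command as stdin):
+ #
+ # ceph ssh set-ssh-config -i /path/to/ssh_config
+ # ceph ssh clear-ssh-config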
+
+ def __init__(self, *args, **kwargs):
+ super(SSHOrchestrator, self).__init__(*args, **kwargs)
+ self._cluster_fsid = None
+ self._worker_pool = multiprocessing.pool.ThreadPool(1)
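+ # pool size 1: remote operations are queued and executed one at a time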
+
+ # The keys in inventory_cache are authoritative: a host is managed
+ # iff it has a key here, so remove_outdated() must never be called.
+ # The values are cached per instance and are invalidated by
+ # 1. the cache timeout
+ # 2. an explicit refresh parameter
+ self.inventory_cache = orchestrator.OutdatablePersistentDict(self, self._STORE_HOST_PREFIX)
+
+ def handle_command(self, inbuf, command):
+ if command["prefix"] == "ssh set-ssh-config":
+ return self._set_ssh_config(inbuf, command)
+ elif command["prefix"] == "ssh clear-ssh-config":
+ return self._clear_ssh_config(inbuf, command)
+ else:
+ raise NotImplementedError(command["prefix"])
+
+ @staticmethod
+ def can_run():
+ if remoto is not None:
+ return True, ""
+ else:
+ return False, "loading remoto library:{}".format(
+ remoto_import_error)
+
+ def available(self):
+ """
+ The SSH orchestrator is available whenever the remoto library
+ could be imported.
+ """
+ return self.can_run()
+
+ def wait(self, completions):
+ self.log.info("wait: completions={}".format(completions))
+
+ complete = True
+ for c in completions:
+ if c.is_complete:
+ continue
+
+ if not isinstance(c, SSHReadCompletion) and \
+ not isinstance(c, SSHWriteCompletion):
+ raise TypeError("unexpected completion: {}".format(c.__class__))
+
+ complete = False
+
+ return complete
+
+ def _get_cluster_fsid(self):
+ """
+ Fetch and cache the cluster fsid.
+ """
+ if not self._cluster_fsid:
+ self._cluster_fsid = self.get("mon_map")["fsid"]
+ assert isinstance(self._cluster_fsid, six.string_types)
+ return self._cluster_fsid
+
+ def _require_hosts(self, hosts):
+ """
+ Raise an error if any of the given hosts are unregistered.
+ """
+ if isinstance(hosts, six.string_types):
+ hosts = [hosts]
+ keys = self.inventory_cache.keys()
+ unregistered_hosts = set(hosts) - keys
+ if unregistered_hosts:
+ logger.warning('keys = {}'.format(keys))
+ raise RuntimeError("Host(s) {} not registered".format(
+ ", ".join(map(lambda h: "'{}'".format(h),
+ unregistered_hosts))))
+
+ def _set_ssh_config(self, inbuf, command):
+ """
+ Set an ssh_config file provided from stdin
+
+ TODO:
+ - validation
+ """
+ if len(inbuf) == 0:
+ return errno.EINVAL, "", "empty ssh config provided"
+ self.set_store("ssh_config", inbuf)
+ return 0, "", ""
+
+ def _clear_ssh_config(self, inbuf, command):
+ """
+ Clear the stored ssh_config file.
+ """
+ self.set_store("ssh_config", None)
+ self.ssh_config_tmp = None
+ return 0, "", ""
+
+ def _get_connection(self, host):
+ """
+ Setup a connection for running commands on remote host.
+ """
+ ssh_options = None
+
+ conn = SSHConnection()
+
+ ssh_config = self.get_store("ssh_config")
+ if ssh_config is not None:
+ conn.temp_file = tempfile.NamedTemporaryFile()
+ conn.temp_file.write(ssh_config.encode('utf-8'))
+ conn.temp_file.flush() # make visible to other processes
+ ssh_config_fname = conn.temp_file.name
+ else:
+ ssh_config_fname = self.get_localized_module_option("ssh_config_file")
+
+ if ssh_config_fname:
+ if not os.path.isfile(ssh_config_fname):
+ raise Exception("ssh_config \"{}\" does not exist".format(ssh_config_fname))
+ ssh_options = "-F {}".format(ssh_config_fname)
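+ # (-F points ssh at the given config file instead of ~/.ssh/config)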
+
+ self.log.info("opening connection to host '{}' with ssh "
+ "options '{}'".format(host, ssh_options))
+
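+ # detect_sudo=True asks remoto to check whether commands on the remote
+ # host must be wrapped in sudo (e.g. when logging in as a non-root user)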
+ conn.conn = remoto.Connection(host,
+ logger=self.log,
+ detect_sudo=True,
+ ssh_options=ssh_options)
+
+ conn.conn.import_module(remotes)
+
+ return conn
+
+ def _executable_path(self, conn, executable):
+ """
+ Check that the named executable is available on the remote host,
+ returning its full path if it is.
+
+ Otherwise a RuntimeError is raised naming the executable and the
+ host on which it was not found.
+ """
+ executable_path = conn.remote_module.which(executable)
+ if not executable_path:
+ raise RuntimeError("Executable '{}' not found on host '{}'".format(
+ executable, conn.hostname))
+ self.log.info("Found executable '{}' at path '{}'".format(executable,
+ executable_path))
+ return executable_path
+
+ def _build_ceph_conf(self):
+ """
+ Build a minimal `ceph.conf` containing the current monitor hosts.
+
+ Notes:
+ - ceph-volume complains if no section header (e.g. [global]) exists
+ - other ceph cli tools complain if the file lacks a trailing newline
+
+ TODO:
+ - messenger v2 syntax?
+ """
+ mon_map = self.get("mon_map")
+ mon_addrs = map(lambda m: m["addr"], mon_map["mons"])
+ mon_hosts = ", ".join(mon_addrs)
+ return "[global]\nmon host = {}\n".format(mon_hosts)
+
+ def _ensure_ceph_conf(self, conn, network=None):
+ """
+ Install ceph.conf on remote node if it doesn't exist.
+ """
+ conf = self._build_ceph_conf()
+ if network:
+ conf += "public_network = {}\n".format(network)
+ conn.remote_module.write_conf("/etc/ceph/ceph.conf", conf)
+
+ def _get_bootstrap_key(self, service_type):
+ """
+ Fetch a bootstrap key for a service type.
+
+ :param service_type: name (e.g. mds, osd, mon, ...)
+ """
+ identity_dict = {
+ 'admin' : 'client.admin',
+ 'mds' : 'client.bootstrap-mds',
+ 'mgr' : 'client.bootstrap-mgr',
+ 'osd' : 'client.bootstrap-osd',
+ 'rgw' : 'client.bootstrap-rgw',
+ 'mon' : 'mon.'
+ }
+
+ identity = identity_dict[service_type]
+
+ ret, out, err = self.mon_command({
+ "prefix": "auth get",
+ "entity": identity
+ })
+
+ if ret == -errno.ENOENT:
+ raise RuntimeError("Entity '{}' not found: '{}'".format(identity, err))
+ elif ret != 0:
+ raise RuntimeError("Error retrieving key for '{}' ret {}: '{}'".format(
+ identity, ret, err))
+
+ return out
+
+ def _bootstrap_mgr(self, conn):
+ """
+ Bootstrap a manager.
+
+ 1. install a copy of ceph.conf
+ 2. install the manager bootstrap key
+
+ :param conn: remote host connection
+ """
+ self._ensure_ceph_conf(conn)
+ keyring = self._get_bootstrap_key("mgr")
+ keyring_path = "/var/lib/ceph/bootstrap-mgr/ceph.keyring"
+ conn.remote_module.write_keyring(keyring_path, keyring)
+ return keyring_path
+
+ def _bootstrap_osd(self, conn):
+ """
+ Bootstrap an osd.
+
+ 1. install a copy of ceph.conf
+ 2. install the osd bootstrap key
+
+ :param conn: remote host connection
+ """
+ self._ensure_ceph_conf(conn)
+ keyring = self._get_bootstrap_key("osd")
+ keyring_path = "/var/lib/ceph/bootstrap-osd/ceph.keyring"
+ conn.remote_module.write_keyring(keyring_path, keyring)
+ return keyring_path
+
+ def _get_hosts(self, wanted=None):
+ return self.inventory_cache.items_filtered(wanted)
+
+ def add_host(self, host):
+ """
+ Add a host to be managed by the orchestrator.
+
+ :param host: host name
+ """
+ @log_exceptions
+ def run(host):
+ self.inventory_cache[host] = orchestrator.OutdatableData()
+ return "Added host '{}'".format(host)
+
+ return SSHWriteCompletion(
+ self._worker_pool.apply_async(run, (host,)))
+
+ def remove_host(self, host):
+ """
+ Remove a host from orchestrator management.
+
+ :param host: host name
+ """
+ @log_exceptions
+ def run(host):
+ del self.inventory_cache[host]
+ return "Removed host '{}'".format(host)
+
+ return SSHWriteCompletion(
+ self._worker_pool.apply_async(run, (host,)))
+
+ def get_hosts(self):
+ """
+ Return a list of hosts managed by the orchestrator.
+
+ Notes:
+ - async is skipped: the manager answers directly from its cache.
+
+ TODO:
+ - InventoryNode probably needs to be able to report labels
+ """
+ nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
+ return orchestrator.TrivialReadCompletion(nodes)
+
+ def _get_device_inventory(self, host):
+ """
+ Query storage devices on a remote node.
+
+ :return: list of InventoryDevice
+ """
+ conn = self._get_connection(host)
+
+ try:
+ ceph_volume_executable = self._executable_path(conn, 'ceph-volume')
+ command = [
+ ceph_volume_executable,
+ "inventory",
+ "--format=json"
+ ]
+
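+ # "ceph-volume inventory --format=json" prints a JSON list of device
+ # descriptions; remoto.process.check returns stdout as a list of lines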
+ out, err, code = remoto.process.check(conn, command)
+ host_devices = json.loads(out[0])
+ return host_devices
+
+ except Exception as ex:
+ self.log.exception(ex)
+ raise
+
+ finally:
+ conn.exit()
+
+ def get_inventory(self, node_filter=None, refresh=False):
+ """
+ Return the storage inventory of nodes matching the given filter.
+
+ :param node_filter: node filter
+
+ TODO:
+ - add filtering by label
+ """
+ if node_filter:
+ hosts = node_filter.nodes
+ self._require_hosts(hosts)
+ hosts = self._get_hosts(hosts)
+ else:
+ # this implies the returned hosts are registered
+ hosts = self._get_hosts()
+
+ @log_exceptions
+ def run(host, host_info):
+ # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode
+
+ timeout_min = int(self.get_module_option(
+ "inventory_cache_timeout_min",
+ self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
+
+ if host_info.outdated(timeout_min) or refresh:
+ self.log.info("refresh stale inventory for '{}'".format(host))
+ data = self._get_device_inventory(host)
+ host_info = orchestrator.OutdatableData(data)
+ self.inventory_cache[host] = host_info
+ else:
+ self.log.debug("reading cached inventory for '{}'".format(host))
+
+ devices = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(host_info.data)
+ return orchestrator.InventoryNode(host, devices)
+
+ results = []
+ for key, host_info in hosts:
+ result = self._worker_pool.apply_async(run, (key, host_info))
+ results.append(result)
+
+ return SSHReadCompletion(results)
+
+ @log_exceptions
+ def _create_osd(self, host, drive_group):
+ conn = self._get_connection(host)
+ try:
+ devices = drive_group.data_devices.paths
+ self._bootstrap_osd(conn)
+
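+ # one "ceph-volume lvm create" call is issued per data device; the
+ # objectstore flag (e.g. --bluestore) comes from the drive group spec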
+ for device in devices:
+ ceph_volume_executable = self._executable_path(conn, "ceph-volume")
+ command = [
+ ceph_volume_executable,
+ "lvm",
+ "create",
+ "--cluster-fsid", self._get_cluster_fsid(),
+ "--{}".format(drive_group.objectstore),
+ "--data", device
+ ]
+ remoto.process.run(conn, command)
+
+ return "Created osd on host '{}'".format(host)
+
+ finally:
+ conn.exit()
+
+ def create_osds(self, drive_group, all_hosts=None):
+ """
+ Create a new osd.
+
+ The orchestrator CLI currently handles a narrow form of drive
+ specification defined by a single block device using bluestore.
+
+ :param drive_group: osd specification
+
+ TODO:
+ - support full drive_group specification
+ - support batch creation
+ """
+ assert len(drive_group.hosts(all_hosts)) == 1
+ assert len(drive_group.data_devices.paths) > 0
+ assert all(map(lambda p: isinstance(p, six.string_types),
+ drive_group.data_devices.paths))
+
+ host = drive_group.hosts(all_hosts)[0]
+ self._require_hosts(host)
+
+ result = self._worker_pool.apply_async(self._create_osd, (host,
+ drive_group))
+
+ return SSHWriteCompletion(result)
+
+ def _create_mon(self, host, network):
+ """
+ Create a new monitor on the given host.
+ """
+ self.log.info("create_mon({}:{}): starting".format(host, network))
+
+ conn = self._get_connection(host)
+
+ try:
+ self._ensure_ceph_conf(conn, network)
+
+ uid = conn.remote_module.path_getuid("/var/lib/ceph")
+ gid = conn.remote_module.path_getgid("/var/lib/ceph")
+
+ # install client admin key on target mon host
+ admin_keyring = self._get_bootstrap_key("admin")
+ admin_keyring_path = '/etc/ceph/ceph.client.admin.keyring'
+ conn.remote_module.write_keyring(admin_keyring_path, admin_keyring, uid, gid)
+
+ mon_path = "/var/lib/ceph/mon/ceph-{name}".format(name=host)
+ conn.remote_module.create_mon_path(mon_path, uid, gid)
+
+ # bootstrap key
+ conn.remote_module.safe_makedirs("/var/lib/ceph/tmp")
+ monitor_keyring = self._get_bootstrap_key("mon")
+ mon_keyring_path = "/var/lib/ceph/tmp/ceph-{name}.mon.keyring".format(name=host)
+ conn.remote_module.write_file(
+ mon_keyring_path,
+ monitor_keyring,
+ 0o600,
+ None,
+ uid,
+ gid
+ )
+
+ # monitor map
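+ # (the current monmap is fetched so the new monitor joins the
+ # existing cluster instead of forming a new one)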
+ monmap_path = "/var/lib/ceph/tmp/ceph.{name}.monmap".format(name=host)
+ remoto.process.run(conn,
+ ['ceph', 'mon', 'getmap', '-o', monmap_path],
+ )
+
+ user_args = []
+ if uid != 0:
+ user_args = user_args + [ '--setuser', str(uid) ]
+ if gid != 0:
+ user_args = user_args + [ '--setgroup', str(gid) ]
+
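+ # run --mkfs as the owner of /var/lib/ceph (typically the "ceph"
+ # user) so the new mon directory gets the right ownership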
+ remoto.process.run(conn,
+ ['ceph-mon', '--mkfs', '-i', host,
+ '--monmap', monmap_path, '--keyring', mon_keyring_path
+ ] + user_args
+ )
+
+ remoto.process.run(conn,
+ ['systemctl', 'enable', 'ceph.target'],
+ timeout=7,
+ )
+
+ remoto.process.run(conn,
+ ['systemctl', 'enable', 'ceph-mon@{name}'.format(name=host)],
+ timeout=7,
+ )
+
+ remoto.process.run(conn,
+ ['systemctl', 'start', 'ceph-mon@{name}'.format(name=host)],
+ timeout=7,
+ )
+
+ return "Created mon on host '{}'".format(host)
+
+ except Exception as e:
+ self.log.error("create_mon({}:{}): error: {}".format(host, network, e))
+ raise
+
+ finally:
+ self.log.info("create_mon({}:{}): finished".format(host, network))
+ conn.exit()
+
+ def update_mons(self, num, hosts):
+ """
+ Adjust the number of cluster monitors.
+ """
+ # current support limited to adding monitors.
+ mon_map = self.get("mon_map")
+ num_mons = len(mon_map["mons"])
+ if num == num_mons:
+ return SSHWriteCompletionReady("The requested number of monitors exist.")
+ if num < num_mons:
+ raise NotImplementedError("Removing monitors is not supported.")
+
+ # check that all the hostnames are registered
+ self._require_hosts(map(lambda h: h[0], hosts))
+
+ # current support requires a network to be specified
+ for host, network in hosts:
+ if not network:
+ raise RuntimeError("Host '{}' missing network "
+ "part".format(host))
+
+ # explicit placement: enough hosts provided?
+ num_new_mons = num - num_mons
+ if len(hosts) < num_new_mons:
+ raise RuntimeError("Error: {} hosts provided, expected {}".format(
+ len(hosts), num_new_mons))
+
+ self.log.info("creating {} monitors on hosts: '{}'".format(
+ num_new_mons, ",".join(map(lambda h: ":".join(h), hosts))))
+
+ # TODO: we may want to chain the creation of the monitors so they join
+ # the quorum one at a time.
+ results = []
+ for host, network in hosts:
+ result = self._worker_pool.apply_async(self._create_mon, (host,
+ network))
+ results.append(result)
+
+ return SSHWriteCompletion(results)
+
+ def _create_mgr(self, host):
+ """
+ Create a new manager instance on a host.
+ """
+ self.log.info("create_mgr({}): starting".format(host))
+
+ conn = self._get_connection(host)
+
+ try:
+ bootstrap_keyring_path = self._bootstrap_mgr(conn)
+
+ mgr_path = "/var/lib/ceph/mgr/ceph-{name}".format(name=host)
+ conn.remote_module.safe_makedirs(mgr_path)
+ keyring_path = os.path.join(mgr_path, "keyring")
+
+ command = [
+ 'ceph',
+ '--name', 'client.bootstrap-mgr',
+ '--keyring', bootstrap_keyring_path,
+ 'auth', 'get-or-create', 'mgr.{name}'.format(name=host),
+ 'mon', 'allow profile mgr',
+ 'osd', 'allow *',
+ 'mds', 'allow *',
+ '-o',
+ keyring_path
+ ]
+
+ out, err, ret = remoto.process.check(conn, command)
+ if ret != 0:
+ raise Exception("oops")
+
+ remoto.process.run(conn,
+ ['systemctl', 'enable', 'ceph-mgr@{name}'.format(name=host)],
+ timeout=7
+ )
+
+ remoto.process.run(conn,
+ ['systemctl', 'start', 'ceph-mgr@{name}'.format(name=host)],
+ timeout=7
+ )
+
+ remoto.process.run(conn,
+ ['systemctl', 'enable', 'ceph.target'],
+ timeout=7
+ )
+
+ return "Created mgr on host '{}'".format(host)
+
+ except Exception as e:
+ self.log.error("create_mgr({}): error: {}".format(host, e))
+ raise
+
+ finally:
+ self.log.info("create_mgr({}): finished".format(host))
+ conn.exit()
+
+ def update_mgrs(self, num, hosts):
+ """
+ Adjust the number of cluster managers.
+ """
+ # current support limited to adding managers.
+ mgr_map = self.get("mgr_map")
+ num_mgrs = 1 if mgr_map["active_name"] else 0
+ num_mgrs += len(mgr_map["standbys"])
+ if num == num_mgrs:
+ return SSHWriteCompletionReady("The requested number of managers exist.")
+ if num < num_mgrs:
+ raise NotImplementedError("Removing managers is not supported")
+
+ # check that all the hosts are registered
+ self._require_hosts(hosts)
+
+ # we assume explicit placement: at least as many hosts must be
+ # provided as the number of new managers being created.
+ num_new_mgrs = num - num_mgrs
+ if len(hosts) < num_new_mgrs:
+ raise RuntimeError("Error: {} hosts provided, expected {}".format(
+ len(hosts), num_new_mgrs))
+
+ self.log.info("creating {} managers on hosts: '{}'".format(
+ num_new_mgrs, ",".join(hosts)))
+
+ results = []
+ for i in range(num_new_mgrs):
+ result = self._worker_pool.apply_async(self._create_mgr, (hosts[i],))
+ results.append(result)
+
+ return SSHWriteCompletion(results)