Diffstat (limited to 'src/pybind/mgr/rook/module.py')
-rw-r--r--  src/pybind/mgr/rook/module.py  469
1 file changed, 469 insertions(+), 0 deletions(-)
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
new file mode 100644
index 00000000..4b49d681
--- /dev/null
+++ b/src/pybind/mgr/rook/module.py
@@ -0,0 +1,469 @@
+import threading
+import functools
+import os
+import uuid
+try:
+ from typing import List
+except ImportError:
+ pass # just for type checking
+
+try:
+ from kubernetes import client, config
+ from kubernetes.client.rest import ApiException
+
+ kubernetes_imported = True
+except ImportError:
+ kubernetes_imported = False
+ client = None
+ config = None
+
+from mgr_module import MgrModule
+import orchestrator
+
+from .rook_cluster import RookCluster
+
+
+all_completions = []
+
+
+class RookReadCompletion(orchestrator.ReadCompletion):
+ """
+ All reads are simply API calls: avoid spawning
+ huge numbers of threads by just running them
+ inline when someone calls wait()
+ """
+
+ def __init__(self, cb):
+ super(RookReadCompletion, self).__init__()
+ self.cb = cb
+ self._result = None
+ self._complete = False
+
+ self.message = "<read op>"
+
+ # XXX hacky global
+ global all_completions
+ all_completions.append(self)
+
+ @property
+ def result(self):
+ return self._result
+
+ @property
+ def is_complete(self):
+ return self._complete
+
+ def execute(self):
+ self._result = self.cb()
+ self._complete = True
+
+
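+# A minimal usage sketch (illustrative only, never called by this module):
+# a RookReadCompletion wraps a zero-argument callable and runs it inline
+# when wait() invokes execute().
+def _example_read_completion():
+    c = RookReadCompletion(lambda: ["node-a", "node-b"])
+    assert not c.is_complete
+    c.execute()  # runs the callable inline, no thread spawned
+    assert c.is_complete
+    return c.result  # ["node-a", "node-b"]
+
+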
+class RookWriteCompletion(orchestrator.WriteCompletion):
+ """
+ Writes are a two-phase thing, firstly sending
+ the write to the k8s API (fast) and then waiting
+ for the corresponding change to appear in the
+ Ceph cluster (slow)
+ """
+ # XXX kubernetes bindings call_api already usefully has
+ # a completion= param that uses threads. Maybe just
+ # use that?
+ def __init__(self, execute_cb, complete_cb, message):
+ super(RookWriteCompletion, self).__init__()
+ self.execute_cb = execute_cb
+ self.complete_cb = complete_cb
+
+ # Executed means I executed my k8s API call, it may or may
+ # not have succeeded
+ self.executed = False
+
+ # Result of k8s API call, this is set if executed==True
+ self._result = None
+
+ self.effective = False
+
+ self.id = str(uuid.uuid4())
+
+ self.message = message
+
+ self.error = None
+
+ # XXX hacky global
+ global all_completions
+ all_completions.append(self)
+
+ def __str__(self):
+ return self.message
+
+ @property
+ def result(self):
+ return self._result
+
+ @property
+ def is_persistent(self):
+ return (not self.is_errored) and self.executed
+
+ @property
+ def is_effective(self):
+ return self.effective
+
+ @property
+ def is_errored(self):
+ return self.error is not None
+
+ def execute(self):
+ if not self.executed:
+ self._result = self.execute_cb()
+ self.executed = True
+
+ if not self.effective:
+ # TODO: check self.result for API errors
+ if self.complete_cb is None:
+ self.effective = True
+ else:
+ self.effective = self.complete_cb()
+
+
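+# A minimal sketch (illustrative only) of the two-phase write flow: execute()
+# first fires the k8s API callback (making the write persistent), then polls
+# complete_cb until the change is visible in the Ceph cluster (making it
+# effective). Both callbacks here are stand-ins, not real API calls.
+def _example_write_completion():
+    c = RookWriteCompletion(
+        execute_cb=lambda: {"status": "accepted"},  # fast: submit to k8s
+        complete_cb=lambda: True,  # slow: confirm the Ceph cluster caught up
+        message="example write")
+    c.execute()
+    return c.is_persistent and c.is_effective
+
+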
+def deferred_read(f):
+ """
+ Decorator to make RookOrchestrator methods return
+    a completion object that executes itself when waited on.
+ """
+
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ return RookReadCompletion(lambda: f(*args, **kwargs))
+
+ return wrapper
+
+
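+# Usage sketch for @deferred_read (the method below is hypothetical, not part
+# of this module): callers get a RookReadCompletion back immediately, and the
+# wrapped body only runs once wait() executes the completion.
+#
+#   class RookOrchestrator(...):
+#       @deferred_read
+#       def example_list_devices(self):
+#           return self.rook_cluster.get_discovered_devices(None)
+#
+#   completion = orch.example_list_devices()  # returns instantly
+#   orch.wait([completion])                   # body runs here
+
+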
+class RookEnv(object):
+ def __init__(self):
+        # POD_NAMESPACE already exists in Rook 0.9
+ self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')
+
+        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
+ self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)
+
+ self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', "rook-ceph-system")
+ self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
+ self.api_name = "ceph.rook.io/" + self.crd_version
+
+ def api_version_match(self):
+ return self.crd_version == 'v1'
+
+ def has_namespace(self):
+ return 'POD_NAMESPACE' in os.environ
+
+
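+# Worked example of how RookEnv resolves its settings when only
+# POD_NAMESPACE=rook-ceph is set (the values follow directly from the
+# defaults above):
+#
+#   namespace          = 'rook-ceph'         (POD_NAMESPACE)
+#   cluster_name       = 'rook-ceph'         (falls back to namespace)
+#   operator_namespace = 'rook-ceph-system'  (default)
+#   crd_version        = 'v1'                (default)
+#   api_name           = 'ceph.rook.io/v1'
+
+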
+class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
+ MODULE_OPTIONS = [
+ # TODO: configure k8s API addr instead of assuming local
+ ]
+
+ def wait(self, completions):
+ self.log.info("wait: completions={0}".format(completions))
+
+ incomplete = False
+
+ # Our `wait` implementation is very simple because everything's
+ # just an API call.
+ for c in completions:
+            if not isinstance(c, (RookReadCompletion, RookWriteCompletion)):
+                raise TypeError(
+                    "wait() requires list of completions, not {0}".format(
+                        c.__class__
+                    ))
+
+ if c.is_complete:
+ continue
+
+ try:
+ c.execute()
+ except Exception as e:
+ if not isinstance(e, orchestrator.OrchestratorError):
+ self.log.exception("Completion {0} threw an exception:".format(
+ c.message
+ ))
+ c.exception = e
+ c._complete = True
+
+ if not c.is_complete:
+ incomplete = True
+
+ return not incomplete
+
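+    # Caller-side sketch (hypothetical): wait() returns True only once every
+    # completion in the list reports complete, so callers poll it:
+    #
+    #   completions = [orch.get_inventory()]
+    #   while not orch.wait(completions):
+    #       time.sleep(1)
+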
+ @staticmethod
+ def can_run():
+ if not kubernetes_imported:
+ return False, "`kubernetes` python module not found"
+ if not RookEnv().api_version_match():
+ return False, "Rook version unsupported."
+ return True, ''
+
+ def available(self):
+ if not kubernetes_imported:
+ return False, "`kubernetes` python module not found"
+ elif not self._rook_env.has_namespace():
+ return False, "ceph-mgr not running in Rook cluster"
+
+ try:
+ self.k8s.list_namespaced_pod(self._rook_env.cluster_name)
+ except ApiException as e:
+ return False, "Cannot reach Kubernetes API: {}".format(e)
+ else:
+ return True, ""
+
+ def __init__(self, *args, **kwargs):
+ super(RookOrchestrator, self).__init__(*args, **kwargs)
+
+ self._initialized = threading.Event()
+ self._k8s = None
+ self._rook_cluster = None
+ self._rook_env = RookEnv()
+
+ self._shutdown = threading.Event()
+
+ def shutdown(self):
+ self._shutdown.set()
+
+ @property
+ def k8s(self):
+ self._initialized.wait()
+ return self._k8s
+
+ @property
+ def rook_cluster(self):
+ # type: () -> RookCluster
+ self._initialized.wait()
+ return self._rook_cluster
+
+ def serve(self):
+ # For deployed clusters, we should always be running inside
+ # a Rook cluster. For development convenience, also support
+ # running outside (reading ~/.kube config)
+
+        if self._rook_env.has_namespace():
+ config.load_incluster_config()
+ cluster_name = self._rook_env.cluster_name
+ else:
+ self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
+ config.load_kube_config()
+
+ cluster_name = "rook-ceph"
+
+ # So that I can do port forwarding from my workstation - jcsp
+ from kubernetes.client import configuration
+ configuration.verify_ssl = False
+
+ self._k8s = client.CoreV1Api()
+
+ try:
+ # XXX mystery hack -- I need to do an API call from
+ # this context, or subsequent API usage from handle_command
+ # fails with SSLError('bad handshake'). Suspect some kind of
+ # thread context setup in SSL lib?
+ self._k8s.list_namespaced_pod(cluster_name)
+ except ApiException:
+ # Ignore here to make self.available() fail with a proper error message
+ pass
+
+ self._rook_cluster = RookCluster(
+ self._k8s,
+ self._rook_env)
+
+ self._initialized.set()
+
+ while not self._shutdown.is_set():
+ # XXX hack (or is it?) to kick all completions periodically,
+ # in case we had a caller that wait()'ed on them long enough
+ # to get persistence but not long enough to get completion
+
+ global all_completions
+ self.wait(all_completions)
+ all_completions = [c for c in all_completions if not c.is_complete]
+
+ self._shutdown.wait(5)
+
+ # TODO: watch Rook for config changes to complain/update if
+ # things look a bit out of sync?
+
+ @deferred_read
+ def get_inventory(self, node_filter=None, refresh=False):
+ node_list = None
+ if node_filter and node_filter.nodes:
+ # Explicit node list
+ node_list = node_filter.nodes
+ elif node_filter and node_filter.labels:
+ # TODO: query k8s API to resolve to node list, and pass
+ # it into RookCluster.get_discovered_devices
+ raise NotImplementedError()
+
+ devs = self.rook_cluster.get_discovered_devices(node_list)
+
+ result = []
+        for node_name, node_devs in devs.items():
+            dev_list = []
+            for d in node_devs:
+                dev = orchestrator.InventoryDevice()
+
+                # XXX CAUTION! https://github.com/rook/rook/issues/1716
+                # Passing this through for the sake of completeness but it
+                # is not trustworthy!
+                dev.blank = d['empty']
+                dev.type = 'hdd' if d['rotational'] else 'ssd'
+                dev.id = d['name']
+                dev.size = d['size']
+
+                if d['filesystem'] == "" and not d['rotational']:
+                    # Empty or partitioned SSD
+                    partitioned_space = sum(
+                        p['size'] for p in d['Partitions'])
+                    dev.metadata_space_free = max(
+                        0, d['size'] - partitioned_space)
+
+                dev_list.append(dev)
+
+            result.append(orchestrator.InventoryNode(node_name, dev_list))
+
+ return result
+
+ @deferred_read
+ def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
+
+        if service_type not in ("mds", "osd", "mgr", "mon", "nfs", "rgw", None):
+ raise orchestrator.OrchestratorValidationError(service_type + " unsupported")
+
+ pods = self.rook_cluster.describe_pods(service_type, service_id, node_name)
+
+ result = []
+ for p in pods:
+ sd = orchestrator.ServiceDescription()
+ sd.nodename = p['nodename']
+ sd.container_id = p['name']
+ sd.service_type = p['labels']['app'].replace('rook-ceph-', '')
+
+ if sd.service_type == "osd":
+ sd.service_instance = "%s" % p['labels']["ceph-osd-id"]
+ elif sd.service_type == "mds":
+ sd.service = p['labels']['rook_file_system']
+ pfx = "{0}-".format(sd.service)
+ sd.service_instance = p['labels']['ceph_daemon_id'].replace(pfx, '', 1)
+ elif sd.service_type == "mon":
+ sd.service_instance = p['labels']["mon"]
+ elif sd.service_type == "mgr":
+ sd.service_instance = p['labels']["mgr"]
+ elif sd.service_type == "nfs":
+ sd.service = p['labels']['ceph_nfs']
+ sd.service_instance = p['labels']['instance']
+ sd.rados_config_location = self.rook_cluster.get_nfs_conf_url(sd.service, sd.service_instance)
+ elif sd.service_type == "rgw":
+ sd.service = p['labels']['rgw']
+ sd.service_instance = p['labels']['ceph_daemon_id']
+ else:
+ # Unknown type -- skip it
+ continue
+
+ result.append(sd)
+
+ return result
+
+ def _service_add_decorate(self, typename, spec, func):
+ return RookWriteCompletion(lambda: func(spec), None,
+ "Creating {0} services for {1}".format(typename, spec.name))
+
+ def add_stateless_service(self, service_type, spec):
+ # assert isinstance(spec, orchestrator.StatelessServiceSpec)
+ if service_type == "mds":
+ return self._service_add_decorate("Filesystem", spec,
+ self.rook_cluster.add_filesystem)
+ elif service_type == "rgw" :
+ return self._service_add_decorate("RGW", spec,
+ self.rook_cluster.add_objectstore)
+ elif service_type == "nfs" :
+ return self._service_add_decorate("NFS", spec,
+ self.rook_cluster.add_nfsgw)
+ else:
+ raise NotImplementedError(service_type)
+
+ def remove_stateless_service(self, service_type, service_id):
+ return RookWriteCompletion(
+ lambda: self.rook_cluster.rm_service(service_type, service_id), None,
+ "Removing {0} services for {1}".format(service_type, service_id))
+
+ def update_mons(self, num, hosts):
+ if hosts:
+ raise RuntimeError("Host list is not supported by rook.")
+
+ return RookWriteCompletion(
+ lambda: self.rook_cluster.update_mon_count(num), None,
+ "Updating mon count to {0}".format(num))
+
+ def update_stateless_service(self, svc_type, spec):
+ # only nfs is currently supported
+ if svc_type != "nfs":
+ raise NotImplementedError(svc_type)
+
+ num = spec.count
+ return RookWriteCompletion(
+ lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
+ "Updating NFS server count in {0} to {1}".format(spec.name, num))
+
+ def create_osds(self, drive_group, all_hosts):
+ # type: (orchestrator.DriveGroupSpec, List[str]) -> RookWriteCompletion
+
+ assert len(drive_group.hosts(all_hosts)) == 1
+ targets = []
+ if drive_group.data_devices:
+ targets += drive_group.data_devices.paths
+ if drive_group.data_directories:
+ targets += drive_group.data_directories
+
+        if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
+            raise RuntimeError("Node '{0}' is not in the Kubernetes "
+                               "cluster".format(drive_group.hosts(all_hosts)[0]))
+
+ # Validate whether cluster CRD can accept individual OSD
+ # creations (i.e. not useAllDevices)
+ if not self.rook_cluster.can_create_osd():
+ raise RuntimeError("Rook cluster configuration does not "
+ "support OSD creation.")
+
+ def execute():
+ return self.rook_cluster.add_osds(drive_group, all_hosts)
+
+ def is_complete():
+ # Find OSD pods on this host
+ pod_osd_ids = set()
+ pods = self._k8s.list_namespaced_pod(self._rook_env.namespace,
+ label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
+ field_selector="spec.nodeName={0}".format(
+ drive_group.hosts(all_hosts)[0]
+ )).items
+ for p in pods:
+ pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
+
+ self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))
+
+ found = []
+ osdmap = self.get("osd_map")
+ for osd in osdmap['osds']:
+ osd_id = osd['osd']
+ if osd_id not in pod_osd_ids:
+ continue
+
+ metadata = self.get_metadata('osd', "%s" % osd_id)
+ if metadata and metadata['devices'] in targets:
+ found.append(osd_id)
+                else:
+                    self.log.info("ignoring osd {0} {1}".format(
+                        osd_id,
+                        metadata['devices'] if metadata else '(no metadata)'
+                    ))
+
+            # complete once at least one matching OSD has appeared in the map
+            return len(found) > 0
+
+ return RookWriteCompletion(execute, is_complete,
+ "Creating OSD on {0}:{1}".format(
+ drive_group.hosts(all_hosts)[0], targets
+ ))