summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/cephadm/services/ingress.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/cephadm/services/ingress.py')
-rw-r--r--src/pybind/mgr/cephadm/services/ingress.py381
1 files changed, 381 insertions, 0 deletions
diff --git a/src/pybind/mgr/cephadm/services/ingress.py b/src/pybind/mgr/cephadm/services/ingress.py
new file mode 100644
index 000000000..55be30454
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/ingress.py
@@ -0,0 +1,381 @@
+import ipaddress
+import logging
+import random
+import string
+from typing import List, Dict, Any, Tuple, cast, Optional
+
+from ceph.deployment.service_spec import ServiceSpec, IngressSpec
+from mgr_util import build_url
+from cephadm import utils
+from orchestrator import OrchestratorError, DaemonDescription
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
+
+logger = logging.getLogger(__name__)
+
+
+class IngressService(CephService):
+ TYPE = 'ingress'
+ MAX_KEEPALIVED_PASS_LEN = 8
+
+ def primary_daemon_type(self, spec: Optional[ServiceSpec] = None) -> str:
+ if spec:
+ ispec = cast(IngressSpec, spec)
+ # in keepalive only setups, we are only deploying keepalived,
+ # so that should be marked as the primary daemon type. Otherwise,
+ # we consider haproxy to be the primary.
+ if hasattr(spec, 'keepalive_only') and ispec.keepalive_only:
+ return 'keepalived'
+ return 'haproxy'
+
+ def per_host_daemon_type(self, spec: Optional[ServiceSpec] = None) -> Optional[str]:
+ if spec:
+ ispec = cast(IngressSpec, spec)
+ # if we are using "keepalive_only" mode on this ingress service
+ # we are only deploying keepalived daemons, so there should
+ # only be a primary daemon type and the per host daemon type
+ # should be empty
+ if hasattr(spec, 'keepalive_only') and ispec.keepalive_only:
+ return None
+ return 'keepalived'
+
+ def prepare_create(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> CephadmDaemonDeploySpec:
+ if daemon_spec.daemon_type == 'haproxy':
+ return self.haproxy_prepare_create(daemon_spec)
+ if daemon_spec.daemon_type == 'keepalived':
+ return self.keepalived_prepare_create(daemon_spec)
+ assert False, "unexpected daemon type"
+
+ def generate_config(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec
+ ) -> Tuple[Dict[str, Any], List[str]]:
+ if daemon_spec.daemon_type == 'haproxy':
+ return self.haproxy_generate_config(daemon_spec)
+ else:
+ return self.keepalived_generate_config(daemon_spec)
+ assert False, "unexpected daemon type"
+
+ def haproxy_prepare_create(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> CephadmDaemonDeploySpec:
+ assert daemon_spec.daemon_type == 'haproxy'
+
+ daemon_id = daemon_spec.daemon_id
+ host = daemon_spec.host
+ spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+
+ logger.debug('prepare_create haproxy.%s on host %s with spec %s' % (
+ daemon_id, host, spec))
+
+ daemon_spec.final_config, daemon_spec.deps = self.haproxy_generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def haproxy_generate_config(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> Tuple[Dict[str, Any], List[str]]:
+ spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ assert spec.backend_service
+ if spec.backend_service not in self.mgr.spec_store:
+ raise RuntimeError(
+ f'{spec.service_name()} backend service {spec.backend_service} does not exist')
+ backend_spec = self.mgr.spec_store[spec.backend_service].spec
+ daemons = self.mgr.cache.get_daemons_by_service(spec.backend_service)
+ deps = [d.name() for d in daemons]
+
+ # generate password?
+ pw_key = f'{spec.service_name()}/monitor_password'
+ password = self.mgr.get_store(pw_key)
+ if password is None:
+ if not spec.monitor_password:
+ password = ''.join(random.choice(string.ascii_lowercase)
+ for _ in range(self.MAX_KEEPALIVED_PASS_LEN))
+ self.mgr.set_store(pw_key, password)
+ else:
+ if spec.monitor_password:
+ self.mgr.set_store(pw_key, None)
+ if spec.monitor_password:
+ password = spec.monitor_password
+
+ if backend_spec.service_type == 'nfs':
+ mode = 'tcp'
+ # we need to get the nfs daemon with the highest rank_generation for
+ # each rank we are currently deploying for the haproxy config
+ # for example if we had three (rank, rank_generation) pairs of
+ # (0, 0), (0, 1), (1, 0) we would want the nfs daemon corresponding
+ # to (0, 1) and (1, 0) because those are the two with the highest
+ # rank_generation for the existing ranks (0 and 1, with the highest
+ # rank_generation for rank 0 being 1 and highest rank_generation for
+ # rank 1 being 0)
+ ranked_daemons = [d for d in daemons if (d.rank is not None and d.rank_generation is not None)]
+ by_rank: Dict[int, DaemonDescription] = {}
+ for d in ranked_daemons:
+ # It doesn't seem like mypy can figure out that rank
+ # and rank_generation for both the daemon we're looping on
+ # and all those in by_rank cannot be None due to the filtering
+ # when creating the ranked_daemons list, which is why these
+ # seemingly unnecessary assertions are here.
+ assert d.rank is not None
+ if d.rank not in by_rank:
+ by_rank[d.rank] = d
+ else:
+ same_rank_nfs = by_rank[d.rank]
+ assert d.rank_generation is not None
+ assert same_rank_nfs.rank_generation is not None
+ # if we have multiple of the same rank. take the one
+ # with the highesr rank generation
+ if d.rank_generation > same_rank_nfs.rank_generation:
+ by_rank[d.rank] = d
+ servers = []
+
+ # try to establish how many ranks we *should* have
+ num_ranks = backend_spec.placement.count
+ if not num_ranks:
+ num_ranks = 1 + max(by_rank.keys())
+
+ for rank in range(num_ranks):
+ if rank in by_rank:
+ d = by_rank[rank]
+ assert d.ports
+ servers.append({
+ 'name': f"{spec.backend_service}.{rank}",
+ 'ip': d.ip or utils.resolve_ip(self.mgr.inventory.get_addr(str(d.hostname))),
+ 'port': d.ports[0],
+ })
+ else:
+ # offline/missing server; leave rank in place
+ servers.append({
+ 'name': f"{spec.backend_service}.{rank}",
+ 'ip': '0.0.0.0',
+ 'port': 0,
+ })
+ else:
+ mode = 'http'
+ servers = [
+ {
+ 'name': d.name(),
+ 'ip': d.ip or utils.resolve_ip(self.mgr.inventory.get_addr(str(d.hostname))),
+ 'port': d.ports[0],
+ } for d in daemons if d.ports
+ ]
+
+ host_ip = daemon_spec.ip or self.mgr.inventory.get_addr(daemon_spec.host)
+ server_opts = []
+ if spec.enable_haproxy_protocol:
+ server_opts.append("send-proxy-v2")
+ logger.debug("enabled default server opts: %r", server_opts)
+ ip = '*' if spec.virtual_ips_list else str(spec.virtual_ip).split('/')[0] or daemon_spec.ip or '*'
+ frontend_port = daemon_spec.ports[0] if daemon_spec.ports else spec.frontend_port
+ if ip != '*' and frontend_port:
+ daemon_spec.port_ips = {str(frontend_port): ip}
+ haproxy_conf = self.mgr.template.render(
+ 'services/ingress/haproxy.cfg.j2',
+ {
+ 'spec': spec,
+ 'backend_spec': backend_spec,
+ 'mode': mode,
+ 'servers': servers,
+ 'user': spec.monitor_user or 'admin',
+ 'password': password,
+ 'ip': ip,
+ 'frontend_port': frontend_port,
+ 'monitor_port': daemon_spec.ports[1] if daemon_spec.ports else spec.monitor_port,
+ 'local_host_ip': host_ip,
+ 'default_server_opts': server_opts,
+ }
+ )
+ config_files = {
+ 'files': {
+ "haproxy.cfg": haproxy_conf,
+ }
+ }
+ if spec.ssl_cert:
+ ssl_cert = spec.ssl_cert
+ if isinstance(ssl_cert, list):
+ ssl_cert = '\n'.join(ssl_cert)
+ config_files['files']['haproxy.pem'] = ssl_cert
+
+ return config_files, sorted(deps)
+
+ def keepalived_prepare_create(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> CephadmDaemonDeploySpec:
+ assert daemon_spec.daemon_type == 'keepalived'
+
+ daemon_id = daemon_spec.daemon_id
+ host = daemon_spec.host
+ spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+
+ logger.debug('prepare_create keepalived.%s on host %s with spec %s' % (
+ daemon_id, host, spec))
+
+ daemon_spec.final_config, daemon_spec.deps = self.keepalived_generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def keepalived_generate_config(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> Tuple[Dict[str, Any], List[str]]:
+ spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ assert spec.backend_service
+
+ # generate password?
+ pw_key = f'{spec.service_name()}/keepalived_password'
+ password = self.mgr.get_store(pw_key)
+ if password is None:
+ if not spec.keepalived_password:
+ password = ''.join(random.choice(string.ascii_lowercase)
+ for _ in range(self.MAX_KEEPALIVED_PASS_LEN))
+ self.mgr.set_store(pw_key, password)
+ else:
+ if spec.keepalived_password:
+ self.mgr.set_store(pw_key, None)
+ if spec.keepalived_password:
+ password = spec.keepalived_password
+
+ daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
+
+ if not daemons and not spec.keepalive_only:
+ raise OrchestratorError(
+ f'Failed to generate keepalived.conf: No daemons deployed for {spec.service_name()}')
+
+ deps = sorted([d.name() for d in daemons if d.daemon_type == 'haproxy'])
+
+ host = daemon_spec.host
+ hosts = sorted(list(set([host] + [str(d.hostname) for d in daemons])))
+
+ def _get_valid_interface_and_ip(vip: str, host: str) -> Tuple[str, str]:
+ # interface
+ bare_ip = ipaddress.ip_interface(vip).ip
+ host_ip = ''
+ interface = None
+ for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
+ if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet):
+ interface = list(ifaces.keys())[0]
+ host_ip = ifaces[interface][0]
+ logger.info(
+ f'{bare_ip} is in {subnet} on {host} interface {interface}'
+ )
+ break
+ # try to find interface by matching spec.virtual_interface_networks
+ if not interface and spec.virtual_interface_networks:
+ for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
+ if subnet in spec.virtual_interface_networks:
+ interface = list(ifaces.keys())[0]
+ host_ip = ifaces[interface][0]
+ logger.info(
+ f'{spec.virtual_ip} will be configured on {host} interface '
+ f'{interface} (which is in subnet {subnet})'
+ )
+ break
+ if not interface:
+ raise OrchestratorError(
+ f"Unable to identify interface for {spec.virtual_ip} on {host}"
+ )
+ return interface, host_ip
+
+ # script to monitor health
+ script = '/usr/bin/false'
+ for d in daemons:
+ if d.hostname == host:
+ if d.daemon_type == 'haproxy':
+ assert d.ports
+ port = d.ports[1] # monitoring port
+ host_ip = d.ip or self.mgr.inventory.get_addr(d.hostname)
+ script = f'/usr/bin/curl {build_url(scheme="http", host=host_ip, port=port)}/health'
+ assert script
+
+ states = []
+ priorities = []
+ virtual_ips = []
+
+ # Set state and priority. Have one master for each VIP. Or at least the first one as master if only one VIP.
+ if spec.virtual_ip:
+ virtual_ips.append(spec.virtual_ip)
+ if hosts[0] == host:
+ states.append('MASTER')
+ priorities.append(100)
+ else:
+ states.append('BACKUP')
+ priorities.append(90)
+
+ elif spec.virtual_ips_list:
+ virtual_ips = spec.virtual_ips_list
+ if len(virtual_ips) > len(hosts):
+ raise OrchestratorError(
+ "Number of virtual IPs for ingress is greater than number of available hosts"
+ )
+ for x in range(len(virtual_ips)):
+ if hosts[x] == host:
+ states.append('MASTER')
+ priorities.append(100)
+ else:
+ states.append('BACKUP')
+ priorities.append(90)
+
+ # remove host, daemon is being deployed on from hosts list for
+ # other_ips in conf file and converter to ips
+ if host in hosts:
+ hosts.remove(host)
+ host_ips: List[str] = []
+ other_ips: List[List[str]] = []
+ interfaces: List[str] = []
+ for vip in virtual_ips:
+ interface, ip = _get_valid_interface_and_ip(vip, host)
+ host_ips.append(ip)
+ interfaces.append(interface)
+ ips: List[str] = []
+ for h in hosts:
+ _, ip = _get_valid_interface_and_ip(vip, h)
+ ips.append(ip)
+ other_ips.append(ips)
+
+ # Use interface as vrrp_interface for vrrp traffic if vrrp_interface_network not set on the spec
+ vrrp_interfaces: List[str] = []
+ if not spec.vrrp_interface_network:
+ vrrp_interfaces = interfaces
+ else:
+ for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
+ if subnet == spec.vrrp_interface_network:
+ vrrp_interface = [list(ifaces.keys())[0]] * len(interfaces)
+ logger.info(
+ f'vrrp will be configured on {host} interface '
+ f'{vrrp_interface} (which is in subnet {subnet})'
+ )
+ break
+ else:
+ raise OrchestratorError(
+ f"Unable to identify vrrp interface for {spec.vrrp_interface_network} on {host}"
+ )
+
+ keepalived_conf = self.mgr.template.render(
+ 'services/ingress/keepalived.conf.j2',
+ {
+ 'spec': spec,
+ 'script': script,
+ 'password': password,
+ 'interfaces': interfaces,
+ 'vrrp_interfaces': vrrp_interfaces,
+ 'virtual_ips': virtual_ips,
+ 'first_virtual_router_id': spec.first_virtual_router_id,
+ 'states': states,
+ 'priorities': priorities,
+ 'other_ips': other_ips,
+ 'host_ips': host_ips,
+ }
+ )
+
+ config_file = {
+ 'files': {
+ "keepalived.conf": keepalived_conf,
+ }
+ }
+
+ return config_file, deps