Diffstat (limited to 'src/cephadm')
-rwxr-xr-x  src/cephadm/cephadm.py             263
-rw-r--r--  src/cephadm/tests/test_agent.py      2
-rw-r--r--  src/cephadm/tests/test_nfs.py        1
-rw-r--r--  src/cephadm/tox.ini                  2
4 files changed, 230 insertions(+), 38 deletions(-)
diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py
index bcb82c4c4..c71810640 100755
--- a/src/cephadm/cephadm.py
+++ b/src/cephadm/cephadm.py
@@ -37,9 +37,10 @@ from functools import wraps
from glob import glob
from io import StringIO
from threading import Thread, Event
+from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import urlopen, Request
-from pathlib import Path
+
FuncT = TypeVar('FuncT', bound=Callable)
@@ -55,7 +56,7 @@ DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
-DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.1'
+DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:1.0.0'
DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
@@ -136,6 +137,33 @@ async def concurrent_tasks(func: Callable, cmd_list: List[str]) -> List[Any]:
return data
+def http_query(addr: str = '',
+ port: str = '',
+ data: Optional[bytes] = None,
+ endpoint: str = '',
+ ssl_ctx: Optional[Any] = None,
+ timeout: Optional[int] = 10) -> Tuple[int, str]:
+
+ url = f'https://{addr}:{port}{endpoint}'
+ logger.debug(f'sending query to {url}')
+ try:
+ req = Request(url, data, {'Content-Type': 'application/json'})
+ with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
+ response_str = response.read()
+ response_status = response.status
+ except HTTPError as e:
+ logger.debug(f'{e.code} {e.reason}')
+ response_status = e.code
+ response_str = e.reason
+ except URLError as e:
+ logger.debug(f'{e.reason}')
+ response_status = -1
+ response_str = e.reason
+ except Exception:
+ raise
+ return (response_status, response_str)
+
+
class EndPoint:
"""EndPoint representing an ip:port format"""
@@ -672,6 +700,100 @@ class Monitoring(object):
##################################
+class NodeProxy(object):
+ """Defines a node-proxy container"""
+
+ daemon_type = 'node-proxy'
+ # TODO: update this if we make node-proxy an executable
+ entrypoint = '/usr/sbin/ceph-node-proxy'
+ required_files = ['node-proxy.json']
+
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
+ def __init__(self,
+ ctx: CephadmContext,
+ fsid: str,
+ daemon_id: Union[str, int],
+ config_json: Dict[Any, Any],
+ image: str = DEFAULT_IMAGE) -> None:
+ self.ctx = ctx
+ self.fsid = fsid
+ self.daemon_id = daemon_id
+ self.image = image
+
+ config = dict_get(config_json, 'node-proxy.json', {})
+ self.files = {'node-proxy.json': config}
+
+ # validate the supplied args
+ self.validate()
+
+ @classmethod
+ def init(cls, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str]) -> 'NodeProxy':
+ return cls(ctx, fsid, daemon_id,
+ fetch_configs(ctx), ctx.image)
+
+ @staticmethod
+ def get_container_mounts(data_dir, log_dir):
+ # type: (str, str) -> Dict[str, str]
+ mounts = dict()
+ mounts[os.path.join(data_dir, 'node-proxy.json')] = '/usr/share/ceph/node-proxy.json:z'
+ return mounts
+
+ def get_daemon_args(self) -> List[str]:
+ # TODO: this corresponds with the mount location of
+ # the config in _get_container_mounts above. They
+ # will both need to be updated when we have a proper
+ # location in the container for node-proxy
+ return ['--config', '/usr/share/ceph/node-proxy.json']
+
+ def validate(self):
+ # type: () -> None
+ if not is_fsid(self.fsid):
+ raise Error('not an fsid: %s' % self.fsid)
+ if not self.daemon_id:
+ raise Error('invalid daemon_id: %s' % self.daemon_id)
+ if not self.image:
+ raise Error('invalid image: %s' % self.image)
+ # check for the required files
+ if self.required_files:
+ for fname in self.required_files:
+ if fname not in self.files:
+ raise Error(
+ 'required file missing from config-json: %s' % fname
+ )
+
+ def get_daemon_name(self):
+ # type: () -> str
+ return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+ def get_container_name(self, desc=None):
+ # type: (Optional[str]) -> str
+ cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name())
+ if desc:
+ cname = '%s-%s' % (cname, desc)
+ return cname
+
+ def create_daemon_dirs(self, data_dir, uid, gid):
+ # type: (str, int, int) -> None
+ """Create files under the container data dir"""
+ if not os.path.isdir(data_dir):
+ raise OSError('data_dir is not a directory: %s' % (data_dir))
+
+ logger.info('Writing node-proxy config...')
+ # populate files from the config-json
+ populate_files(data_dir, self.files, uid, gid)
+
+ def config_and_keyring(
+ self, ctx: CephadmContext
+ ) -> Tuple[Optional[str], Optional[str]]:
+ return get_config_and_keyring(ctx)
+
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ return extract_uid_gid(ctx)
+
+
@contextmanager
def write_new(
destination: Union[str, Path],
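
A small standalone illustration of the config-json contract that NodeProxy.validate() enforces; the actual node-proxy.json payload is produced by the mgr and is not part of this patch, so the contents below are placeholders:

    # Illustration only (not cephadm code): every entry in required_files
    # must be present in the supplied config-json.
    required_files = ['node-proxy.json']
    config_json = {'node-proxy.json': '{}'}   # placeholder file contents
    files = {'node-proxy.json': config_json.get('node-proxy.json', {})}
    for fname in required_files:
        if fname not in files:
            raise ValueError('required file missing from config-json: %s' % fname)
    # create_daemon_dirs() writes these files into the daemon's data dir;
    # node-proxy.json is bind-mounted at /usr/share/ceph/node-proxy.json and
    # handed to the daemon via '--config /usr/share/ceph/node-proxy.json'.
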
@@ -726,7 +848,7 @@ class NFSGanesha(object):
entrypoint = '/usr/bin/ganesha.nfsd'
daemon_args = ['-F', '-L', 'STDERR']
- required_files = ['ganesha.conf']
+ required_files = ['ganesha.conf', 'idmap.conf']
port_map = {
'nfs': 2049,
@@ -1072,7 +1194,7 @@ class CephNvmeof(object):
fetch_configs(ctx), ctx.image)
@staticmethod
- def get_container_mounts(data_dir: str) -> Dict[str, str]:
+ def get_container_mounts(data_dir: str, log_dir: str) -> Dict[str, str]:
mounts = dict()
mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
@@ -1080,6 +1202,7 @@ class CephNvmeof(object):
mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
mounts['/dev/hugepages'] = '/dev/hugepages'
mounts['/dev/vfio/vfio'] = '/dev/vfio/vfio'
+ mounts[log_dir] = '/var/log/ceph:z'
return mounts
@staticmethod
@@ -1098,7 +1221,7 @@ class CephNvmeof(object):
out, err, ret = call(ctx,
[ctx.container_engine.path, 'inspect',
'--format', '{{index .Config.Labels "io.ceph.version"}}',
- ctx.image])
+ container_id])
version = None
if ret == 0:
version = out.strip()
@@ -1217,6 +1340,13 @@ class CephExporter(object):
]
return args
+ def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
+ if not os.path.exists(self.sock_dir):
+ os.mkdir(self.sock_dir)
+ # part of validation is for the sock dir, so we postpone
+ # it until now
+ self.validate()
+
def validate(self) -> None:
if not os.path.isdir(self.sock_dir):
raise Error(f'Directory does not exist. Got: {self.sock_dir}')
@@ -1620,6 +1750,7 @@ def get_supported_daemons():
supported_daemons.append(CephadmAgent.daemon_type)
supported_daemons.append(SNMPGateway.daemon_type)
supported_daemons.extend(Tracing.components)
+ supported_daemons.append(NodeProxy.daemon_type)
assert len(supported_daemons) == len(set(supported_daemons))
return supported_daemons
@@ -2998,9 +3129,11 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
ip = meta['ip']
if 'ports' in meta and meta['ports']:
port = meta['ports'][0]
- r += [f'--web.listen-address={ip}:{port}']
if daemon_type == 'prometheus':
config = fetch_configs(ctx)
+ ip_to_bind_to = config.get('ip_to_bind_to', '')
+ if ip_to_bind_to:
+ ip = ip_to_bind_to
retention_time = config.get('retention_time', '15d')
retention_size = config.get('retention_size', '0') # default to disabled
r += [f'--storage.tsdb.retention.time={retention_time}']
@@ -3015,6 +3148,7 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
addr = next(iter(ipv4_addrs or ipv6_addrs), None)
host = wrap_ipv6(addr) if addr else host
r += [f'--web.external-url={scheme}://{host}:{port}']
+ r += [f'--web.listen-address={ip}:{port}']
if daemon_type == 'alertmanager':
config = fetch_configs(ctx)
peers = config.get('peers', list()) # type: ignore
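
Net effect of the prometheus change above, as a small sketch (addresses and port are placeholders): an 'ip_to_bind_to' entry in the fetched config takes precedence over the IP recorded in the deployment metadata, and the listen-address argument is now appended after the retention and external-url arguments.

    meta = {'ip': '10.1.1.2', 'ports': [9095]}      # placeholder deployment metadata
    config = {'ip_to_bind_to': '10.9.9.9'}          # placeholder fetched config
    ip, port = meta['ip'], meta['ports'][0]
    ip = config.get('ip_to_bind_to', '') or ip
    print(f'--web.listen-address={ip}:{port}')      # -> --web.listen-address=10.9.9.9:9095
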
@@ -3060,6 +3194,9 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
elif daemon_type == SNMPGateway.daemon_type:
sc = SNMPGateway.init(ctx, fsid, daemon_id)
r.extend(sc.get_daemon_args())
+ elif daemon_type == NodeProxy.daemon_type:
+ node_proxy = NodeProxy.init(ctx, fsid, daemon_id)
+ r.extend(node_proxy.get_daemon_args())
return r
@@ -3173,6 +3310,14 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
sg = SNMPGateway.init(ctx, fsid, daemon_id)
sg.create_daemon_conf()
+ elif daemon_type == NodeProxy.daemon_type:
+ node_proxy = NodeProxy.init(ctx, fsid, daemon_id)
+ node_proxy.create_daemon_dirs(data_dir, uid, gid)
+
+ elif daemon_type == CephExporter.daemon_type:
+ ceph_exporter = CephExporter.init(ctx, fsid, daemon_id)
+ ceph_exporter.create_daemon_dirs(data_dir, uid, gid)
+
_write_custom_conf_files(ctx, daemon_type, str(daemon_id), fsid, uid, gid)
@@ -3480,7 +3625,8 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
if daemon_type == CephNvmeof.daemon_type:
assert daemon_id
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- mounts.update(CephNvmeof.get_container_mounts(data_dir))
+ log_dir = get_log_dir(fsid, ctx.log_dir)
+ mounts.update(CephNvmeof.get_container_mounts(data_dir, log_dir))
if daemon_type == CephIscsi.daemon_type:
assert daemon_id
@@ -3503,6 +3649,12 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
mounts.update(cc.get_container_mounts(data_dir))
+ if daemon_type == NodeProxy.daemon_type:
+ assert daemon_id
+ data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
+ log_dir = get_log_dir(fsid, ctx.log_dir)
+ mounts.update(NodeProxy.get_container_mounts(data_dir, log_dir))
+
# Modifications podman makes to /etc/hosts causes issues with
# certain daemons (specifically referencing "host.containers.internal" entry
# being added to /etc/hosts in this case). To avoid that, but still
@@ -3633,6 +3785,9 @@ def get_container(ctx: CephadmContext,
host_network = False
envs.extend(cc.get_container_envs())
container_args.extend(cc.get_container_args())
+ elif daemon_type == NodeProxy.daemon_type:
+ entrypoint = NodeProxy.entrypoint
+ name = '%s.%s' % (daemon_type, daemon_id)
if daemon_type in Monitoring.components:
uid, gid = extract_uid_gid_monitoring(ctx, daemon_type)
@@ -4718,12 +4873,13 @@ class MgrListener(Thread):
conn.send(err_str.encode())
logger.error(err_str)
else:
- conn.send(b'ACK')
- if 'config' in data:
- self.agent.wakeup()
- self.agent.ls_gatherer.wakeup()
- self.agent.volume_gatherer.wakeup()
- logger.debug(f'Got mgr message {data}')
+ if 'counter' in data:
+ conn.send(b'ACK')
+ if 'config' in data:
+ self.agent.wakeup()
+ self.agent.ls_gatherer.wakeup()
+ self.agent.volume_gatherer.wakeup()
+ logger.debug(f'Got mgr message {data}')
except Exception as e:
logger.error(f'Mgr Listener encountered exception: {e}')
@@ -4731,17 +4887,20 @@ class MgrListener(Thread):
self.stop = True
def handle_json_payload(self, data: Dict[Any, Any]) -> None:
- self.agent.ack = int(data['counter'])
- if 'config' in data:
- logger.info('Received new config from mgr')
- config = data['config']
- for filename in config:
- if filename in self.agent.required_files:
- file_path = os.path.join(self.agent.daemon_dir, filename)
- with write_new(file_path) as f:
- f.write(config[filename])
- self.agent.pull_conf_settings()
- self.agent.wakeup()
+ if 'counter' in data:
+ self.agent.ack = int(data['counter'])
+ if 'config' in data:
+ logger.info('Received new config from mgr')
+ config = data['config']
+ for filename in config:
+ if filename in self.agent.required_files:
+ file_path = os.path.join(self.agent.daemon_dir, filename)
+ with write_new(file_path) as f:
+ f.write(config[filename])
+ self.agent.pull_conf_settings()
+ self.agent.wakeup()
+ else:
+ raise RuntimeError('No valid data received.')
class CephadmAgent():
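
For clarity, a sketch of the message contract the listener now enforces; 'counter' and 'config' are the keys used by the patch, while the file name and values below are placeholders:

    data = {'counter': 3, 'config': {'some-required-file': '...'}}   # hypothetical mgr message
    if 'counter' not in data:
        raise RuntimeError('No valid data received.')
    # 'counter' alone is enough to ACK and record the ack value; a 'config'
    # key additionally rewrites any files listed in the agent's
    # required_files and wakes the agent and its gatherers.
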
@@ -4783,6 +4942,9 @@ class CephadmAgent():
self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
self.recent_iteration_index: int = 0
self.cached_ls_values: Dict[str, Dict[str, str]] = {}
+ self.ssl_ctx = ssl.create_default_context()
+ self.ssl_ctx.check_hostname = True
+ self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
def validate(self, config: Dict[str, str] = {}) -> None:
# check for the required files
@@ -4894,6 +5056,7 @@ WantedBy=ceph-{fsid}.target
def run(self) -> None:
self.pull_conf_settings()
+ self.ssl_ctx.load_verify_locations(self.ca_path)
try:
for _ in range(1001):
@@ -4915,11 +5078,6 @@ WantedBy=ceph-{fsid}.target
if not self.volume_gatherer.is_alive():
self.volume_gatherer.start()
- ssl_ctx = ssl.create_default_context()
- ssl_ctx.check_hostname = True
- ssl_ctx.verify_mode = ssl.CERT_REQUIRED
- ssl_ctx.load_verify_locations(self.ca_path)
-
while not self.stop:
start_time = time.monotonic()
ack = self.ack
@@ -4945,15 +5103,19 @@ WantedBy=ceph-{fsid}.target
'port': self.listener_port})
data = data.encode('ascii')
- url = f'https://{self.target_ip}:{self.target_port}/data/'
try:
- req = Request(url, data, {'Content-Type': 'application/json'})
send_time = time.monotonic()
- with urlopen(req, context=ssl_ctx) as response:
- response_str = response.read()
- response_json = json.loads(response_str)
- total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds()
- logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds after sending request.')
+ status, response = http_query(addr=self.target_ip,
+ port=self.target_port,
+ data=data,
+ endpoint='/data',
+ ssl_ctx=self.ssl_ctx)
+ response_json = json.loads(response)
+ if status != 200:
+ logger.error(f'HTTP error {status} while querying agent endpoint: {response}')
+ raise RuntimeError
+ total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds()
+ logger.info(f'Received mgr response: "{response_json["result"]}" {total_request_time} seconds after sending request.')
except Exception as e:
logger.error(f'Failed to send metadata to mgr: {e}')
@@ -6441,6 +6603,10 @@ def command_bootstrap(ctx):
'For more information see:\n\n'
'\thttps://docs.ceph.com/en/latest/mgr/telemetry/\n')
logger.info('Bootstrap complete.')
+
+ if getattr(ctx, 'deploy_cephadm_agent', None):
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/use_agent', 'true'])
+
return ctx.error_code
##################################
@@ -6758,6 +6924,15 @@ def _dispatch_deploy(
deployment_type=deployment_type,
endpoints=daemon_endpoints)
+ elif daemon_type == NodeProxy.daemon_type:
+ config, keyring = get_config_and_keyring(ctx)
+ uid, gid = extract_uid_gid(ctx)
+ c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
+ deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
+ config=config, keyring=keyring,
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
+
elif daemon_type == CephadmAgent.daemon_type:
# get current user gid and uid
uid = os.getuid()
@@ -8059,6 +8234,13 @@ def _rm_cluster(ctx: CephadmContext, keep_logs: bool, zap_osds: bool) -> None:
for fname in glob(f'{ctx.log_dir}/cephadm.log*'):
os.remove(fname)
+ try:
+ Path('/etc/ceph/podman-auth.json').unlink()
+ except FileNotFoundError:
+ pass
+ except Exception as e:
+ logger.debug(f'Could not remove podman-auth.json: {e}')
+
# rm sysctl settings
sysctl_dirs: List[Path] = [Path(ctx.sysctl_dir), Path('/usr/lib/sysctl.d')]
@@ -8097,6 +8279,7 @@ def check_time_sync(ctx, enabler=None):
'ntp.service', # 18.04 (at least)
'ntpsec.service', # 20.04 (at least) / buster
'openntpd.service', # ubuntu / debian
+ 'timemaster.service', # linuxptp on ubuntu/debian
]
if not check_units(ctx, units, enabler):
logger.warning('No time sync service is running; checked for %s' % units)
@@ -10008,7 +10191,9 @@ def _get_parser():
parser_version.set_defaults(func=command_version)
parser_pull = subparsers.add_parser(
- 'pull', help='pull the default container image')
+ 'pull',
+ help='pull a ceph container image (will pull the default image if --image not provided)',
+ usage='cephadm pull (for default image) | cephadm --image <image-name> pull (for custom ceph image)')
parser_pull.set_defaults(func=command_pull)
parser_pull.add_argument(
'--insecure',
@@ -10440,6 +10625,10 @@ def _get_parser():
'--log-to-file',
action='store_true',
help='configure cluster to log to traditional log files in /var/log/ceph/$fsid')
+ parser_bootstrap.add_argument(
+ '--deploy-cephadm-agent',
+ action='store_true',
+ help='deploy the cephadm-agent')
parser_deploy = subparsers.add_parser(
'deploy', help='deploy a daemon')
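
Usage note for the new bootstrap flag: running 'cephadm bootstrap ... --deploy-cephadm-agent' now ends the bootstrap by issuing the equivalent of 'ceph config set mgr mgr/cephadm/use_agent true', which enables the cephadm agent in the mgr's cephadm module.
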
diff --git a/src/cephadm/tests/test_agent.py b/src/cephadm/tests/test_agent.py
index f9cf201e2..182c1f661 100644
--- a/src/cephadm/tests/test_agent.py
+++ b/src/cephadm/tests/test_agent.py
@@ -527,7 +527,7 @@ def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start,
'port': str(open_listener_port)
}
_RQ_init.assert_called_with(
- f'https://{target_ip}:{target_port}/data/',
+ f'https://{target_ip}:{target_port}/data',
json.dumps(expected_data).encode('ascii'),
{'Content-Type': 'application/json'}
)
diff --git a/src/cephadm/tests/test_nfs.py b/src/cephadm/tests/test_nfs.py
index 0649ef934..c9893b32d 100644
--- a/src/cephadm/tests/test_nfs.py
+++ b/src/cephadm/tests/test_nfs.py
@@ -25,6 +25,7 @@ def nfs_json(**kwargs):
if kwargs.get("files"):
result["files"] = {
"ganesha.conf": "",
+ "idmap.conf": "",
}
if kwargs.get("rgw_content"):
result["rgw"] = dict(kwargs["rgw_content"])
diff --git a/src/cephadm/tox.ini b/src/cephadm/tox.ini
index 2cbfca70f..a6d7f3958 100644
--- a/src/cephadm/tox.ini
+++ b/src/cephadm/tox.ini
@@ -37,12 +37,14 @@ deps =
pyfakefs >= 5, < 6 ; python_version >= "3.7"
mock
pytest
+ pyyaml
commands=pytest {posargs}
[testenv:mypy]
basepython = python3
deps =
mypy
+ types-PyYAML
-c{toxinidir}/../mypy-constrains.txt
commands = mypy --config-file ../mypy.ini {posargs:cephadm.py}