1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
from typing import List, cast
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec
from ceph.deployment.service_spec import TracingSpec
from mgr_util import build_url
class ElasticSearchService(CephadmService):
    """cephadm service handler for ``elasticsearch`` daemons."""

    # Daemon type string handled by this service class.
    TYPE = 'elasticsearch'
    # Port assumed by get_elasticsearch_nodes() when a daemon lists no ports.
    DEFAULT_SERVICE_PORT = 9200

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Check that *daemon_spec* targets this daemon type and return it unchanged.

        :param daemon_spec: deploy spec of the daemon about to be created.
        :return: the same ``daemon_spec`` object, unmodified.
        :raises AssertionError: if ``daemon_spec.daemon_type`` differs from ``TYPE``.
        """
        assert daemon_spec.daemon_type == self.TYPE
        return daemon_spec
class JaegerAgentService(CephadmService):
    """cephadm service handler for ``jaeger-agent`` daemons."""

    # Daemon type string handled by this service class.
    TYPE = 'jaeger-agent'
    DEFAULT_SERVICE_PORT = 6799

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Fill *daemon_spec* with the known jaeger-collector endpoints.

        Every jaeger-collector daemon found in the mgr cache contributes one
        ``host:port`` style entry. The entries are stored comma-joined under
        ``final_config['collector_nodes']`` and, sorted, as ``deps``.

        :param daemon_spec: deploy spec of the agent daemon about to be created.
        :return: the same ``daemon_spec`` object with ``final_config`` and ``deps`` set.
        :raises AssertionError: if the daemon type does not match ``TYPE`` or a
            cached collector daemon has no hostname.
        """
        assert daemon_spec.daemon_type == self.TYPE

        # One list serves both outputs: the config string keeps cache order,
        # the dependency list is a sorted copy.
        endpoints: List[str] = []
        for collector in self.mgr.cache.get_daemons_by_type(JaegerCollectorService.TYPE):
            assert collector.hostname is not None
            if collector.ports:
                port = collector.ports[0]
            else:
                # No port recorded for this daemon: use the collector default.
                port = JaegerCollectorService.DEFAULT_SERVICE_PORT
            # Drop any leading '/' characters from the built URL.
            endpoints.append(build_url(host=collector.hostname, port=port).lstrip('/'))

        daemon_spec.final_config = {'collector_nodes': ",".join(endpoints)}
        daemon_spec.deps = sorted(endpoints)
        return daemon_spec
class JaegerCollectorService(CephadmService):
    """cephadm service handler for ``jaeger-collector`` daemons."""

    # Daemon type string handled by this service class.
    TYPE = 'jaeger-collector'
    # Port assumed by JaegerAgentService when a collector daemon lists no ports.
    DEFAULT_SERVICE_PORT = 14250

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Store the elasticsearch node URLs in *daemon_spec*'s final config.

        :param daemon_spec: deploy spec of the collector daemon about to be created.
        :return: the same ``daemon_spec`` object with
            ``final_config['elasticsearch_nodes']`` set to a comma-joined URL list.
        :raises AssertionError: if ``daemon_spec.daemon_type`` differs from ``TYPE``.
        """
        assert daemon_spec.daemon_type == self.TYPE
        daemon_spec.final_config = {
            'elasticsearch_nodes': ",".join(get_elasticsearch_nodes(self, daemon_spec)),
        }
        return daemon_spec
class JaegerQueryService(CephadmService):
    """cephadm service handler for ``jaeger-query`` daemons."""

    # Daemon type string handled by this service class.
    TYPE = 'jaeger-query'
    DEFAULT_SERVICE_PORT = 16686

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Store the elasticsearch node URLs in *daemon_spec*'s final config.

        :param daemon_spec: deploy spec of the query daemon about to be created.
        :return: the same ``daemon_spec`` object with
            ``final_config['elasticsearch_nodes']`` set to a comma-joined URL list.
        :raises AssertionError: if ``daemon_spec.daemon_type`` differs from ``TYPE``.
        """
        assert daemon_spec.daemon_type == self.TYPE
        joined_nodes = ",".join(get_elasticsearch_nodes(self, daemon_spec))
        daemon_spec.final_config = {'elasticsearch_nodes': joined_nodes}
        return daemon_spec
def get_elasticsearch_nodes(service: CephadmService, daemon_spec: CephadmDaemonDeploySpec) -> List[str]:
    """Return ``http://`` URLs of the elasticsearch nodes to use.

    URLs are built from the elasticsearch daemons found in the mgr cache. When
    the cache holds none, the comma-separated ``es_nodes`` value of the active
    spec stored under ``daemon_spec.service_name`` is used instead.

    :param service: service object whose ``mgr`` provides the daemon cache,
        the inventory and the spec store.
    :param daemon_spec: deploy spec whose ``service_name`` selects the active
        spec consulted in the fallback case.
    :return: list of ``http://<addr>:<port>`` style URLs; may be empty if the
        fallback ``es_nodes`` value contains no usable entry.
    :raises AssertionError: if a cached daemon has no hostname, or if the
        fallback is needed and the spec's ``es_nodes`` is ``None``.
    """
    elasticsearch_nodes: List[str] = []
    for dd in service.mgr.cache.get_daemons_by_type(ElasticSearchService.TYPE):
        assert dd.hostname is not None
        # Prefer the daemon's own IP; otherwise resolve the host via the inventory.
        addr = dd.ip if dd.ip else service.mgr.inventory.get_addr(dd.hostname)
        port = dd.ports[0] if dd.ports else ElasticSearchService.DEFAULT_SERVICE_PORT
        # Drop any leading '/' characters from the built URL before adding the scheme.
        url = build_url(host=addr, port=port).lstrip('/')
        elasticsearch_nodes.append(f'http://{url}')

    if not elasticsearch_nodes:
        # takes elasticsearch address from TracingSpec
        spec: TracingSpec = cast(
            TracingSpec, service.mgr.spec_store.active_specs[daemon_spec.service_name])
        assert spec.es_nodes is not None
        for entry in spec.es_nodes.split(","):
            # Tolerate "a:9200, b:9200" and trailing commas: whitespace around an
            # entry or an empty entry would otherwise yield a malformed URL such
            # as "http:// b:9200" or a bare "http://".
            entry = entry.strip()
            if entry:
                elasticsearch_nodes.append(f'http://{entry}')
    return elasticsearch_nodes
|