diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/exporter | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/exporter')
-rw-r--r-- | src/exporter/CMakeLists.txt | 10 | ||||
-rw-r--r-- | src/exporter/DaemonMetricCollector.cc | 457 | ||||
-rw-r--r-- | src/exporter/DaemonMetricCollector.h | 104 | ||||
-rw-r--r-- | src/exporter/ceph_exporter.cc | 65 | ||||
-rw-r--r-- | src/exporter/http_server.cc | 169 | ||||
-rw-r--r-- | src/exporter/http_server.h | 5 | ||||
-rw-r--r-- | src/exporter/util.cc | 69 | ||||
-rw-r--r-- | src/exporter/util.h | 24 |
8 files changed, 903 insertions, 0 deletions
diff --git a/src/exporter/CMakeLists.txt b/src/exporter/CMakeLists.txt new file mode 100644 index 000000000..0c0c03bf9 --- /dev/null +++ b/src/exporter/CMakeLists.txt @@ -0,0 +1,10 @@ +set(exporter_srcs + ceph_exporter.cc + DaemonMetricCollector.cc + http_server.cc + util.cc + ) +add_executable(ceph-exporter ${exporter_srcs}) +target_link_libraries(ceph-exporter + global-static ceph-common) +install(TARGETS ceph-exporter DESTINATION bin) diff --git a/src/exporter/DaemonMetricCollector.cc b/src/exporter/DaemonMetricCollector.cc new file mode 100644 index 000000000..ebe85c304 --- /dev/null +++ b/src/exporter/DaemonMetricCollector.cc @@ -0,0 +1,457 @@ +#include "DaemonMetricCollector.h" + +#include <boost/json/src.hpp> +#include <chrono> +#include <filesystem> +#include <iostream> +#include <map> +#include <memory> +#include <regex> +#include <sstream> +#include <string> +#include <utility> + +#include "common/admin_socket_client.h" +#include "common/debug.h" +#include "common/hostname.h" +#include "common/perf_counters.h" +#include "common/split.h" +#include "global/global_context.h" +#include "global/global_init.h" +#include "include/common_fwd.h" +#include "util.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_ceph_exporter + +using json_object = boost::json::object; +using json_value = boost::json::value; +using json_array = boost::json::array; + +void DaemonMetricCollector::request_loop(boost::asio::steady_timer &timer) { + timer.async_wait([&](const boost::system::error_code &e) { + std::cerr << e << std::endl; + update_sockets(); + dump_asok_metrics(); + auto stats_period = g_conf().get_val<int64_t>("exporter_stats_period"); + // time to wait before sending requests again + timer.expires_from_now(std::chrono::seconds(stats_period)); + request_loop(timer); + }); +} + +void DaemonMetricCollector::main() { + // time to wait before sending requests again + + boost::asio::io_service io; + boost::asio::steady_timer timer{io, std::chrono::seconds(0)}; + request_loop(timer); + io.run(); +} + +std::string DaemonMetricCollector::get_metrics() { + const std::lock_guard<std::mutex> lock(metrics_mutex); + return metrics; +} + +template <class T> +void add_metric(std::unique_ptr<MetricsBuilder> &builder, T value, + std::string name, std::string description, std::string mtype, + labels_t labels) { + builder->add(std::to_string(value), name, description, mtype, labels); +} + +void add_double_or_int_metric(std::unique_ptr<MetricsBuilder> &builder, + json_value value, std::string name, + std::string description, std::string mtype, + labels_t labels) { + if (value.is_int64()) { + int64_t v = value.as_int64(); + add_metric(builder, v, name, description, mtype, labels); + } else if (value.is_double()) { + double v = value.as_double(); + add_metric(builder, v, name, description, mtype, labels); + } +} + +std::string boost_string_to_std(boost::json::string js) { + std::string res(js.data()); + return res; +} + +std::string quote(std::string value) { return "\"" + value + "\""; } + +void DaemonMetricCollector::dump_asok_metrics() { + BlockTimer timer(__FILE__, __FUNCTION__); + + std::vector<std::pair<std::string, int>> daemon_pids; + + int failures = 0; + bool sort = g_conf().get_val<bool>("exporter_sort_metrics"); + if (sort) { + builder = + std::unique_ptr<OrderedMetricsBuilder>(new OrderedMetricsBuilder()); + } else { + builder = + std::unique_ptr<UnorderedMetricsBuilder>(new UnorderedMetricsBuilder()); + } + auto prio_limit = g_conf().get_val<int64_t>("exporter_prio_limit"); + for (auto &[daemon_name, sock_client] : clients) { + bool ok; + sock_client.ping(&ok); + if (!ok) { + failures++; + continue; + } + std::string counter_dump_response = + asok_request(sock_client, "counter dump", daemon_name); + if (counter_dump_response.size() == 0) { + failures++; + continue; + } + std::string counter_schema_response = + asok_request(sock_client, "counter schema", daemon_name); + if (counter_schema_response.size() == 0) { + failures++; + continue; + } + + json_object counter_dump = boost::json::parse(counter_dump_response).as_object(); + json_object counter_schema = boost::json::parse(counter_schema_response).as_object(); + + for (auto &perf_group_item : counter_schema) { + std::string perf_group = {perf_group_item.key().begin(), + perf_group_item.key().end()}; + json_array perf_group_schema_array = perf_group_item.value().as_array(); + json_array perf_group_dump_array = counter_dump[perf_group].as_array(); + for (auto schema_itr = perf_group_schema_array.begin(), + dump_itr = perf_group_dump_array.begin(); + schema_itr != perf_group_schema_array.end() && + dump_itr != perf_group_dump_array.end(); + ++schema_itr, ++dump_itr) { + auto counters = schema_itr->at("counters").as_object(); + auto counters_labels = schema_itr->at("labels").as_object(); + auto counters_values = dump_itr->at("counters").as_object(); + labels_t labels; + + for (auto &label: counters_labels) { + std::string label_key = {label.key().begin(), label.key().end()}; + labels[label_key] = quote(label.value().as_string().c_str()); + } + for (auto &counter : counters) { + json_object counter_group = counter.value().as_object(); + if (counter_group["priority"].as_int64() < prio_limit) { + continue; + } + std::string counter_name_init = {counter.key().begin(), counter.key().end()}; + std::string counter_name = perf_group + "_" + counter_name_init; + promethize(counter_name); + + auto extra_labels = get_extra_labels(daemon_name); + if (extra_labels.empty()) { + dout(1) << "Unable to parse instance_id from daemon_name: " << daemon_name << dendl; + continue; + } + labels.insert(extra_labels.begin(), extra_labels.end()); + + // For now this is only required for rgw multi-site metrics + auto multisite_labels_and_name = add_fixed_name_metrics(counter_name); + if (!multisite_labels_and_name.first.empty()) { + labels.insert(multisite_labels_and_name.first.begin(), multisite_labels_and_name.first.end()); + counter_name = multisite_labels_and_name.second; + } + auto perf_values = counters_values.at(counter_name_init); + dump_asok_metric(counter_group, perf_values, counter_name, labels); + } + } + } + std::string config_show = + asok_request(sock_client, "config show", daemon_name); + if (config_show.size() == 0) { + failures++; + continue; + } + json_object pid_file_json = boost::json::parse(config_show).as_object(); + std::string pid_path = + boost_string_to_std(pid_file_json["pid_file"].as_string()); + std::string pid_str = read_file_to_string(pid_path); + if (!pid_path.size()) { + dout(1) << "pid path is empty; process metrics won't be fetched for: " + << daemon_name << dendl; + } + if (!pid_str.empty()) { + daemon_pids.push_back({daemon_name, std::stoi(pid_str)}); + } + } + dout(10) << "Perf counters retrieved for " << clients.size() - failures << "/" + << clients.size() << " daemons." << dendl; + // get time spent on this function + timer.stop(); + std::string scrap_desc( + "Time spent scraping and transforming perf counters to metrics"); + labels_t scrap_labels; + scrap_labels["host"] = quote(ceph_get_hostname()); + scrap_labels["function"] = quote(__FUNCTION__); + add_metric(builder, timer.get_ms(), "ceph_exporter_scrape_time", scrap_desc, + "gauge", scrap_labels); + + const std::lock_guard<std::mutex> lock(metrics_mutex); + // only get metrics if there's pid path for some or all daemons isn't empty + if (daemon_pids.size() != 0) { + get_process_metrics(daemon_pids); + } + metrics = builder->dump(); +} + +std::vector<std::string> read_proc_stat_file(std::string path) { + std::string stat = read_file_to_string(path); + std::vector<std::string> strings; + auto parts = ceph::split(stat); + strings.assign(parts.begin(), parts.end()); + return strings; +} + +struct pstat read_pid_stat(int pid) { + std::string stat_path("/proc/" + std::to_string(pid) + "/stat"); + std::vector<std::string> stats = read_proc_stat_file(stat_path); + struct pstat stat; + stat.minflt = std::stoul(stats[9]); + stat.majflt = std::stoul(stats[11]); + stat.utime = std::stoul(stats[13]); + stat.stime = std::stoul(stats[14]); + stat.num_threads = std::stoul(stats[19]); + stat.start_time = std::stoul(stats[21]); + stat.vm_size = std::stoul(stats[22]); + stat.resident_size = std::stoi(stats[23]); + return stat; +} + +void DaemonMetricCollector::get_process_metrics( + std::vector<std::pair<std::string, int>> daemon_pids) { + std::string path("/proc"); + std::stringstream ss; + for (auto &[daemon_name, pid] : daemon_pids) { + std::vector<std::string> uptimes = read_proc_stat_file("/proc/uptime"); + struct pstat stat = read_pid_stat(pid); + int clk_tck = sysconf(_SC_CLK_TCK); + double start_time_seconds = stat.start_time / (double)clk_tck; + double user_time = stat.utime / (double)clk_tck; + double kernel_time = stat.stime / (double)clk_tck; + double total_time_seconds = user_time + kernel_time; + double uptime = std::stod(uptimes[0]); + double elapsed_time = uptime - start_time_seconds; + double idle_time = elapsed_time - total_time_seconds; + double usage = total_time_seconds * 100 / elapsed_time; + + labels_t labels; + labels["ceph_daemon"] = quote(daemon_name); + add_metric(builder, stat.minflt, "ceph_exporter_minflt_total", + "Number of minor page faults of daemon", "counter", labels); + add_metric(builder, stat.majflt, "ceph_exporter_majflt_total", + "Number of major page faults of daemon", "counter", labels); + add_metric(builder, stat.num_threads, "ceph_exporter_num_threads", + "Number of threads used by daemon", "gauge", labels); + add_metric(builder, usage, "ceph_exporter_cpu_usage", + "CPU usage of a daemon", "gauge", labels); + + std::string cpu_time_desc = "Process time in kernel/user/idle mode"; + labels_t cpu_total_labels; + cpu_total_labels["ceph_daemon"] = quote(daemon_name); + cpu_total_labels["mode"] = quote("kernel"); + add_metric(builder, kernel_time, "ceph_exporter_cpu_total", cpu_time_desc, + "counter", cpu_total_labels); + cpu_total_labels["mode"] = quote("user"); + add_metric(builder, user_time, "ceph_exporter_cpu_total", cpu_time_desc, + "counter", cpu_total_labels); + cpu_total_labels["mode"] = quote("idle"); + add_metric(builder, idle_time, "ceph_exporter_cpu_total", cpu_time_desc, + "counter", cpu_total_labels); + add_metric(builder, stat.vm_size, "ceph_exporter_vm_size", + "Virtual memory used in a daemon", "gauge", labels); + add_metric(builder, stat.resident_size, "ceph_exporter_resident_size", + "Resident memory in a daemon", "gauge", labels); + } +} + +std::string DaemonMetricCollector::asok_request(AdminSocketClient &asok, + std::string command, + std::string daemon_name) { + std::string request("{\"prefix\": \"" + command + "\"}"); + std::string response; + std::string err = asok.do_request(request, &response); + if (err.length() > 0 || response.substr(0, 5) == "ERROR") { + dout(1) << "command " << command << "failed for daemon " << daemon_name + << "with error: " << err << dendl; + return ""; + } + return response; +} + +labels_t DaemonMetricCollector::get_extra_labels(std::string daemon_name) { + labels_t labels; + const std::string ceph_daemon_prefix = "ceph-"; + const std::string ceph_client_prefix = "client."; + if (daemon_name.rfind(ceph_daemon_prefix, 0) == 0) { + daemon_name = daemon_name.substr(ceph_daemon_prefix.size()); + } + if (daemon_name.rfind(ceph_client_prefix, 0) == 0) { + daemon_name = daemon_name.substr(ceph_client_prefix.size()); + } + // In vstart cluster socket files for rgw are stored as radosgw.<instance_id>.asok + if (daemon_name.find("radosgw") != std::string::npos) { + std::size_t pos = daemon_name.find_last_of('.'); + std::string tmp = daemon_name.substr(pos+1); + labels["instance_id"] = quote(tmp); + } + else if (daemon_name.find("rgw") != std::string::npos) { + // fetch intance_id for e.g. "hrgsea" from daemon_name=rgw.foo.ceph-node-00.hrgsea.2.94739968030880 + std::vector<std::string> elems; + std::stringstream ss; + ss.str(daemon_name); + std::string item; + while (std::getline(ss, item, '.')) { + elems.push_back(item); + } + if (elems.size() >= 4) { + labels["instance_id"] = quote(elems[3]); + } else { + return labels_t(); + } + } else { + labels.insert({"ceph_daemon", quote(daemon_name)}); + } + return labels; +} + +// Add fixed name metrics from existing ones that have details in their names +// that should be in labels (not in name). For backward compatibility, +// a new fixed name metric is created (instead of replacing)and details are put +// in new labels. Intended for RGW sync perf. counters but extendable as required. +// See: https://tracker.ceph.com/issues/45311 +std::pair<labels_t, std::string> +DaemonMetricCollector::add_fixed_name_metrics(std::string metric_name) { + std::string new_metric_name; + labels_t labels; + new_metric_name = metric_name; + + std::regex re("^data_sync_from_(.*)\\."); + std::smatch match; + if (std::regex_search(metric_name, match, re) == true) { + new_metric_name = std::regex_replace(metric_name, re, "from_([^.]*)', 'from_zone"); + labels["source_zone"] = quote(match.str(1)); + return {labels, new_metric_name}; + } + return {}; +} + +/* +perf_values can be either a int/double or a json_object. Since + json_value is a wrapper of both we use that class. + */ +void DaemonMetricCollector::dump_asok_metric(json_object perf_info, + json_value perf_values, + std::string name, + labels_t labels) { + int64_t type = perf_info["type"].as_int64(); + std::string metric_type = + boost_string_to_std(perf_info["metric_type"].as_string()); + std::string description = + boost_string_to_std(perf_info["description"].as_string()); + + if (type & PERFCOUNTER_LONGRUNAVG) { + int64_t count = perf_values.as_object()["avgcount"].as_int64(); + add_metric(builder, count, name + "_count", description + " Count", "counter", + labels); + json_value sum_value = perf_values.as_object()["sum"]; + add_double_or_int_metric(builder, sum_value, name + "_sum", description + " Total", + metric_type, labels); + } else { + add_double_or_int_metric(builder, perf_values, name, description, + metric_type, labels); + } +} + +void DaemonMetricCollector::update_sockets() { + std::string sock_dir = g_conf().get_val<std::string>("exporter_sock_dir"); + clients.clear(); + std::filesystem::path sock_path = sock_dir; + if (!std::filesystem::is_directory(sock_path.parent_path())) { + dout(1) << "ERROR: No such directory exist" << sock_dir << dendl; + return; + } + for (const auto &entry : std::filesystem::directory_iterator(sock_dir)) { + if (entry.path().extension() == ".asok") { + std::string daemon_socket_name = entry.path().filename().string(); + std::string daemon_name = + daemon_socket_name.substr(0, daemon_socket_name.size() - 5); + if (clients.find(daemon_name) == clients.end() && + !(daemon_name.find("mgr") != std::string::npos) && + !(daemon_name.find("ceph-exporter") != std::string::npos)) { + AdminSocketClient sock(entry.path().string()); + clients.insert({daemon_name, std::move(sock)}); + } + } + } +} + +void OrderedMetricsBuilder::add(std::string value, std::string name, + std::string description, std::string mtype, + labels_t labels) { + if (metrics.find(name) == metrics.end()) { + Metric metric(name, mtype, description); + metrics[name] = std::move(metric); + } + Metric &metric = metrics[name]; + metric.add(labels, value); +} + +std::string OrderedMetricsBuilder::dump() { + for (auto &[name, metric] : metrics) { + out += metric.dump() + "\n"; + } + return out; +} + +void UnorderedMetricsBuilder::add(std::string value, std::string name, + std::string description, std::string mtype, + labels_t labels) { + Metric metric(name, mtype, description); + metric.add(labels, value); + out += metric.dump() + "\n\n"; +} + +std::string UnorderedMetricsBuilder::dump() { return out; } + +void Metric::add(labels_t labels, std::string value) { + metric_entry entry; + entry.labels = labels; + entry.value = value; + entries.push_back(entry); +} + +std::string Metric::dump() { + std::stringstream metric_ss; + metric_ss << "# HELP " << name << " " << description << "\n"; + metric_ss << "# TYPE " << name << " " << mtype << "\n"; + for (auto &entry : entries) { + std::stringstream labels_ss; + size_t i = 0; + for (auto &[label_name, label_value] : entry.labels) { + labels_ss << label_name << "=" << label_value; + if (i < entry.labels.size() - 1) { + labels_ss << ","; + } + i++; + } + metric_ss << name << "{" << labels_ss.str() << "} " << entry.value; + if (&entry != &entries.back()) { + metric_ss << "\n"; + } + } + return metric_ss.str(); +} + +DaemonMetricCollector &collector_instance() { + static DaemonMetricCollector instance; + return instance; +} diff --git a/src/exporter/DaemonMetricCollector.h b/src/exporter/DaemonMetricCollector.h new file mode 100644 index 000000000..e906fb13a --- /dev/null +++ b/src/exporter/DaemonMetricCollector.h @@ -0,0 +1,104 @@ +#pragma once + +#include "common/admin_socket_client.h" +#include <map> +#include <string> +#include <vector> + +#include <boost/asio.hpp> +#include <boost/json/object.hpp> +#include <filesystem> +#include <map> +#include <string> +#include <vector> + +struct pstat { + unsigned long utime; + unsigned long stime; + unsigned long minflt; + unsigned long majflt; + unsigned long start_time; + int num_threads; + unsigned long vm_size; + int resident_size; +}; + +class MetricsBuilder; +class OrderedMetricsBuilder; +class UnorderedMetricsBuilder; +class Metric; + +typedef std::map<std::string, std::string> labels_t; + +class DaemonMetricCollector { +public: + void main(); + std::string get_metrics(); + labels_t get_extra_labels(std::string daemon_name); + +private: + std::map<std::string, AdminSocketClient> clients; + std::string metrics; + std::mutex metrics_mutex; + std::unique_ptr<MetricsBuilder> builder; + void update_sockets(); + void request_loop(boost::asio::steady_timer &timer); + + void dump_asok_metrics(); + void dump_asok_metric(boost::json::object perf_info, + boost::json::value perf_values, std::string name, + labels_t labels); + std::pair<labels_t, std::string> add_fixed_name_metrics(std::string metric_name); + void get_process_metrics(std::vector<std::pair<std::string, int>> daemon_pids); + std::string asok_request(AdminSocketClient &asok, std::string command, std::string daemon_name); +}; + +class Metric { +private: + struct metric_entry { + labels_t labels; + std::string value; + }; + std::string name; + std::string mtype; + std::string description; + std::vector<metric_entry> entries; + +public: + Metric(std::string name, std::string mtype, std::string description) + : name(name), mtype(mtype), description(description) {} + Metric(const Metric &) = default; + Metric() = default; + void add(labels_t labels, std::string value); + std::string dump(); +}; + +class MetricsBuilder { +public: + virtual ~MetricsBuilder() = default; + virtual std::string dump() = 0; + virtual void add(std::string value, std::string name, std::string description, + std::string mtype, labels_t labels) = 0; + +protected: + std::string out; +}; + +class OrderedMetricsBuilder : public MetricsBuilder { +private: + std::map<std::string, Metric> metrics; + +public: + std::string dump(); + void add(std::string value, std::string name, std::string description, + std::string mtype, labels_t labels); +}; + +class UnorderedMetricsBuilder : public MetricsBuilder { +public: + std::string dump(); + void add(std::string value, std::string name, std::string description, + std::string mtype, labels_t labels); +}; + +DaemonMetricCollector &collector_instance(); diff --git a/src/exporter/ceph_exporter.cc b/src/exporter/ceph_exporter.cc new file mode 100644 index 000000000..70650ff87 --- /dev/null +++ b/src/exporter/ceph_exporter.cc @@ -0,0 +1,65 @@ +#include "common/ceph_argparse.h" +#include "common/config.h" +#include "exporter/DaemonMetricCollector.h" +#include "exporter/http_server.h" +#include "global/global_init.h" +#include "global/global_context.h" + +#include <boost/thread/thread.hpp> +#include <iostream> +#include <map> +#include <string> + +#define dout_context g_ceph_context + +static void usage() { + std::cout << "usage: ceph-exporter [options]\n" + << "options:\n" + " --sock-dir: The path to ceph daemons socket files dir\n" + " --addrs: Host ip address where exporter is deployed\n" + " --port: Port to deploy exporter on. Default is 9926\n" + " --prio-limit: Only perf counters greater than or equal to prio-limit are fetched. Default: 5\n" + " --stats-period: Time to wait before sending requests again to exporter server (seconds). Default: 5s" + << std::endl; + generic_server_usage(); +} + +int main(int argc, char **argv) { + + auto args = argv_to_vec(argc, argv); + if (args.empty()) { + std::cerr << argv[0] << ": -h or --help for usage" << std::endl; + exit(1); + } + if (ceph_argparse_need_usage(args)) { + usage(); + exit(0); + } + + auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_DAEMON, 0); + std::string val; + for (auto i = args.begin(); i != args.end();) { + if (ceph_argparse_double_dash(args, i)) { + break; + } else if (ceph_argparse_witharg(args, i, &val, "--sock-dir", (char *)NULL)) { + cct->_conf.set_val("exporter_sock_dir", val); + } else if (ceph_argparse_witharg(args, i, &val, "--addrs", (char *)NULL)) { + cct->_conf.set_val("exporter_addr", val); + } else if (ceph_argparse_witharg(args, i, &val, "--port", (char *)NULL)) { + cct->_conf.set_val("exporter_http_port", val); + } else if (ceph_argparse_witharg(args, i, &val, "--prio-limit", (char *)NULL)) { + cct->_conf.set_val("exporter_prio_limit", val); + } else if (ceph_argparse_witharg(args, i, &val, "--stats-period", (char *)NULL)) { + cct->_conf.set_val("exporter_stats_period", val); + } else { + ++i; + } + } + common_init_finish(g_ceph_context); + + boost::thread server_thread(http_server_thread_entrypoint); + DaemonMetricCollector &collector = collector_instance(); + collector.main(); + server_thread.join(); +} diff --git a/src/exporter/http_server.cc b/src/exporter/http_server.cc new file mode 100644 index 000000000..317d877e8 --- /dev/null +++ b/src/exporter/http_server.cc @@ -0,0 +1,169 @@ +#include "http_server.h" +#include "common/debug.h" +#include "common/hostname.h" +#include "global/global_init.h" +#include "global/global_context.h" +#include "exporter/DaemonMetricCollector.h" + +#include <boost/asio.hpp> +#include <boost/beast/core.hpp> +#include <boost/beast/http.hpp> +#include <boost/beast/version.hpp> +#include <boost/thread/thread.hpp> +#include <chrono> +#include <cstdlib> +#include <ctime> +#include <iostream> +#include <map> +#include <memory> +#include <string> + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_ceph_exporter + +namespace beast = boost::beast; // from <boost/beast.hpp> +namespace http = beast::http; // from <boost/beast/http.hpp> +namespace net = boost::asio; // from <boost/asio.hpp> +using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> + +class http_connection : public std::enable_shared_from_this<http_connection> { +public: + http_connection(tcp::socket socket) : socket_(std::move(socket)) {} + + // Initiate the asynchronous operations associated with the connection. + void start() { + read_request(); + check_deadline(); + } + +private: + tcp::socket socket_; + beast::flat_buffer buffer_{8192}; + http::request<http::dynamic_body> request_; + http::response<http::string_body> response_; + + net::steady_timer deadline_{socket_.get_executor(), std::chrono::seconds(60)}; + + // Asynchronously receive a complete request message. + void read_request() { + auto self = shared_from_this(); + + http::async_read(socket_, buffer_, request_, + [self](beast::error_code ec, std::size_t bytes_transferred) { + boost::ignore_unused(bytes_transferred); + if (ec) { + dout(1) << "ERROR: " << ec.message() << dendl; + return; + } + else { + self->process_request(); + } + }); + } + + // Determine what needs to be done with the request message. + void process_request() { + response_.version(request_.version()); + response_.keep_alive(request_.keep_alive()); + + switch (request_.method()) { + case http::verb::get: + response_.result(http::status::ok); + create_response(); + break; + + default: + // We return responses indicating an error if + // we do not recognize the request method. + response_.result(http::status::method_not_allowed); + response_.set(http::field::content_type, "text/plain"); + std::string body("Invalid request-method '" + + std::string(request_.method_string()) + "'"); + response_.body() = body; + break; + } + + write_response(); + } + + // Construct a response message based on the program state. + void create_response() { + if (request_.target() == "/") { + response_.set(http::field::content_type, "text/html; charset=utf-8"); + std::string body("<html>\n" + "<head><title>Ceph Exporter</title></head>\n" + "<body>\n" + "<h1>Ceph Exporter</h1>\n" + "<p><a href='/metrics'>Metrics</a></p>" + "</body>\n" + "</html>\n"); + response_.body() = body; + } else if (request_.target() == "/metrics") { + response_.set(http::field::content_type, "text/plain; charset=utf-8"); + DaemonMetricCollector &collector = collector_instance(); + std::string metrics = collector.get_metrics(); + response_.body() = metrics; + } else { + response_.result(http::status::method_not_allowed); + response_.set(http::field::content_type, "text/plain"); + response_.body() = "File not found \n"; + } + } + + // Asynchronously transmit the response message. + void write_response() { + auto self = shared_from_this(); + + response_.prepare_payload(); + + http::async_write(socket_, response_, + [self](beast::error_code ec, std::size_t) { + self->socket_.shutdown(tcp::socket::shutdown_send, ec); + self->deadline_.cancel(); + if (ec) { + dout(1) << "ERROR: " << ec.message() << dendl; + return; + } + }); + } + + // Check whether we have spent enough time on this connection. + void check_deadline() { + auto self = shared_from_this(); + + deadline_.async_wait([self](beast::error_code ec) { + if (!ec) { + // Close socket to cancel any outstanding operation. + self->socket_.close(ec); + } + }); + } +}; + +// "Loop" forever accepting new connections. +void http_server(tcp::acceptor &acceptor, tcp::socket &socket) { + acceptor.async_accept(socket, [&](beast::error_code ec) { + if (!ec) + std::make_shared<http_connection>(std::move(socket))->start(); + http_server(acceptor, socket); + }); +} + +void http_server_thread_entrypoint() { + try { + std::string exporter_addr = g_conf().get_val<std::string>("exporter_addr"); + auto const address = net::ip::make_address(exporter_addr); + unsigned short port = g_conf().get_val<int64_t>("exporter_http_port"); + + net::io_context ioc{1}; + + tcp::acceptor acceptor{ioc, {address, port}}; + tcp::socket socket{ioc}; + http_server(acceptor, socket); + dout(1) << "Http server running on " << exporter_addr << ":" << port << dendl; + ioc.run(); + } catch (std::exception const &e) { + dout(1) << "Error: " << e.what() << dendl; + exit(EXIT_FAILURE); + } +} diff --git a/src/exporter/http_server.h b/src/exporter/http_server.h new file mode 100644 index 000000000..0d0502f57 --- /dev/null +++ b/src/exporter/http_server.h @@ -0,0 +1,5 @@ +#pragma once + +#include <string> + +void http_server_thread_entrypoint(); diff --git a/src/exporter/util.cc b/src/exporter/util.cc new file mode 100644 index 000000000..06a8338b8 --- /dev/null +++ b/src/exporter/util.cc @@ -0,0 +1,69 @@ +#include "util.h" + +#include <boost/algorithm/string/classification.hpp> +#include <boost/algorithm/string/replace.hpp> +#include <cctype> +#include <chrono> +#include <fstream> +#include <iostream> +#include <sstream> + +#include "common/debug.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_ceph_exporter + +BlockTimer::BlockTimer(std::string file, std::string function) + : file(file), function(function), stopped(false) { + t1 = std::chrono::high_resolution_clock::now(); +} +BlockTimer::~BlockTimer() { + dout(20) << file << ":" << function << ": " << ms.count() << "ms" << dendl; +} + +// useful with stop +double BlockTimer::get_ms() { + return ms.count(); +} + +// Manually stop the timer as you might want to get the time +void BlockTimer::stop() { + if (!stopped) { + stopped = true; + t2 = std::chrono::high_resolution_clock::now(); + ms = t2 - t1; + } +} + +bool string_is_digit(std::string s) { + size_t i = 0; + while (std::isdigit(s[i]) && i < s.size()) { + i++; + } + return i >= s.size(); +} + +std::string read_file_to_string(std::string path) { + std::ifstream is(path); + std::stringstream buffer; + buffer << is.rdbuf(); + return buffer.str(); +} + +// Must be kept in sync with promethize() in src/pybind/mgr/prometheus/module.py +void promethize(std::string &name) { + if (name[name.size() - 1] == '-') { + name[name.size() - 1] = '_'; + name += "minus"; + } + + auto should_be_underscore = [](char ch) { + return ch == '.' || ch == '/' || ch == ' ' || ch == '-'; + }; + std::replace_if(name.begin(), name.end(), should_be_underscore, '_'); + + boost::replace_all(name, "::", "_"); + boost::replace_all(name, "+", "_plus"); + + name = "ceph_" + name; +} diff --git a/src/exporter/util.h b/src/exporter/util.h new file mode 100644 index 000000000..2628864d5 --- /dev/null +++ b/src/exporter/util.h @@ -0,0 +1,24 @@ +#include "common/hostname.h" +#include <chrono> +#include <string> + +#define TIMED_FUNCTION() BlockTimer timer(__FILE__, __FUNCTION__) + +class BlockTimer { + public: + BlockTimer(std::string file, std::string function); + ~BlockTimer(); + void stop(); + double get_ms(); + private: + std::chrono::duration<double, std::milli> ms; + std::string file, function; + bool stopped; + std::chrono::time_point<std::chrono::high_resolution_clock> t1, t2; +}; + +bool string_is_digit(std::string s); +std::string read_file_to_string(std::string path); +std::string get_hostname(std::string path); + +void promethize(std::string &name); |