diff options
Diffstat (limited to 'database/sqlite/sqlite_aclk_node.c')
-rw-r--r-- | database/sqlite/sqlite_aclk_node.c | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c new file mode 100644 index 00000000..dcc8c375 --- /dev/null +++ b/database/sqlite/sqlite_aclk_node.c @@ -0,0 +1,200 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "sqlite_functions.h" +#include "sqlite_aclk_node.h" + +#include "../../aclk/aclk_contexts_api.h" +#include "../../aclk/aclk_capas.h" + +#ifdef ENABLE_ACLK + +DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) { + RRDSET *st; + char name[500]; + + rrdset_foreach_read(st, host) + { + if (rrdset_is_available_for_viewers(st)) { + struct collector_info col = {.plugin = rrdset_plugin_name(st), .module = rrdset_module_name(st)}; + snprintfz(name, sizeof(name) - 1, "%s:%s", col.plugin, col.module); + dictionary_set(dict, name, &col, sizeof(struct collector_info)); + } + } + rrdset_foreach_done(st); + + return dict; +} + +static void build_node_collectors(RRDHOST *host) +{ + struct aclk_sync_cfg_t *wc = host->aclk_config; + + struct update_node_collectors upd_node_collectors; + DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED); + + upd_node_collectors.node_id = wc->node_id; + upd_node_collectors.claim_id = get_agent_claimid(); + + upd_node_collectors.node_collectors = collectors_from_charts(host, dict); + aclk_update_node_collectors(&upd_node_collectors); + + dictionary_destroy(dict); + freez(upd_node_collectors.claim_id); + + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: NODE COLLECTORS SENT", wc->node_id, rrdhost_hostname(host)); +} + +static void build_node_info(RRDHOST *host) +{ + struct update_node_info node_info; + + struct aclk_sync_cfg_t *wc = host->aclk_config; + + rrd_rdlock(); + node_info.node_id = wc->node_id; + node_info.claim_id = get_agent_claimid(); + node_info.machine_guid = host->machine_guid; + node_info.child = (host != localhost); + node_info.ml_info.ml_capable = ml_capable(); + node_info.ml_info.ml_enabled = ml_enabled(host); + + node_info.node_instance_capabilities = aclk_get_node_instance_capas(host); + + now_realtime_timeval(&node_info.updated_at); + + char *host_version = NULL; + if (host != localhost) { + netdata_mutex_lock(&host->receiver_lock); + host_version = strdupz( + host->receiver && host->receiver->program_version ? host->receiver->program_version : + rrdhost_program_version(host)); + netdata_mutex_unlock(&host->receiver_lock); + } + + node_info.data.name = rrdhost_hostname(host); + node_info.data.os = rrdhost_os(host); + node_info.data.os_name = host->system_info->host_os_name; + node_info.data.os_version = host->system_info->host_os_version; + node_info.data.kernel_name = host->system_info->kernel_name; + node_info.data.kernel_version = host->system_info->kernel_version; + node_info.data.architecture = host->system_info->architecture; + node_info.data.cpus = host->system_info->host_cores ? str2uint32_t(host->system_info->host_cores, NULL) : 0; + node_info.data.cpu_frequency = host->system_info->host_cpu_freq ? host->system_info->host_cpu_freq : "0"; + node_info.data.memory = host->system_info->host_ram_total ? host->system_info->host_ram_total : "0"; + node_info.data.disk_space = host->system_info->host_disk_space ? host->system_info->host_disk_space : "0"; + node_info.data.version = host_version ? host_version : VERSION; + node_info.data.release_channel = get_release_channel(); + node_info.data.timezone = rrdhost_abbrev_timezone(host); + node_info.data.virtualization_type = host->system_info->virtualization ? host->system_info->virtualization : "unknown"; + node_info.data.container_type = host->system_info->container ? host->system_info->container : "unknown"; + node_info.data.custom_info = config_get(CONFIG_SECTION_WEB, "custom dashboard_info.js", ""); + node_info.data.machine_guid = host->machine_guid; + + struct capability node_caps[] = { + {.name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled}, + {.name = "mc", + .version = host->system_info->mc_version ? host->system_info->mc_version : 0, + .enabled = host->system_info->mc_version ? 1 : 0}, + {.name = NULL, .version = 0, .enabled = 0}}; + node_info.node_capabilities = node_caps; + + node_info.data.ml_info.ml_capable = host->system_info->ml_capable; + node_info.data.ml_info.ml_enabled = host->system_info->ml_enabled; + + node_info.data.host_labels_ptr = host->rrdlabels; + + aclk_update_node_info(&node_info); + nd_log( + NDLS_ACCESS, + NDLP_DEBUG, + "ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", + wc->node_id, + rrdhost_hostname(host), + host->machine_guid, + host == localhost ? "parent" : "child"); + + rrd_unlock(); + freez(node_info.claim_id); + freez(node_info.node_instance_capabilities); + freez(host_version); + + wc->node_collectors_send = now_realtime_sec(); +} + +static bool host_is_replicating(RRDHOST *host) +{ + bool replicating = false; + RRDSET *st; + rrdset_foreach_reentrant(st, host) { + if (rrdset_is_replicating(st)) { + replicating = true; + break; + } + } + rrdset_foreach_done(st); + return replicating; +} + +void aclk_check_node_info_and_collectors(void) +{ + RRDHOST *host; + + if (unlikely(!aclk_connected)) + return; + + size_t context_loading = 0; + size_t replicating = 0; + size_t context_pp = 0; + + time_t now = now_realtime_sec(); + dfe_start_reentrant(rrdhost_root_index, host) + { + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (unlikely(!wc)) + continue; + + if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { + internal_error(true, "ACLK SYNC: Context still pending for %s", rrdhost_hostname(host)); + context_loading++; + continue; + } + + if (unlikely(host_is_replicating(host))) { + internal_error(true, "ACLK SYNC: Host %s is still replicating", rrdhost_hostname(host)); + replicating++; + continue; + } + + bool pp_queue_empty = !(host->rrdctx.pp_queue && dictionary_entries(host->rrdctx.pp_queue)); + + if (!pp_queue_empty && (wc->node_info_send_time || wc->node_collectors_send)) + context_pp++; + + if (pp_queue_empty && wc->node_info_send_time && wc->node_info_send_time + 30 < now) { + wc->node_info_send_time = 0; + build_node_info(host); + internal_error(true, "ACLK SYNC: Sending node info for %s", rrdhost_hostname(host)); + } + + if (pp_queue_empty && wc->node_collectors_send && wc->node_collectors_send + 30 < now) { + build_node_collectors(host); + internal_error(true, "ACLK SYNC: Sending collectors for %s", rrdhost_hostname(host)); + wc->node_collectors_send = 0; + } + } + dfe_done(host); + + if (context_loading || replicating || context_pp) { + nd_log_limit_static_thread_var(erl, 10, 100 * USEC_PER_MS); + nd_log_limit( + &erl, + NDLS_DAEMON, + NDLP_INFO, + "%zu nodes loading contexts, %zu replicating data, %zu pending context post processing", + context_loading, + replicating, + context_pp); + } +} + +#endif |