From c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 9 Mar 2024 14:19:22 +0100 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- database/sqlite/sqlite_aclk_node.c | 130 ++++++++++++++++++++++--------------- 1 file changed, 76 insertions(+), 54 deletions(-) (limited to 'database/sqlite/sqlite_aclk_node.c') diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c index 82927854a..dcc8c375c 100644 --- a/database/sqlite/sqlite_aclk_node.c +++ b/database/sqlite/sqlite_aclk_node.c @@ -7,17 +7,16 @@ #include "../../aclk/aclk_capas.h" #ifdef ENABLE_ACLK + DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) { RRDSET *st; char name[500]; - rrdset_foreach_read(st, host) { + rrdset_foreach_read(st, host) + { if (rrdset_is_available_for_viewers(st)) { - struct collector_info col = { - .plugin = rrdset_plugin_name(st), - .module = rrdset_module_name(st) - }; - snprintfz(name, 499, "%s:%s", col.plugin, col.module); + struct collector_info col = {.plugin = rrdset_plugin_name(st), .module = rrdset_module_name(st)}; + snprintfz(name, sizeof(name) - 1, "%s:%s", col.plugin, col.module); dictionary_set(dict, name, &col, sizeof(struct collector_info)); } } @@ -26,17 +25,9 @@ DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) { return dict; } -static void build_node_collectors(char *node_id __maybe_unused) +static void build_node_collectors(RRDHOST *host) { - - RRDHOST *host = find_host_by_node_id(node_id); - - if (unlikely(!host)) - return; - - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config; - if (unlikely(!wc)) - return; + struct aclk_sync_cfg_t *wc = host->aclk_config; struct update_node_collectors upd_node_collectors; DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED); @@ -50,45 +41,33 @@ static void build_node_collectors(char *node_id __maybe_unused) dictionary_destroy(dict); freez(upd_node_collectors.claim_id); - netdata_log_access("ACLK RES [%s (%s)]: NODE COLLECTORS SENT", node_id, rrdhost_hostname(host)); - - freez(node_id); + nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: NODE COLLECTORS SENT", wc->node_id, rrdhost_hostname(host)); } -static void build_node_info(char *node_id __maybe_unused) +static void build_node_info(RRDHOST *host) { struct update_node_info node_info; - RRDHOST *host = find_host_by_node_id(node_id); - - if (unlikely((!host))) { - freez(node_id); - return; - } - - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config; - - if (unlikely(!wc)) { - freez(node_id); - return; - } + struct aclk_sync_cfg_t *wc = host->aclk_config; rrd_rdlock(); node_info.node_id = wc->node_id; node_info.claim_id = get_agent_claimid(); node_info.machine_guid = host->machine_guid; - node_info.child = (wc->host != localhost); + node_info.child = (host != localhost); node_info.ml_info.ml_capable = ml_capable(); - node_info.ml_info.ml_enabled = ml_enabled(wc->host); + node_info.ml_info.ml_enabled = ml_enabled(host); - node_info.node_instance_capabilities = aclk_get_node_instance_capas(wc->host); + node_info.node_instance_capabilities = aclk_get_node_instance_capas(host); now_realtime_timeval(&node_info.updated_at); char *host_version = NULL; if (host != localhost) { netdata_mutex_lock(&host->receiver_lock); - host_version = strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : rrdhost_program_version(host)); + host_version = strdupz( + host->receiver && host->receiver->program_version ? host->receiver->program_version : + rrdhost_program_version(host)); netdata_mutex_unlock(&host->receiver_lock); } @@ -112,10 +91,11 @@ static void build_node_info(char *node_id __maybe_unused) node_info.data.machine_guid = host->machine_guid; struct capability node_caps[] = { - { .name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled }, - { .name = "mc", .version = host->system_info->mc_version ? host->system_info->mc_version : 0, .enabled = host->system_info->mc_version ? 1 : 0 }, - { .name = NULL, .version = 0, .enabled = 0 } - }; + {.name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled}, + {.name = "mc", + .version = host->system_info->mc_version ? host->system_info->mc_version : 0, + .enabled = host->system_info->mc_version ? 1 : 0}, + {.name = NULL, .version = 0, .enabled = 0}}; node_info.node_capabilities = node_caps; node_info.data.ml_info.ml_capable = host->system_info->ml_capable; @@ -124,7 +104,14 @@ static void build_node_info(char *node_id __maybe_unused) node_info.data.host_labels_ptr = host->rrdlabels; aclk_update_node_info(&node_info); - netdata_log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, rrdhost_hostname(wc->host), host->machine_guid, wc->host == localhost ? "parent" : "child"); + nd_log( + NDLS_ACCESS, + NDLP_DEBUG, + "ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", + wc->node_id, + rrdhost_hostname(host), + host->machine_guid, + host == localhost ? "parent" : "child"); rrd_unlock(); freez(node_info.claim_id); @@ -132,10 +119,21 @@ static void build_node_info(char *node_id __maybe_unused) freez(host_version); wc->node_collectors_send = now_realtime_sec(); - freez(node_id); - } +static bool host_is_replicating(RRDHOST *host) +{ + bool replicating = false; + RRDSET *st; + rrdset_foreach_reentrant(st, host) { + if (rrdset_is_replicating(st)) { + replicating = true; + break; + } + } + rrdset_foreach_done(st); + return replicating; +} void aclk_check_node_info_and_collectors(void) { @@ -144,35 +142,59 @@ void aclk_check_node_info_and_collectors(void) if (unlikely(!aclk_connected)) return; - size_t pending = 0; - dfe_start_reentrant(rrdhost_root_index, host) { + size_t context_loading = 0; + size_t replicating = 0; + size_t context_pp = 0; - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + time_t now = now_realtime_sec(); + dfe_start_reentrant(rrdhost_root_index, host) + { + struct aclk_sync_cfg_t *wc = host->aclk_config; if (unlikely(!wc)) continue; if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { internal_error(true, "ACLK SYNC: Context still pending for %s", rrdhost_hostname(host)); - pending++; + context_loading++; continue; } - if (wc->node_info_send_time && wc->node_info_send_time + 30 < now_realtime_sec()) { + if (unlikely(host_is_replicating(host))) { + internal_error(true, "ACLK SYNC: Host %s is still replicating", rrdhost_hostname(host)); + replicating++; + continue; + } + + bool pp_queue_empty = !(host->rrdctx.pp_queue && dictionary_entries(host->rrdctx.pp_queue)); + + if (!pp_queue_empty && (wc->node_info_send_time || wc->node_collectors_send)) + context_pp++; + + if (pp_queue_empty && wc->node_info_send_time && wc->node_info_send_time + 30 < now) { wc->node_info_send_time = 0; - build_node_info(strdupz(wc->node_id)); + build_node_info(host); internal_error(true, "ACLK SYNC: Sending node info for %s", rrdhost_hostname(host)); } - if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) { - build_node_collectors(strdupz(wc->node_id)); + if (pp_queue_empty && wc->node_collectors_send && wc->node_collectors_send + 30 < now) { + build_node_collectors(host); internal_error(true, "ACLK SYNC: Sending collectors for %s", rrdhost_hostname(host)); wc->node_collectors_send = 0; } } dfe_done(host); - if(pending) - netdata_log_info("ACLK: %zu nodes are pending for contexts to load, skipped sending node info for them", pending); + if (context_loading || replicating || context_pp) { + nd_log_limit_static_thread_var(erl, 10, 100 * USEC_PER_MS); + nd_log_limit( + &erl, + NDLS_DAEMON, + NDLP_INFO, + "%zu nodes loading contexts, %zu replicating data, %zu pending context post processing", + context_loading, + replicating, + context_pp); + } } #endif -- cgit v1.2.3