summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk_node.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk_node.c')
-rw-r--r--database/sqlite/sqlite_aclk_node.c130
1 files changed, 76 insertions, 54 deletions
diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c
index 82927854a..dcc8c375c 100644
--- a/database/sqlite/sqlite_aclk_node.c
+++ b/database/sqlite/sqlite_aclk_node.c
@@ -7,17 +7,16 @@
#include "../../aclk/aclk_capas.h"
#ifdef ENABLE_ACLK
+
DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) {
RRDSET *st;
char name[500];
- rrdset_foreach_read(st, host) {
+ rrdset_foreach_read(st, host)
+ {
if (rrdset_is_available_for_viewers(st)) {
- struct collector_info col = {
- .plugin = rrdset_plugin_name(st),
- .module = rrdset_module_name(st)
- };
- snprintfz(name, 499, "%s:%s", col.plugin, col.module);
+ struct collector_info col = {.plugin = rrdset_plugin_name(st), .module = rrdset_module_name(st)};
+ snprintfz(name, sizeof(name) - 1, "%s:%s", col.plugin, col.module);
dictionary_set(dict, name, &col, sizeof(struct collector_info));
}
}
@@ -26,17 +25,9 @@ DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) {
return dict;
}
-static void build_node_collectors(char *node_id __maybe_unused)
+static void build_node_collectors(RRDHOST *host)
{
-
- RRDHOST *host = find_host_by_node_id(node_id);
-
- if (unlikely(!host))
- return;
-
- struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
- if (unlikely(!wc))
- return;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
struct update_node_collectors upd_node_collectors;
DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED);
@@ -50,45 +41,33 @@ static void build_node_collectors(char *node_id __maybe_unused)
dictionary_destroy(dict);
freez(upd_node_collectors.claim_id);
- netdata_log_access("ACLK RES [%s (%s)]: NODE COLLECTORS SENT", node_id, rrdhost_hostname(host));
-
- freez(node_id);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: NODE COLLECTORS SENT", wc->node_id, rrdhost_hostname(host));
}
-static void build_node_info(char *node_id __maybe_unused)
+static void build_node_info(RRDHOST *host)
{
struct update_node_info node_info;
- RRDHOST *host = find_host_by_node_id(node_id);
-
- if (unlikely((!host))) {
- freez(node_id);
- return;
- }
-
- struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
-
- if (unlikely(!wc)) {
- freez(node_id);
- return;
- }
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
rrd_rdlock();
node_info.node_id = wc->node_id;
node_info.claim_id = get_agent_claimid();
node_info.machine_guid = host->machine_guid;
- node_info.child = (wc->host != localhost);
+ node_info.child = (host != localhost);
node_info.ml_info.ml_capable = ml_capable();
- node_info.ml_info.ml_enabled = ml_enabled(wc->host);
+ node_info.ml_info.ml_enabled = ml_enabled(host);
- node_info.node_instance_capabilities = aclk_get_node_instance_capas(wc->host);
+ node_info.node_instance_capabilities = aclk_get_node_instance_capas(host);
now_realtime_timeval(&node_info.updated_at);
char *host_version = NULL;
if (host != localhost) {
netdata_mutex_lock(&host->receiver_lock);
- host_version = strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : rrdhost_program_version(host));
+ host_version = strdupz(
+ host->receiver && host->receiver->program_version ? host->receiver->program_version :
+ rrdhost_program_version(host));
netdata_mutex_unlock(&host->receiver_lock);
}
@@ -112,10 +91,11 @@ static void build_node_info(char *node_id __maybe_unused)
node_info.data.machine_guid = host->machine_guid;
struct capability node_caps[] = {
- { .name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled },
- { .name = "mc", .version = host->system_info->mc_version ? host->system_info->mc_version : 0, .enabled = host->system_info->mc_version ? 1 : 0 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
+ {.name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled},
+ {.name = "mc",
+ .version = host->system_info->mc_version ? host->system_info->mc_version : 0,
+ .enabled = host->system_info->mc_version ? 1 : 0},
+ {.name = NULL, .version = 0, .enabled = 0}};
node_info.node_capabilities = node_caps;
node_info.data.ml_info.ml_capable = host->system_info->ml_capable;
@@ -124,7 +104,14 @@ static void build_node_info(char *node_id __maybe_unused)
node_info.data.host_labels_ptr = host->rrdlabels;
aclk_update_node_info(&node_info);
- netdata_log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, rrdhost_hostname(wc->host), host->machine_guid, wc->host == localhost ? "parent" : "child");
+ nd_log(
+ NDLS_ACCESS,
+ NDLP_DEBUG,
+ "ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)",
+ wc->node_id,
+ rrdhost_hostname(host),
+ host->machine_guid,
+ host == localhost ? "parent" : "child");
rrd_unlock();
freez(node_info.claim_id);
@@ -132,10 +119,21 @@ static void build_node_info(char *node_id __maybe_unused)
freez(host_version);
wc->node_collectors_send = now_realtime_sec();
- freez(node_id);
-
}
+static bool host_is_replicating(RRDHOST *host)
+{
+ bool replicating = false;
+ RRDSET *st;
+ rrdset_foreach_reentrant(st, host) {
+ if (rrdset_is_replicating(st)) {
+ replicating = true;
+ break;
+ }
+ }
+ rrdset_foreach_done(st);
+ return replicating;
+}
void aclk_check_node_info_and_collectors(void)
{
@@ -144,35 +142,59 @@ void aclk_check_node_info_and_collectors(void)
if (unlikely(!aclk_connected))
return;
- size_t pending = 0;
- dfe_start_reentrant(rrdhost_root_index, host) {
+ size_t context_loading = 0;
+ size_t replicating = 0;
+ size_t context_pp = 0;
- struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ time_t now = now_realtime_sec();
+ dfe_start_reentrant(rrdhost_root_index, host)
+ {
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
if (unlikely(!wc))
continue;
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
internal_error(true, "ACLK SYNC: Context still pending for %s", rrdhost_hostname(host));
- pending++;
+ context_loading++;
continue;
}
- if (wc->node_info_send_time && wc->node_info_send_time + 30 < now_realtime_sec()) {
+ if (unlikely(host_is_replicating(host))) {
+ internal_error(true, "ACLK SYNC: Host %s is still replicating", rrdhost_hostname(host));
+ replicating++;
+ continue;
+ }
+
+ bool pp_queue_empty = !(host->rrdctx.pp_queue && dictionary_entries(host->rrdctx.pp_queue));
+
+ if (!pp_queue_empty && (wc->node_info_send_time || wc->node_collectors_send))
+ context_pp++;
+
+ if (pp_queue_empty && wc->node_info_send_time && wc->node_info_send_time + 30 < now) {
wc->node_info_send_time = 0;
- build_node_info(strdupz(wc->node_id));
+ build_node_info(host);
internal_error(true, "ACLK SYNC: Sending node info for %s", rrdhost_hostname(host));
}
- if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) {
- build_node_collectors(strdupz(wc->node_id));
+ if (pp_queue_empty && wc->node_collectors_send && wc->node_collectors_send + 30 < now) {
+ build_node_collectors(host);
internal_error(true, "ACLK SYNC: Sending collectors for %s", rrdhost_hostname(host));
wc->node_collectors_send = 0;
}
}
dfe_done(host);
- if(pending)
- netdata_log_info("ACLK: %zu nodes are pending for contexts to load, skipped sending node info for them", pending);
+ if (context_loading || replicating || context_pp) {
+ nd_log_limit_static_thread_var(erl, 10, 100 * USEC_PER_MS);
+ nd_log_limit(
+ &erl,
+ NDLS_DAEMON,
+ NDLP_INFO,
+ "%zu nodes loading contexts, %zu replicating data, %zu pending context post processing",
+ context_loading,
+ replicating,
+ context_pp);
+ }
}
#endif