summaryrefslogtreecommitdiffstats
path: root/database/contexts/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/contexts/worker.c')
-rw-r--r--database/contexts/worker.c1094
1 files changed, 1094 insertions, 0 deletions
diff --git a/database/contexts/worker.c b/database/contexts/worker.c
new file mode 100644
index 00000000..22e28b2a
--- /dev/null
+++ b/database/contexts/worker.c
@@ -0,0 +1,1094 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "internal.h"
+
+static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc);
+
+static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused);
+
+static void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc);
+
+static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc);
+static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs);
+
+static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jobs);
+static void rrdcontext_garbage_collect_for_all_hosts(void);
+
+extern usec_t rrdcontext_next_db_rotation_ut;
+
+// ----------------------------------------------------------------------------
+// load from SQL
+
+static void rrdinstance_load_clabel(SQL_CLABEL_DATA *sld, void *data) {
+ RRDINSTANCE *ri = data;
+ rrdlabels_add(ri->rrdlabels, sld->label_key, sld->label_value, sld->label_source);
+}
+
+static void rrdinstance_load_dimension(SQL_DIMENSION_DATA *sd, void *data) {
+ RRDINSTANCE *ri = data;
+
+ RRDMETRIC trm = {
+ .id = string_strdupz(sd->id),
+ .name = string_strdupz(sd->name),
+ .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomic
+ };
+ if(sd->hidden) trm.flags |= RRD_FLAG_HIDDEN;
+
+ uuid_copy(trm.uuid, sd->dim_id);
+
+ dictionary_set(ri->rrdmetrics, string2str(trm.id), &trm, sizeof(trm));
+}
+
+static void rrdinstance_load_chart_callback(SQL_CHART_DATA *sc, void *data) {
+ RRDHOST *host = data;
+
+ RRDCONTEXT tc = {
+ .id = string_strdupz(sc->context),
+ .title = string_strdupz(sc->title),
+ .units = string_strdupz(sc->units),
+ .family = string_strdupz(sc->family),
+ .priority = sc->priority,
+ .chart_type = sc->chart_type,
+ .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomics
+ .rrdhost = host,
+ };
+
+ RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_set_and_acquire_item(host->rrdctx.contexts, string2str(tc.id), &tc, sizeof(tc));
+ RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+
+ RRDINSTANCE tri = {
+ .id = string_strdupz(sc->id),
+ .name = string_strdupz(sc->name),
+ .title = string_strdupz(sc->title),
+ .units = string_strdupz(sc->units),
+ .family = string_strdupz(sc->family),
+ .chart_type = sc->chart_type,
+ .priority = sc->priority,
+ .update_every_s = sc->update_every,
+ .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomics
+ };
+ uuid_copy(tri.uuid, sc->chart_id);
+
+ RRDINSTANCE_ACQUIRED *ria = (RRDINSTANCE_ACQUIRED *)dictionary_set_and_acquire_item(rc->rrdinstances, sc->id, &tri, sizeof(tri));
+ RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+
+ ctx_get_dimension_list(&ri->uuid, rrdinstance_load_dimension, ri);
+ ctx_get_label_list(&ri->uuid, rrdinstance_load_clabel, ri);
+ rrdinstance_trigger_updates(ri, __FUNCTION__ );
+ rrdinstance_release(ria);
+ rrdcontext_release(rca);
+}
+
+static void rrdcontext_load_context_callback(VERSIONED_CONTEXT_DATA *ctx_data, void *data) {
+ RRDHOST *host = data;
+ (void)host;
+
+ RRDCONTEXT trc = {
+ .id = string_strdupz(ctx_data->id),
+ .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomics
+
+ // no need to set more data here
+ // we only need the hub data
+
+ .hub = *ctx_data,
+ };
+ dictionary_set(host->rrdctx.contexts, string2str(trc.id), &trc, sizeof(trc));
+}
+
+void rrdhost_load_rrdcontext_data(RRDHOST *host) {
+ if(host->rrdctx.contexts) return;
+
+ rrdhost_create_rrdcontexts(host);
+ ctx_get_context_list(&host->host_uuid, rrdcontext_load_context_callback, host);
+ ctx_get_chart_list(&host->host_uuid, rrdinstance_load_chart_callback, host);
+
+ RRDCONTEXT *rc;
+ dfe_start_read(host->rrdctx.contexts, rc) {
+ rrdcontext_trigger_updates(rc, __FUNCTION__ );
+ }
+ dfe_done(rc);
+
+ rrdcontext_garbage_collect_single_host(host, false);
+}
+
+// ----------------------------------------------------------------------------
+// version hash calculation
+
+uint64_t rrdcontext_version_hash_with_callback(
+ RRDHOST *host,
+ void (*callback)(RRDCONTEXT *, bool, void *),
+ bool snapshot,
+ void *bundle) {
+
+ if(unlikely(!host || !host->rrdctx.contexts)) return 0;
+
+ RRDCONTEXT *rc;
+ uint64_t hash = 0;
+
+ // loop through all contexts of the host
+ dfe_start_read(host->rrdctx.contexts, rc) {
+
+ rrdcontext_lock(rc);
+
+ if(unlikely(rrd_flag_check(rc, RRD_FLAG_HIDDEN))) {
+ rrdcontext_unlock(rc);
+ continue;
+ }
+
+ if(unlikely(callback))
+ callback(rc, snapshot, bundle);
+
+ // skip any deleted contexts
+ if(unlikely(rrd_flag_is_deleted(rc))) {
+ rrdcontext_unlock(rc);
+ continue;
+ }
+
+ // we use rc->hub.* which has the latest
+ // metadata we have sent to the hub
+
+ // if a context is currently queued, rc->hub.* does NOT
+ // reflect the queued changes. rc->hub.* is updated with
+ // their metadata, after messages are dispatched to hub.
+
+ // when the context is being collected,
+ // rc->hub.last_time_t is already zero
+
+ hash += rc->hub.version + rc->hub.last_time_s - rc->hub.first_time_s;
+
+ rrdcontext_unlock(rc);
+
+ }
+ dfe_done(rc);
+
+ return hash;
+}
+
+// ----------------------------------------------------------------------------
+// retention recalculation
+
+void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, bool worker_jobs) {
+ rrdcontext_post_process_updates(rc, true, reason, worker_jobs);
+}
+
+void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, bool worker_jobs) {
+ if(unlikely(!host || !host->rrdctx.contexts)) return;
+
+ RRDCONTEXT *rc;
+ dfe_start_read(host->rrdctx.contexts, rc) {
+ rrdcontext_recalculate_context_retention(rc, reason, worker_jobs);
+ }
+ dfe_done(rc);
+}
+
+static void rrdcontext_recalculate_retention_all_hosts(void) {
+ rrdcontext_next_db_rotation_ut = 0;
+ RRDHOST *host;
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ worker_is_busy(WORKER_JOB_RETENTION);
+ rrdcontext_recalculate_host_retention(host, RRD_FLAG_UPDATE_REASON_DB_ROTATION, true);
+ }
+ dfe_done(host);
+}
+
+// ----------------------------------------------------------------------------
+// garbage collector
+
+bool rrdmetric_update_retention(RRDMETRIC *rm) {
+ time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
+
+ if(rm->rrddim) {
+ min_first_time_t = rrddim_first_entry_s(rm->rrddim);
+ max_last_time_t = rrddim_last_entry_s(rm->rrddim);
+ }
+ else {
+ RRDHOST *rrdhost = rm->ri->rc->rrdhost;
+ for (size_t tier = 0; tier < storage_tiers; tier++) {
+ STORAGE_ENGINE *eng = rrdhost->db[tier].eng;
+
+ time_t first_time_t, last_time_t;
+ if (eng->api.metric_retention_by_uuid(rrdhost->db[tier].instance, &rm->uuid, &first_time_t, &last_time_t)) {
+ if (first_time_t < min_first_time_t)
+ min_first_time_t = first_time_t;
+
+ if (last_time_t > max_last_time_t)
+ max_last_time_t = last_time_t;
+ }
+ }
+ }
+
+ if((min_first_time_t == LONG_MAX || min_first_time_t == 0) && max_last_time_t == 0)
+ return false;
+
+ if(min_first_time_t == LONG_MAX)
+ min_first_time_t = 0;
+
+ if(min_first_time_t > max_last_time_t) {
+ internal_error(true, "RRDMETRIC: retention of '%s' is flipped, first_time_t = %ld, last_time_t = %ld", string2str(rm->id), min_first_time_t, max_last_time_t);
+ time_t tmp = min_first_time_t;
+ min_first_time_t = max_last_time_t;
+ max_last_time_t = tmp;
+ }
+
+ // check if retention changed
+
+ if (min_first_time_t != rm->first_time_s) {
+ rm->first_time_s = min_first_time_t;
+ rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if (max_last_time_t != rm->last_time_s) {
+ rm->last_time_s = max_last_time_t;
+ rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ if(unlikely(!rm->first_time_s && !rm->last_time_s))
+ rrd_flag_set_deleted(rm, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+
+ rrd_flag_set(rm, RRD_FLAG_LIVE_RETENTION);
+
+ return true;
+}
+
+static inline bool rrdmetric_should_be_deleted(RRDMETRIC *rm) {
+ if(likely(!rrd_flag_check(rm, RRD_FLAGS_REQUIRED_FOR_DELETIONS)))
+ return false;
+
+ if(likely(rrd_flag_check(rm, RRD_FLAGS_PREVENTING_DELETIONS)))
+ return false;
+
+ if(likely(rm->rrddim))
+ return false;
+
+ rrdmetric_update_retention(rm);
+ if(rm->first_time_s || rm->last_time_s)
+ return false;
+
+ return true;
+}
+
+static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) {
+ if(likely(!rrd_flag_check(ri, RRD_FLAGS_REQUIRED_FOR_DELETIONS)))
+ return false;
+
+ if(likely(rrd_flag_check(ri, RRD_FLAGS_PREVENTING_DELETIONS)))
+ return false;
+
+ if(likely(ri->rrdset))
+ return false;
+
+ if(unlikely(dictionary_referenced_items(ri->rrdmetrics) != 0))
+ return false;
+
+ if(unlikely(dictionary_entries(ri->rrdmetrics) != 0))
+ return false;
+
+ if(ri->first_time_s || ri->last_time_s)
+ return false;
+
+ return true;
+}
+
+static inline bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) {
+ if(likely(!rrd_flag_check(rc, RRD_FLAGS_REQUIRED_FOR_DELETIONS)))
+ return false;
+
+ if(likely(rrd_flag_check(rc, RRD_FLAGS_PREVENTING_DELETIONS)))
+ return false;
+
+ if(unlikely(dictionary_referenced_items(rc->rrdinstances) != 0))
+ return false;
+
+ if(unlikely(dictionary_entries(rc->rrdinstances) != 0))
+ return false;
+
+ if(unlikely(rc->first_time_s || rc->last_time_s))
+ return false;
+
+ return true;
+}
+
+void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc) {
+ // we need to refresh the string pointers in rc->hub
+ // in case the context changed values
+ rc->hub.id = string2str(rc->id);
+ rc->hub.title = string2str(rc->title);
+ rc->hub.units = string2str(rc->units);
+ rc->hub.family = string2str(rc->family);
+
+ // delete it from SQL
+ if(ctx_delete_context(&rc->rrdhost->host_uuid, &rc->hub) != 0)
+ error("RRDCONTEXT: failed to delete context '%s' version %"PRIu64" from SQL.", rc->hub.id, rc->hub.version);
+}
+
+static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jobs) {
+
+ internal_error(true, "RRDCONTEXT: garbage collecting context structures of host '%s'", rrdhost_hostname(host));
+
+ RRDCONTEXT *rc;
+ dfe_start_reentrant(host->rrdctx.contexts, rc) {
+ if(unlikely(worker_jobs && !service_running(SERVICE_CONTEXT))) break;
+
+ if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP);
+
+ rrdcontext_lock(rc);
+
+ RRDINSTANCE *ri;
+ dfe_start_reentrant(rc->rrdinstances, ri) {
+ if(unlikely(worker_jobs && !service_running(SERVICE_CONTEXT))) break;
+
+ RRDMETRIC *rm;
+ dfe_start_write(ri->rrdmetrics, rm) {
+ if(rrdmetric_should_be_deleted(rm)) {
+ if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
+ if(!dictionary_del(ri->rrdmetrics, string2str(rm->id)))
+ error("RRDCONTEXT: metric '%s' of instance '%s' of context '%s' of host '%s', failed to be deleted from rrdmetrics dictionary.",
+ string2str(rm->id),
+ string2str(ri->id),
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ else
+ internal_error(
+ true,
+ "RRDCONTEXT: metric '%s' of instance '%s' of context '%s' of host '%s', deleted from rrdmetrics dictionary.",
+ string2str(rm->id),
+ string2str(ri->id),
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ }
+ }
+ dfe_done(rm);
+
+ if(rrdinstance_should_be_deleted(ri)) {
+ if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
+ if(!dictionary_del(rc->rrdinstances, string2str(ri->id)))
+ error("RRDCONTEXT: instance '%s' of context '%s' of host '%s', failed to be deleted from rrdmetrics dictionary.",
+ string2str(ri->id),
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ else
+ internal_error(
+ true,
+ "RRDCONTEXT: instance '%s' of context '%s' of host '%s', deleted from rrdmetrics dictionary.",
+ string2str(ri->id),
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ }
+ }
+ dfe_done(ri);
+
+ if(unlikely(rrdcontext_should_be_deleted(rc))) {
+ if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
+ rrdcontext_dequeue_from_post_processing(rc);
+ rrdcontext_delete_from_sql_unsafe(rc);
+
+ if(!dictionary_del(host->rrdctx.contexts, string2str(rc->id)))
+ error("RRDCONTEXT: context '%s' of host '%s', failed to be deleted from rrdmetrics dictionary.",
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ else
+ internal_error(
+ true,
+ "RRDCONTEXT: context '%s' of host '%s', deleted from rrdmetrics dictionary.",
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ }
+
+ // the item is referenced in the dictionary
+ // so, it is still here to unlock, even if we have deleted it
+ rrdcontext_unlock(rc);
+ }
+ dfe_done(rc);
+}
+
+static void rrdcontext_garbage_collect_for_all_hosts(void) {
+ RRDHOST *host;
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ rrdcontext_garbage_collect_single_host(host, true);
+ }
+ dfe_done(host);
+}
+
+// ----------------------------------------------------------------------------
+// post processing
+
+static void rrdmetric_process_updates(RRDMETRIC *rm, bool force, RRD_FLAGS reason, bool worker_jobs) {
+ if(reason != RRD_FLAG_NONE)
+ rrd_flag_set_updated(rm, reason);
+
+ if(!force && !rrd_flag_is_updated(rm) && rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION) && !rrd_flag_check(rm, RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION))
+ return;
+
+ if(worker_jobs)
+ worker_is_busy(WORKER_JOB_PP_METRIC);
+
+ if(reason & RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD) {
+ rrd_flag_set_archived(rm);
+ rrd_flag_set(rm, RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD);
+ }
+ if(rrd_flag_is_deleted(rm) && (reason & RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION))
+ rrd_flag_set_archived(rm);
+
+ rrdmetric_update_retention(rm);
+
+ rrd_flag_unset_updated(rm);
+}
+
+static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FLAGS reason, bool worker_jobs) {
+ if(reason != RRD_FLAG_NONE)
+ rrd_flag_set_updated(ri, reason);
+
+ if(!force && !rrd_flag_is_updated(ri) && rrd_flag_check(ri, RRD_FLAG_LIVE_RETENTION))
+ return;
+
+ if(worker_jobs)
+ worker_is_busy(WORKER_JOB_PP_INSTANCE);
+
+ time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
+ size_t metrics_active = 0, metrics_deleted = 0;
+ bool live_retention = true, currently_collected = false;
+ if(dictionary_entries(ri->rrdmetrics) > 0) {
+ RRDMETRIC *rm;
+ dfe_start_read((DICTIONARY *)ri->rrdmetrics, rm) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ RRD_FLAGS reason_to_pass = reason;
+ if(rrd_flag_check(ri, RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION))
+ reason_to_pass |= RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION;
+
+ rrdmetric_process_updates(rm, force, reason_to_pass, worker_jobs);
+
+ if(unlikely(!rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION)))
+ live_retention = false;
+
+ if (unlikely((rrdmetric_should_be_deleted(rm)))) {
+ metrics_deleted++;
+ continue;
+ }
+
+ if(!currently_collected && rrd_flag_check(rm, RRD_FLAG_COLLECTED) && rm->first_time_s)
+ currently_collected = true;
+
+ metrics_active++;
+
+ if (rm->first_time_s && rm->first_time_s < min_first_time_t)
+ min_first_time_t = rm->first_time_s;
+
+ if (rm->last_time_s && rm->last_time_s > max_last_time_t)
+ max_last_time_t = rm->last_time_s;
+ }
+ dfe_done(rm);
+ }
+
+ if(unlikely(live_retention && !rrd_flag_check(ri, RRD_FLAG_LIVE_RETENTION)))
+ rrd_flag_set(ri, RRD_FLAG_LIVE_RETENTION);
+ else if(unlikely(!live_retention && rrd_flag_check(ri, RRD_FLAG_LIVE_RETENTION)))
+ rrd_flag_clear(ri, RRD_FLAG_LIVE_RETENTION);
+
+ if(unlikely(!metrics_active)) {
+ // no metrics available
+
+ if(ri->first_time_s) {
+ ri->first_time_s = 0;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if(ri->last_time_s) {
+ ri->last_time_s = 0;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ rrd_flag_set_deleted(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+ }
+ else {
+ // we have active metrics...
+
+ if (unlikely(min_first_time_t == LONG_MAX))
+ min_first_time_t = 0;
+
+ if (unlikely(min_first_time_t == 0 || max_last_time_t == 0)) {
+ if(ri->first_time_s) {
+ ri->first_time_s = 0;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if(ri->last_time_s) {
+ ri->last_time_s = 0;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ if(likely(live_retention))
+ rrd_flag_set_deleted(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+ }
+ else {
+ rrd_flag_clear(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+
+ if (unlikely(ri->first_time_s != min_first_time_t)) {
+ ri->first_time_s = min_first_time_t;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if (unlikely(ri->last_time_s != max_last_time_t)) {
+ ri->last_time_s = max_last_time_t;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ if(likely(currently_collected))
+ rrd_flag_set_collected(ri);
+ else
+ rrd_flag_set_archived(ri);
+ }
+ }
+
+ rrd_flag_unset_updated(ri);
+}
+
+static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs) {
+ if(reason != RRD_FLAG_NONE)
+ rrd_flag_set_updated(rc, reason);
+
+ if(worker_jobs)
+ worker_is_busy(WORKER_JOB_PP_CONTEXT);
+
+ size_t min_priority_collected = LONG_MAX;
+ size_t min_priority_not_collected = LONG_MAX;
+ size_t min_priority = LONG_MAX;
+ time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
+ size_t instances_active = 0, instances_deleted = 0;
+ bool live_retention = true, currently_collected = false, hidden = true;
+ if(dictionary_entries(rc->rrdinstances) > 0) {
+ RRDINSTANCE *ri;
+ dfe_start_reentrant(rc->rrdinstances, ri) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ RRD_FLAGS reason_to_pass = reason;
+ if(rrd_flag_check(rc, RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION))
+ reason_to_pass |= RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION;
+
+ rrdinstance_post_process_updates(ri, force, reason_to_pass, worker_jobs);
+
+ if(unlikely(hidden && !rrd_flag_check(ri, RRD_FLAG_HIDDEN)))
+ hidden = false;
+
+ if(unlikely(live_retention && !rrd_flag_check(ri, RRD_FLAG_LIVE_RETENTION)))
+ live_retention = false;
+
+ if (unlikely(rrdinstance_should_be_deleted(ri))) {
+ instances_deleted++;
+ continue;
+ }
+
+ if(unlikely(!currently_collected && rrd_flag_is_collected(ri) && ri->first_time_s))
+ currently_collected = true;
+
+ internal_error(rc->units != ri->units,
+ "RRDCONTEXT: '%s' rrdinstance '%s' has different units, context '%s', instance '%s'",
+ string2str(rc->id), string2str(ri->id),
+ string2str(rc->units), string2str(ri->units));
+
+ instances_active++;
+
+ if (ri->priority >= RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY) {
+ if(rrd_flag_check(ri, RRD_FLAG_COLLECTED)) {
+ if(ri->priority < min_priority_collected)
+ min_priority_collected = ri->priority;
+ }
+ else {
+ if(ri->priority < min_priority_not_collected)
+ min_priority_not_collected = ri->priority;
+ }
+ }
+
+ if (ri->first_time_s && ri->first_time_s < min_first_time_t)
+ min_first_time_t = ri->first_time_s;
+
+ if (ri->last_time_s && ri->last_time_s > max_last_time_t)
+ max_last_time_t = ri->last_time_s;
+ }
+ dfe_done(ri);
+
+ if(min_priority_collected != LONG_MAX)
+ // use the collected priority
+ min_priority = min_priority_collected;
+ else
+ // use the non-collected priority
+ min_priority = min_priority_not_collected;
+ }
+
+ {
+ bool previous_hidden = rrd_flag_check(rc, RRD_FLAG_HIDDEN);
+ if (hidden != previous_hidden) {
+ if (hidden && !rrd_flag_check(rc, RRD_FLAG_HIDDEN))
+ rrd_flag_set(rc, RRD_FLAG_HIDDEN);
+ else if (!hidden && rrd_flag_check(rc, RRD_FLAG_HIDDEN))
+ rrd_flag_clear(rc, RRD_FLAG_HIDDEN);
+ }
+
+ bool previous_live_retention = rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION);
+ if (live_retention != previous_live_retention) {
+ if (live_retention && !rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION))
+ rrd_flag_set(rc, RRD_FLAG_LIVE_RETENTION);
+ else if (!live_retention && rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION))
+ rrd_flag_clear(rc, RRD_FLAG_LIVE_RETENTION);
+ }
+ }
+
+ rrdcontext_lock(rc);
+ rc->pp.executions++;
+
+ if(unlikely(!instances_active)) {
+ // we had some instances, but they are gone now...
+
+ if(rc->first_time_s) {
+ rc->first_time_s = 0;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if(rc->last_time_s) {
+ rc->last_time_s = 0;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ rrd_flag_set_deleted(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+ }
+ else {
+ // we have some active instances...
+
+ if (unlikely(min_first_time_t == LONG_MAX))
+ min_first_time_t = 0;
+
+ if (unlikely(min_first_time_t == 0 && max_last_time_t == 0)) {
+ if(rc->first_time_s) {
+ rc->first_time_s = 0;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if(rc->last_time_s) {
+ rc->last_time_s = 0;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ rrd_flag_set_deleted(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+ }
+ else {
+ rrd_flag_clear(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+
+ if (unlikely(rc->first_time_s != min_first_time_t)) {
+ rc->first_time_s = min_first_time_t;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if (rc->last_time_s != max_last_time_t) {
+ rc->last_time_s = max_last_time_t;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ if(likely(currently_collected))
+ rrd_flag_set_collected(rc);
+ else
+ rrd_flag_set_archived(rc);
+ }
+
+ if (min_priority != LONG_MAX && rc->priority != min_priority) {
+ rc->priority = min_priority;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+ }
+
+ if(unlikely(rrd_flag_is_updated(rc) && rc->rrdhost->rrdctx.hub_queue)) {
+ if(check_if_cloud_version_changed_unsafe(rc, false)) {
+ rc->version = rrdcontext_get_next_version(rc);
+ dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.hub_queue,
+ string2str(rc->id), rc, sizeof(*rc));
+ }
+ }
+
+ rrd_flag_unset_updated(rc);
+ rrdcontext_unlock(rc);
+}
+
+void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function __maybe_unused, RRD_FLAGS flags __maybe_unused) {
+ if(unlikely(!rc->rrdhost->rrdctx.pp_queue)) return;
+
+ if(!rrd_flag_check(rc, RRD_FLAG_QUEUED_FOR_PP)) {
+ dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.pp_queue,
+ string2str(rc->id),
+ rc,
+ sizeof(*rc));
+
+#if(defined(NETDATA_INTERNAL_CHECKS) && defined(LOG_POST_PROCESSING_QUEUE_INSERTIONS))
+ {
+ BUFFER *wb_flags = buffer_create(1000);
+ rrd_flags_to_buffer(flags, wb_flags);
+
+ BUFFER *wb_reasons = buffer_create(1000);
+ rrd_reasons_to_buffer(flags, wb_reasons);
+
+ internal_error(true, "RRDCONTEXT: '%s' update triggered by function %s(), due to flags: %s, reasons: %s",
+ string2str(rc->id), function,
+ buffer_tostring(wb_flags),
+ buffer_tostring(wb_reasons));
+
+ buffer_free(wb_reasons);
+ buffer_free(wb_flags);
+ }
+#endif
+ }
+}
+
+static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc) {
+ if(unlikely(!rc->rrdhost->rrdctx.pp_queue)) return;
+ dictionary_del(rc->rrdhost->rrdctx.pp_queue, string2str(rc->id));
+}
+
+static void rrdcontext_post_process_queued_contexts(RRDHOST *host) {
+ if(unlikely(!host->rrdctx.pp_queue)) return;
+
+ RRDCONTEXT *rc;
+ dfe_start_reentrant(host->rrdctx.pp_queue, rc) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ rrdcontext_dequeue_from_post_processing(rc);
+ rrdcontext_post_process_updates(rc, false, RRD_FLAG_NONE, true);
+ }
+ dfe_done(rc);
+}
+
+// ----------------------------------------------------------------------------
+// dispatching contexts to cloud
+
+static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc) {
+ time_t now = now_realtime_sec();
+ uint64_t version = MAX(rc->version, rc->hub.version);
+ version = MAX((uint64_t)now, version);
+ version++;
+ return version;
+}
+
+void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused, void *bundle __maybe_unused) {
+
+ // save it, so that we know the last version we sent to hub
+ rc->version = rc->hub.version = rrdcontext_get_next_version(rc);
+ rc->hub.id = string2str(rc->id);
+ rc->hub.title = string2str(rc->title);
+ rc->hub.units = string2str(rc->units);
+ rc->hub.family = string2str(rc->family);
+ rc->hub.chart_type = rrdset_type_name(rc->chart_type);
+ rc->hub.priority = rc->priority;
+ rc->hub.first_time_s = rc->first_time_s;
+ rc->hub.last_time_s = rrd_flag_is_collected(rc) ? 0 : rc->last_time_s;
+ rc->hub.deleted = rrd_flag_is_deleted(rc) ? true : false;
+
+#ifdef ENABLE_ACLK
+ struct context_updated message = {
+ .id = rc->hub.id,
+ .version = rc->hub.version,
+ .title = rc->hub.title,
+ .units = rc->hub.units,
+ .family = rc->hub.family,
+ .chart_type = rc->hub.chart_type,
+ .priority = rc->hub.priority,
+ .first_entry = rc->hub.first_time_s,
+ .last_entry = rc->hub.last_time_s,
+ .deleted = rc->hub.deleted,
+ };
+
+ if(likely(!rrd_flag_check(rc, RRD_FLAG_HIDDEN))) {
+ if (snapshot) {
+ if (!rc->hub.deleted)
+ contexts_snapshot_add_ctx_update(bundle, &message);
+ }
+ else
+ contexts_updated_add_ctx_update(bundle, &message);
+ }
+#endif
+
+ // store it to SQL
+
+ if(rrd_flag_is_deleted(rc))
+ rrdcontext_delete_from_sql_unsafe(rc);
+
+ else if (ctx_store_context(&rc->rrdhost->host_uuid, &rc->hub) != 0)
+ error("RRDCONTEXT: failed to save context '%s' version %"PRIu64" to SQL.", rc->hub.id, rc->hub.version);
+}
+
+static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused) {
+ bool id_changed = false,
+ title_changed = false,
+ units_changed = false,
+ family_changed = false,
+ chart_type_changed = false,
+ priority_changed = false,
+ first_time_changed = false,
+ last_time_changed = false,
+ deleted_changed = false;
+
+ RRD_FLAGS flags = rrd_flags_get(rc);
+
+ if(unlikely(string2str(rc->id) != rc->hub.id))
+ id_changed = true;
+
+ if(unlikely(string2str(rc->title) != rc->hub.title))
+ title_changed = true;
+
+ if(unlikely(string2str(rc->units) != rc->hub.units))
+ units_changed = true;
+
+ if(unlikely(string2str(rc->family) != rc->hub.family))
+ family_changed = true;
+
+ if(unlikely(rrdset_type_name(rc->chart_type) != rc->hub.chart_type))
+ chart_type_changed = true;
+
+ if(unlikely(rc->priority != rc->hub.priority))
+ priority_changed = true;
+
+ if(unlikely((uint64_t)rc->first_time_s != rc->hub.first_time_s))
+ first_time_changed = true;
+
+ if(unlikely((uint64_t)((flags & RRD_FLAG_COLLECTED) ? 0 : rc->last_time_s) != rc->hub.last_time_s))
+ last_time_changed = true;
+
+ if(unlikely(((flags & RRD_FLAG_DELETED) ? true : false) != rc->hub.deleted))
+ deleted_changed = true;
+
+ if(unlikely(id_changed || title_changed || units_changed || family_changed || chart_type_changed || priority_changed || first_time_changed || last_time_changed || deleted_changed)) {
+
+ internal_error(LOG_TRANSITIONS,
+ "RRDCONTEXT: %s NEW VERSION '%s'%s of host '%s', version %"PRIu64", title '%s'%s, units '%s'%s, family '%s'%s, chart type '%s'%s, priority %u%s, first_time_t %ld%s, last_time_t %ld%s, deleted '%s'%s, (queued for %llu ms, expected %llu ms)",
+ sending?"SENDING":"QUEUE",
+ string2str(rc->id), id_changed ? " (CHANGED)" : "",
+ rrdhost_hostname(rc->rrdhost),
+ rc->version,
+ string2str(rc->title), title_changed ? " (CHANGED)" : "",
+ string2str(rc->units), units_changed ? " (CHANGED)" : "",
+ string2str(rc->family), family_changed ? " (CHANGED)" : "",
+ rrdset_type_name(rc->chart_type), chart_type_changed ? " (CHANGED)" : "",
+ rc->priority, priority_changed ? " (CHANGED)" : "",
+ rc->first_time_s, first_time_changed ? " (CHANGED)" : "",
+ (flags & RRD_FLAG_COLLECTED) ? 0 : rc->last_time_s, last_time_changed ? " (CHANGED)" : "",
+ (flags & RRD_FLAG_DELETED) ? "true" : "false", deleted_changed ? " (CHANGED)" : "",
+ sending ? (now_realtime_usec() - rc->queue.queued_ut) / USEC_PER_MS : 0,
+ sending ? (rc->queue.scheduled_dispatch_ut - rc->queue.queued_ut) / USEC_PER_MS : 0
+ );
+
+ return true;
+ }
+
+ return false;
+}
+
+static inline usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc, usec_t now_ut) {
+
+ if(likely(rc->queue.delay_calc_ut >= rc->queue.queued_ut))
+ return rc->queue.scheduled_dispatch_ut;
+
+ RRD_FLAGS flags = rc->queue.queued_flags;
+
+ usec_t delay = LONG_MAX;
+ int i;
+ struct rrdcontext_reason *reason;
+ for(i = 0, reason = &rrdcontext_reasons[i]; reason->name ; reason = &rrdcontext_reasons[++i]) {
+ if(unlikely(flags & reason->flag)) {
+ if(reason->delay_ut < delay)
+ delay = reason->delay_ut;
+ }
+ }
+
+ if(unlikely(delay == LONG_MAX)) {
+ internal_error(true, "RRDCONTEXT: '%s', cannot find minimum delay of flags %x", string2str(rc->id), (unsigned int)flags);
+ delay = 60 * USEC_PER_SEC;
+ }
+
+ rc->queue.delay_calc_ut = now_ut;
+ usec_t dispatch_ut = rc->queue.scheduled_dispatch_ut = rc->queue.queued_ut + delay;
+ return dispatch_ut;
+}
+
+static void rrdcontext_dequeue_from_hub_queue(RRDCONTEXT *rc) {
+ dictionary_del(rc->rrdhost->rrdctx.hub_queue, string2str(rc->id));
+}
+
+static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now_ut) {
+
+ // check if we have received a streaming command for this host
+ if(!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS) || !aclk_connected || !host->rrdctx.hub_queue)
+ return;
+
+ // check if there are queued items to send
+ if(!dictionary_entries(host->rrdctx.hub_queue))
+ return;
+
+ if(!host->node_id)
+ return;
+
+ size_t messages_added = 0;
+ contexts_updated_t bundle = NULL;
+
+ RRDCONTEXT *rc;
+ dfe_start_reentrant(host->rrdctx.hub_queue, rc) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ if(unlikely(messages_added >= MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST))
+ break;
+
+ worker_is_busy(WORKER_JOB_QUEUED);
+ usec_t dispatch_ut = rrdcontext_calculate_queued_dispatch_time_ut(rc, now_ut);
+ char *claim_id = get_agent_claimid();
+
+ if(unlikely(now_ut >= dispatch_ut) && claim_id) {
+ worker_is_busy(WORKER_JOB_CHECK);
+
+ rrdcontext_lock(rc);
+
+ if(check_if_cloud_version_changed_unsafe(rc, true)) {
+ worker_is_busy(WORKER_JOB_SEND);
+
+#ifdef ENABLE_ACLK
+ if(!bundle) {
+ // prepare the bundle to send the messages
+ char uuid[UUID_STR_LEN];
+ uuid_unparse_lower(*host->node_id, uuid);
+
+ bundle = contexts_updated_new(claim_id, uuid, 0, now_ut);
+ }
+#endif
+ // update the hub data of the context, give a new version, pack the message
+ // and save an update to SQL
+ rrdcontext_message_send_unsafe(rc, false, bundle);
+ messages_added++;
+
+ rc->queue.dispatches++;
+ rc->queue.dequeued_ut = now_ut;
+ }
+ else
+ rc->version = rc->hub.version;
+
+ // remove it from the queue
+ worker_is_busy(WORKER_JOB_DEQUEUE);
+ rrdcontext_dequeue_from_hub_queue(rc);
+
+ if(unlikely(rrdcontext_should_be_deleted(rc))) {
+ // this is a deleted context - delete it forever...
+
+ worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
+
+ rrdcontext_dequeue_from_post_processing(rc);
+ rrdcontext_delete_from_sql_unsafe(rc);
+
+ STRING *id = string_dup(rc->id);
+ rrdcontext_unlock(rc);
+
+ // delete it from the master dictionary
+ if(!dictionary_del(host->rrdctx.contexts, string2str(rc->id)))
+ error("RRDCONTEXT: '%s' of host '%s' failed to be deleted from rrdcontext dictionary.",
+ string2str(id), rrdhost_hostname(host));
+
+ string_freez(id);
+ }
+ else
+ rrdcontext_unlock(rc);
+ }
+ freez(claim_id);
+ }
+ dfe_done(rc);
+
+#ifdef ENABLE_ACLK
+ if(service_running(SERVICE_CONTEXT) && bundle) {
+ // we have a bundle to send messages
+
+ // update the version hash
+ contexts_updated_update_version_hash(bundle, rrdcontext_version_hash(host));
+
+ // send it
+ aclk_send_contexts_updated(bundle);
+ }
+ else if(bundle)
+ contexts_updated_delete(bundle);
+#endif
+
+}
+
+// ----------------------------------------------------------------------------
+// worker thread
+
+static void rrdcontext_main_cleanup(void *ptr) {
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+ // custom code
+ worker_unregister();
+
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+void *rrdcontext_main(void *ptr) {
+ netdata_thread_cleanup_push(rrdcontext_main_cleanup, ptr);
+
+ worker_register("RRDCONTEXT");
+ worker_register_job_name(WORKER_JOB_HOSTS, "hosts");
+ worker_register_job_name(WORKER_JOB_CHECK, "dedup checks");
+ worker_register_job_name(WORKER_JOB_SEND, "sent contexts");
+ worker_register_job_name(WORKER_JOB_DEQUEUE, "deduplicated contexts");
+ worker_register_job_name(WORKER_JOB_RETENTION, "metrics retention");
+ worker_register_job_name(WORKER_JOB_QUEUED, "queued contexts");
+ worker_register_job_name(WORKER_JOB_CLEANUP, "cleanups");
+ worker_register_job_name(WORKER_JOB_CLEANUP_DELETE, "deletes");
+ worker_register_job_name(WORKER_JOB_PP_METRIC, "check metrics");
+ worker_register_job_name(WORKER_JOB_PP_INSTANCE, "check instances");
+ worker_register_job_name(WORKER_JOB_PP_CONTEXT, "check contexts");
+
+ worker_register_job_custom_metric(WORKER_JOB_HUB_QUEUE_SIZE, "hub queue size", "contexts", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_PP_QUEUE_SIZE, "post processing queue size", "contexts", WORKER_METRIC_ABSOLUTE);
+
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+ usec_t step = RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC;
+
+ while (service_running(SERVICE_CONTEXT)) {
+ worker_is_idle();
+ heartbeat_next(&hb, step);
+
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ usec_t now_ut = now_realtime_usec();
+
+ if(rrdcontext_next_db_rotation_ut && now_ut > rrdcontext_next_db_rotation_ut) {
+ rrdcontext_recalculate_retention_all_hosts();
+ rrdcontext_garbage_collect_for_all_hosts();
+ rrdcontext_next_db_rotation_ut = 0;
+ }
+
+ size_t hub_queued_contexts_for_all_hosts = 0;
+ size_t pp_queued_contexts_for_all_hosts = 0;
+
+ RRDHOST *host;
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ worker_is_busy(WORKER_JOB_HOSTS);
+
+ if(host->rrdctx.pp_queue) {
+ pp_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.pp_queue);
+ rrdcontext_post_process_queued_contexts(host);
+ dictionary_garbage_collect(host->rrdctx.pp_queue);
+ }
+
+ if(host->rrdctx.hub_queue) {
+ hub_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.hub_queue);
+ rrdcontext_dispatch_queued_contexts_to_hub(host, now_ut);
+ dictionary_garbage_collect(host->rrdctx.hub_queue);
+ }
+
+ if (host->rrdctx.contexts)
+ dictionary_garbage_collect(host->rrdctx.contexts);
+ }
+ dfe_done(host);
+
+ worker_set_metric(WORKER_JOB_HUB_QUEUE_SIZE, (NETDATA_DOUBLE)hub_queued_contexts_for_all_hosts);
+ worker_set_metric(WORKER_JOB_PP_QUEUE_SIZE, (NETDATA_DOUBLE)pp_queued_contexts_for_all_hosts);
+ }
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}