summaryrefslogtreecommitdiffstats
path: root/src/database/contexts
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:44 +0000
commit836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch)
tree1604da8f482d02effa033c94a84be42bc0c848c3 /src/database/contexts
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz
netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/database/contexts/README.md (renamed from database/contexts/README.md)0
-rw-r--r--src/database/contexts/api_v1.c (renamed from database/contexts/api_v1.c)8
-rw-r--r--src/database/contexts/api_v2.c (renamed from database/contexts/api_v2.c)442
-rw-r--r--src/database/contexts/context.c342
-rw-r--r--src/database/contexts/instance.c (renamed from database/contexts/instance.c)6
-rw-r--r--src/database/contexts/internal.h388
-rw-r--r--src/database/contexts/metric.c327
-rw-r--r--src/database/contexts/query_scope.c (renamed from database/contexts/query_scope.c)0
-rw-r--r--src/database/contexts/query_target.c (renamed from database/contexts/query_target.c)60
-rw-r--r--src/database/contexts/rrdcontext.c (renamed from database/contexts/rrdcontext.c)25
-rw-r--r--src/database/contexts/rrdcontext.h (renamed from database/contexts/rrdcontext.h)39
-rw-r--r--src/database/contexts/worker.c (renamed from database/contexts/worker.c)192
12 files changed, 1595 insertions, 234 deletions
diff --git a/database/contexts/README.md b/src/database/contexts/README.md
index e69de29bb..e69de29bb 100644
--- a/database/contexts/README.md
+++ b/src/database/contexts/README.md
diff --git a/database/contexts/api_v1.c b/src/database/contexts/api_v1.c
index f144e6f7b..355aaf91a 100644
--- a/database/contexts/api_v1.c
+++ b/src/database/contexts/api_v1.c
@@ -131,13 +131,13 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void
if(before && (!ri->first_time_s || before < ri->first_time_s))
return 0;
- if(t_parent->chart_label_key && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_label_key,
- '\0', NULL))
+ if(t_parent->chart_label_key && rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_label_key,
+ '\0', NULL) != SP_MATCHED_POSITIVE)
return 0;
- if(t_parent->chart_labels_filter && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels,
+ if(t_parent->chart_labels_filter && rrdlabels_match_simple_pattern_parsed(ri->rrdlabels,
t_parent->chart_labels_filter, ':',
- NULL))
+ NULL) != SP_MATCHED_POSITIVE)
return 0;
time_t first_time_s = ri->first_time_s;
diff --git a/database/contexts/api_v2.c b/src/database/contexts/api_v2.c
index 3ca49a319..07cd3ac83 100644
--- a/database/contexts/api_v2.c
+++ b/src/database/contexts/api_v2.c
@@ -167,6 +167,9 @@ struct function_v2_entry {
size_t used;
size_t *node_ids;
STRING *help;
+ STRING *tags;
+ HTTP_ACCESS access;
+ int priority;
};
struct context_v2_entry {
@@ -180,24 +183,45 @@ struct context_v2_entry {
FTS_MATCH match;
};
+struct alert_counts {
+ size_t critical;
+ size_t warning;
+ size_t clear;
+ size_t error;
+};
+
struct alert_v2_entry {
RRDCALC *tmp;
STRING *name;
STRING *summary;
+ RRDLABELS *recipient;
+ RRDLABELS *classification;
+ RRDLABELS *context;
+ RRDLABELS *component;
+ RRDLABELS *type;
size_t ati;
- size_t critical;
- size_t warning;
- size_t clear;
- size_t error;
+ struct alert_counts counts;
size_t instances;
DICTIONARY *nodes;
DICTIONARY *configs;
};
+struct alert_by_x_entry {
+ struct {
+ struct alert_counts counts;
+ size_t silent;
+ size_t total;
+ } running;
+
+ struct {
+ size_t available;
+ } prototypes;
+};
+
typedef struct full_text_search_index {
size_t searches;
size_t string_searches;
@@ -251,8 +275,14 @@ struct rrdcontext_to_json_v2_data {
size_t ati;
- DICTIONARY *alerts;
+ DICTIONARY *summary;
DICTIONARY *alert_instances;
+
+ DICTIONARY *by_type;
+ DICTIONARY *by_component;
+ DICTIONARY *by_classification;
+ DICTIONARY *by_recipient;
+ DICTIONARY *by_module;
} alerts;
struct {
@@ -276,9 +306,7 @@ struct rrdcontext_to_json_v2_data {
struct query_timings timings;
};
-static void alerts_v2_add(struct alert_v2_entry *t, RRDCALC *rc) {
- t->instances++;
-
+static void alert_counts_add(struct alert_counts *t, RRDCALC *rc) {
switch(rc->status) {
case RRDCALC_STATUS_CRITICAL:
t->critical++;
@@ -303,20 +331,66 @@ static void alerts_v2_add(struct alert_v2_entry *t, RRDCALC *rc) {
break;
}
+}
+
+static void alerts_v2_add(struct alert_v2_entry *t, RRDCALC *rc) {
+ t->instances++;
+
+ alert_counts_add(&t->counts, rc);
dictionary_set(t->nodes, rc->rrdset->rrdhost->machine_guid, NULL, 0);
char key[UUID_STR_LEN + 1];
- uuid_unparse_lower(rc->config_hash_id, key);
+ uuid_unparse_lower(rc->config.hash_id, key);
dictionary_set(t->configs, key, NULL, 0);
}
+static void alerts_by_x_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data) {
+ static STRING *silent = NULL;
+ if(unlikely(!silent)) silent = string_strdupz("silent");
+
+ struct alert_by_x_entry *b = value;
+ RRDCALC *rc = data;
+ if(!rc) {
+ // prototype
+ b->prototypes.available++;
+ }
+ else {
+ alert_counts_add(&b->running.counts, rc);
+
+ b->running.total++;
+
+ if (rc->config.recipient == silent)
+ b->running.silent++;
+ }
+}
+
+static bool alerts_by_x_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value __maybe_unused, void *data __maybe_unused) {
+ alerts_by_x_insert_callback(item, old_value, data);
+ return false;
+}
+
static void alerts_v2_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data) {
struct rrdcontext_to_json_v2_data *ctl = data;
struct alert_v2_entry *t = value;
RRDCALC *rc = t->tmp;
- t->name = rc->name;
- t->summary = rc->summary;
+ t->name = rc->config.name;
+ t->summary = rc->config.summary; // the original summary
+ t->context = rrdlabels_create();
+ t->recipient = rrdlabels_create();
+ t->classification = rrdlabels_create();
+ t->component = rrdlabels_create();
+ t->type = rrdlabels_create();
+ if (string_strlen(rc->rrdset->context))
+ rrdlabels_add(t->context, string2str(rc->rrdset->context), "yes", RRDLABEL_SRC_AUTO);
+ if (string_strlen(rc->config.recipient))
+ rrdlabels_add(t->recipient, string2str(rc->config.recipient), "yes", RRDLABEL_SRC_AUTO);
+ if (string_strlen(rc->config.classification))
+ rrdlabels_add(t->classification, string2str(rc->config.classification), "yes", RRDLABEL_SRC_AUTO);
+ if (string_strlen(rc->config.component))
+ rrdlabels_add(t->component, string2str(rc->config.component), "yes", RRDLABEL_SRC_AUTO);
+ if (string_strlen(rc->config.type))
+ rrdlabels_add(t->type, string2str(rc->config.type), "yes", RRDLABEL_SRC_AUTO);
t->ati = ctl->alerts.ati++;
t->nodes = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_VALUE_LINK_DONT_CLONE|DICT_OPTION_NAME_LINK_DONT_CLONE);
@@ -328,12 +402,29 @@ static void alerts_v2_insert_callback(const DICTIONARY_ITEM *item __maybe_unused
static bool alerts_v2_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *data __maybe_unused) {
struct alert_v2_entry *t = old_value, *n = new_value;
RRDCALC *rc = n->tmp;
+ if (string_strlen(rc->rrdset->context))
+ rrdlabels_add(t->context, string2str(rc->rrdset->context), "yes", RRDLABEL_SRC_AUTO);
+ if (string_strlen(rc->config.recipient))
+ rrdlabels_add(t->recipient, string2str(rc->config.recipient), "yes", RRDLABEL_SRC_AUTO);
+ if (string_strlen(rc->config.classification))
+ rrdlabels_add(t->classification, string2str(rc->config.classification), "yes", RRDLABEL_SRC_AUTO);
+ if (string_strlen(rc->config.component))
+ rrdlabels_add(t->component, string2str(rc->config.component), "yes", RRDLABEL_SRC_AUTO);
+ if (string_strlen(rc->config.type))
+ rrdlabels_add(t->type, string2str(rc->config.type), "yes", RRDLABEL_SRC_AUTO);
alerts_v2_add(t, rc);
return true;
}
static void alerts_v2_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
struct alert_v2_entry *t = value;
+
+ rrdlabels_destroy(t->context);
+ rrdlabels_destroy(t->recipient);
+ rrdlabels_destroy(t->classification);
+ rrdlabels_destroy(t->component);
+ rrdlabels_destroy(t->type);
+
dictionary_destroy(t->nodes);
dictionary_destroy(t->configs);
}
@@ -347,16 +438,16 @@ static void alert_instances_v2_insert_callback(const DICTIONARY_ITEM *item __may
t->chart_id = rc->rrdset->id;
t->chart_name = rc->rrdset->name;
t->family = rc->rrdset->family;
- t->units = rc->units;
- t->classification = rc->classification;
- t->type = rc->type;
- t->recipient = rc->recipient;
- t->component = rc->component;
- t->name = rc->name;
- t->source = rc->source;
+ t->units = rc->config.units;
+ t->classification = rc->config.classification;
+ t->type = rc->config.type;
+ t->recipient = rc->config.recipient;
+ t->component = rc->config.component;
+ t->name = rc->config.name;
+ t->source = rc->config.source;
t->status = rc->status;
t->flags = rc->run_flags;
- t->info = rc->info;
+ t->info = rc->config.info;
t->summary = rc->summary;
t->value = rc->value;
t->last_updated = rc->last_updated;
@@ -365,12 +456,9 @@ static void alert_instances_v2_insert_callback(const DICTIONARY_ITEM *item __may
t->host = rc->rrdset->rrdhost;
t->alarm_id = rc->id;
t->ni = ctl->nodes.ni;
- t->global_id = rc->ae ? rc->ae->global_id : 0;
- t->name = rc->name;
- uuid_copy(t->config_hash_id, rc->config_hash_id);
- if(rc->ae)
- uuid_copy(t->last_transition_id, rc->ae->transition_id);
+ uuid_copy(t->config_hash_id, rc->config.hash_id);
+ health_alarm_log_get_global_id_and_transition_id_for_rrdcalc(rc, &t->global_id, &t->last_transition_id);
}
static bool alert_instances_v2_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value __maybe_unused, void *new_value __maybe_unused, void *data __maybe_unused) {
@@ -422,7 +510,7 @@ static FTS_MATCH rrdcontext_to_json_v2_full_text_search(struct rrdcontext_to_jso
size_t label_searches = 0;
if(unlikely(ri->rrdlabels && rrdlabels_entries(ri->rrdlabels) &&
- rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, q, ':', &label_searches))) {
+ rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, q, ':', &label_searches) == SP_MATCHED_POSITIVE)) {
ctl->q.fts.searches += label_searches;
ctl->q.fts.char_searches += label_searches;
matched = FTS_MATCHED_LABEL;
@@ -435,12 +523,12 @@ static FTS_MATCH rrdcontext_to_json_v2_full_text_search(struct rrdcontext_to_jso
RRDSET *st = ri->rrdset;
rw_spinlock_read_lock(&st->alerts.spinlock);
for (RRDCALC *rcl = st->alerts.base; rcl; rcl = rcl->next) {
- if(unlikely(full_text_search_string(&ctl->q.fts, q, rcl->name))) {
+ if(unlikely(full_text_search_string(&ctl->q.fts, q, rcl->config.name))) {
matched = FTS_MATCHED_ALERT;
break;
}
- if(unlikely(full_text_search_string(&ctl->q.fts, q, rcl->info))) {
+ if(unlikely(full_text_search_string(&ctl->q.fts, q, rcl->config.info))) {
matched = FTS_MATCHED_ALERT_INFO;
break;
}
@@ -460,7 +548,7 @@ static bool rrdcontext_matches_alert(struct rrdcontext_to_json_v2_data *ctl, RRD
RRDSET *st = ri->rrdset;
rw_spinlock_read_lock(&st->alerts.spinlock);
for (RRDCALC *rcl = st->alerts.base; rcl; rcl = rcl->next) {
- if(ctl->alerts.alert_name_pattern && !simple_pattern_matches_string(ctl->alerts.alert_name_pattern, rcl->name))
+ if(ctl->alerts.alert_name_pattern && !simple_pattern_matches_string(ctl->alerts.alert_name_pattern, rcl->config.name))
continue;
if(ctl->alerts.alarm_id_filter && ctl->alerts.alarm_id_filter != rcl->id)
@@ -500,11 +588,51 @@ static bool rrdcontext_matches_alert(struct rrdcontext_to_json_v2_data *ctl, RRD
struct alert_v2_entry t = {
.tmp = rcl,
};
- struct alert_v2_entry *a2e = dictionary_set(ctl->alerts.alerts, string2str(rcl->name), &t,
- sizeof(struct alert_v2_entry));
+ struct alert_v2_entry *a2e =
+ dictionary_set(ctl->alerts.summary, string2str(rcl->config.name),
+ &t, sizeof(struct alert_v2_entry));
size_t ati = a2e->ati;
matches++;
+ dictionary_set_advanced(ctl->alerts.by_type,
+ string2str(rcl->config.type),
+ (ssize_t)string_strlen(rcl->config.type),
+ NULL,
+ sizeof(struct alert_by_x_entry),
+ rcl);
+
+ dictionary_set_advanced(ctl->alerts.by_component,
+ string2str(rcl->config.component),
+ (ssize_t)string_strlen(rcl->config.component),
+ NULL,
+ sizeof(struct alert_by_x_entry),
+ rcl);
+
+ dictionary_set_advanced(ctl->alerts.by_classification,
+ string2str(rcl->config.classification),
+ (ssize_t)string_strlen(rcl->config.classification),
+ NULL,
+ sizeof(struct alert_by_x_entry),
+ rcl);
+
+ dictionary_set_advanced(ctl->alerts.by_recipient,
+ string2str(rcl->config.recipient),
+ (ssize_t)string_strlen(rcl->config.recipient),
+ NULL,
+ sizeof(struct alert_by_x_entry),
+ rcl);
+
+ char *module = NULL;
+ rrdlabels_get_value_strdup_or_null(st->rrdlabels, &module, "_collect_module");
+ if(!module || !*module) module = "[unset]";
+
+ dictionary_set_advanced(ctl->alerts.by_module,
+ module,
+ -1,
+ NULL,
+ sizeof(struct alert_by_x_entry),
+ rcl);
+
if (ctl->options & (CONTEXT_V2_OPTION_ALERTS_WITH_INSTANCES | CONTEXT_V2_OPTION_ALERTS_WITH_VALUES)) {
char key[20 + 1];
snprintfz(key, sizeof(key) - 1, "%p", rcl);
@@ -726,6 +854,15 @@ static void agent_capabilities_to_json(BUFFER *wb, RRDHOST *host, const char *ke
freez(capas);
}
+static inline void host_dyncfg_to_json_v2(BUFFER *wb, const char *key, RRDHOST_STATUS *s) {
+ buffer_json_member_add_object(wb, key);
+ {
+ buffer_json_member_add_string(wb, "status", rrdhost_dyncfg_status_to_string(s->dyncfg.status));
+ }
+ buffer_json_object_close(wb); // health
+
+}
+
static inline void rrdhost_health_to_json_v2(BUFFER *wb, const char *key, RRDHOST_STATUS *s) {
buffer_json_member_add_object(wb, key);
{
@@ -838,6 +975,8 @@ static void rrdcontext_to_json_v2_rrdhost(BUFFER *wb, RRDHOST *host, struct rrdc
host_functions2json(host, wb); // functions
agent_capabilities_to_json(wb, host, "capabilities");
+
+ host_dyncfg_to_json_v2(wb, "dyncfg", &s);
}
buffer_json_object_close(wb); // this instance
buffer_json_array_close(wb); // instances
@@ -913,8 +1052,11 @@ static ssize_t rrdcontext_to_json_v2_add_host(void *data, RRDHOST *host, bool qu
.size = 1,
.node_ids = &ctl->nodes.ni,
.help = NULL,
+ .tags = NULL,
+ .access = HTTP_ACCESS_ALL,
+ .priority = RRDFUNCTIONS_PRIORITY_DEFAULT,
};
- host_functions_to_dict(host, ctl->functions.dict, &t, sizeof(t), &t.help);
+ host_functions_to_dict(host, ctl->functions.dict, &t, sizeof(t), &t.help, &t.tags, &t.access, &t.priority);
}
if(ctl->mode & CONTEXTS_V2_NODES) {
@@ -982,6 +1124,30 @@ void buffer_json_query_timings(BUFFER *wb, const char *key, struct query_timings
void build_info_to_json_object(BUFFER *b);
+static void convert_seconds_to_dhms(time_t seconds, char *result, int result_size) {
+ int days, hours, minutes;
+
+ days = (int) (seconds / (24 * 3600));
+ seconds = (int) (seconds % (24 * 3600));
+ hours = (int) (seconds / 3600);
+ seconds %= 3600;
+ minutes = (int) (seconds / 60);
+ seconds %= 60;
+
+ // Format the result into the provided string buffer
+ BUFFER *buf = buffer_create(128, NULL);
+ if (days)
+ buffer_sprintf(buf,"%d day%s%s", days, days==1 ? "" : "s", hours || minutes ? ", " : "");
+ if (hours)
+ buffer_sprintf(buf,"%d hour%s%s", hours, hours==1 ? "" : "s", minutes ? ", " : "");
+ if (minutes)
+ buffer_sprintf(buf,"%d minute%s%s", minutes, minutes==1 ? "" : "s", seconds ? ", " : "");
+ if (seconds)
+ buffer_sprintf(buf,"%d second%s", (int) seconds, seconds==1 ? "" : "s");
+ strncpyz(result, buffer_tostring(buf), result_size);
+ buffer_free(buf);
+}
+
void buffer_json_agents_v2(BUFFER *wb, struct query_timings *timings, time_t now_s, bool info, bool array) {
if(!now_s)
now_s = now_realtime_sec();
@@ -1009,14 +1175,22 @@ void buffer_json_agents_v2(BUFFER *wb, struct query_timings *timings, time_t now
buffer_json_cloud_status(wb, now_s);
buffer_json_member_add_array(wb, "db_size");
+ size_t group_seconds = localhost->rrd_update_every;
for (size_t tier = 0; tier < storage_tiers; tier++) {
STORAGE_ENGINE *eng = localhost->db[tier].eng;
if (!eng) continue;
- uint64_t max = storage_engine_disk_space_max(eng->backend, localhost->db[tier].instance);
- uint64_t used = storage_engine_disk_space_used(eng->backend, localhost->db[tier].instance);
- time_t first_time_s = storage_engine_global_first_time_s(eng->backend, localhost->db[tier].instance);
- size_t currently_collected_metrics = storage_engine_collected_metrics(eng->backend, localhost->db[tier].instance);
+ group_seconds *= storage_tiers_grouping_iterations[tier];
+ uint64_t max = storage_engine_disk_space_max(eng->seb, localhost->db[tier].si);
+ uint64_t used = storage_engine_disk_space_used(eng->seb, localhost->db[tier].si);
+#ifdef ENABLE_DBENGINE
+ if (!max && eng->seb == STORAGE_ENGINE_BACKEND_DBENGINE) {
+ max = get_directory_free_bytes_space(multidb_ctx[tier]);
+ max += used;
+ }
+#endif
+ time_t first_time_s = storage_engine_global_first_time_s(eng->seb, localhost->db[tier].si);
+ size_t currently_collected_metrics = storage_engine_collected_metrics(eng->seb, localhost->db[tier].si);
NETDATA_DOUBLE percent;
if (used && max)
@@ -1026,6 +1200,12 @@ void buffer_json_agents_v2(BUFFER *wb, struct query_timings *timings, time_t now
buffer_json_add_array_item_object(wb);
buffer_json_member_add_uint64(wb, "tier", tier);
+ char human_retention[128];
+ convert_seconds_to_dhms((time_t) group_seconds, human_retention, sizeof(human_retention) - 1);
+ buffer_json_member_add_string(wb, "point_every", human_retention);
+
+ buffer_json_member_add_uint64(wb, "metrics", storage_engine_metrics(eng->seb, localhost->db[tier].si));
+ buffer_json_member_add_uint64(wb, "samples", storage_engine_samples(eng->seb, localhost->db[tier].si));
if(used || max) {
buffer_json_member_add_uint64(wb, "disk_used", used);
@@ -1034,13 +1214,33 @@ void buffer_json_agents_v2(BUFFER *wb, struct query_timings *timings, time_t now
}
if(first_time_s) {
+ time_t retention = now_s - first_time_s;
+
buffer_json_member_add_time_t(wb, "from", first_time_s);
buffer_json_member_add_time_t(wb, "to", now_s);
- buffer_json_member_add_time_t(wb, "retention", now_s - first_time_s);
+ buffer_json_member_add_time_t(wb, "retention", retention);
+
+ convert_seconds_to_dhms(retention, human_retention, sizeof(human_retention) - 1);
+ buffer_json_member_add_string(wb, "retention_human", human_retention);
+
+ if(used || max) { // we have disk space information
+ time_t time_retention = 0;
+#ifdef ENABLE_DBENGINE
+ time_retention = multidb_ctx[tier]->config.max_retention_s;
+#endif
+ time_t space_retention = (time_t)((NETDATA_DOUBLE)(now_s - first_time_s) * 100.0 / percent);
+ time_t actual_retention = MIN(space_retention, time_retention ? time_retention : space_retention);
+
+ if (time_retention) {
+ convert_seconds_to_dhms(time_retention, human_retention, sizeof(human_retention) - 1);
+ buffer_json_member_add_time_t(wb, "requested_retention", time_retention);
+ buffer_json_member_add_string(wb, "requested_retention_human", human_retention);
+ }
- if(used || max) // we have disk space information
- buffer_json_member_add_time_t(wb, "expected_retention",
- (time_t) ((NETDATA_DOUBLE) (now_s - first_time_s) * 100.0 / percent));
+ convert_seconds_to_dhms(actual_retention, human_retention, sizeof(human_retention) - 1);
+ buffer_json_member_add_time_t(wb, "expected_retention", actual_retention);
+ buffer_json_member_add_string(wb, "expected_retention_human", human_retention);
+ }
}
if(currently_collected_metrics)
@@ -1212,14 +1412,9 @@ static void contexts_v2_alert_config_to_json_from_sql_alert_config_data(struct s
buffer_json_member_add_string(wb, "type", is_template ? "template" : "alarm");
buffer_json_member_add_string(wb, "on", is_template ? t->selectors.on_template : t->selectors.on_key);
- buffer_json_member_add_string(wb, "os", t->selectors.os);
- buffer_json_member_add_string(wb, "hosts", t->selectors.hosts);
buffer_json_member_add_string(wb, "families", t->selectors.families);
- buffer_json_member_add_string(wb, "plugin", t->selectors.plugin);
- buffer_json_member_add_string(wb, "module", t->selectors.module);
buffer_json_member_add_string(wb, "host_labels", t->selectors.host_labels);
buffer_json_member_add_string(wb, "chart_labels", t->selectors.chart_labels);
- buffer_json_member_add_string(wb, "charts", t->selectors.charts);
}
buffer_json_object_close(wb); // selectors
@@ -1236,9 +1431,13 @@ static void contexts_v2_alert_config_to_json_from_sql_alert_config_data(struct s
buffer_json_member_add_time_t(wb, "after", t->value.db.after);
buffer_json_member_add_time_t(wb, "before", t->value.db.before);
+ buffer_json_member_add_string(wb, "time_group_condition", alerts_group_conditions_id2txt(t->value.db.time_group_condition));
+ buffer_json_member_add_double(wb, "time_group_value", t->value.db.time_group_value);
+ buffer_json_member_add_string(wb, "dims_group", alerts_dims_grouping_id2group(t->value.db.dims_group));
+ buffer_json_member_add_string(wb, "data_source", alerts_data_source_id2source(t->value.db.data_source));
buffer_json_member_add_string(wb, "method", t->value.db.method);
buffer_json_member_add_string(wb, "dimensions", t->value.db.dimensions);
- web_client_api_request_v1_data_options_to_buffer_json_array(wb, "options",(RRDR_OPTIONS) t->value.db.options);
+ rrdr_options_to_buffer_json_array(wb, "options", (RRDR_OPTIONS)t->value.db.options);
}
buffer_json_object_close(wb); // db
}
@@ -1378,6 +1577,41 @@ static int contexts_v2_alert_instance_to_json_callback(const DICTIONARY_ITEM *it
return 1;
}
+static void contexts_v2_alerts_by_x_update_prototypes(void *data, STRING *type, STRING *component, STRING *classification, STRING *recipient) {
+ struct rrdcontext_to_json_v2_data *ctl = data;
+
+ dictionary_set_advanced(ctl->alerts.by_type, string2str(type), (ssize_t)string_strlen(type), NULL, sizeof(struct alert_by_x_entry), NULL);
+ dictionary_set_advanced(ctl->alerts.by_component, string2str(component), (ssize_t)string_strlen(component), NULL, sizeof(struct alert_by_x_entry), NULL);
+ dictionary_set_advanced(ctl->alerts.by_classification, string2str(classification), (ssize_t)string_strlen(classification), NULL, sizeof(struct alert_by_x_entry), NULL);
+ dictionary_set_advanced(ctl->alerts.by_recipient, string2str(recipient), (ssize_t)string_strlen(recipient), NULL, sizeof(struct alert_by_x_entry), NULL);
+}
+
+static void contexts_v2_alerts_by_x_to_json(BUFFER *wb, DICTIONARY *dict, const char *key) {
+ buffer_json_member_add_array(wb, key);
+ {
+ struct alert_by_x_entry *b;
+ dfe_start_read(dict, b) {
+ buffer_json_add_array_item_object(wb);
+ {
+ buffer_json_member_add_string(wb, "name", b_dfe.name);
+ buffer_json_member_add_uint64(wb, "cr", b->running.counts.critical);
+ buffer_json_member_add_uint64(wb, "wr", b->running.counts.warning);
+ buffer_json_member_add_uint64(wb, "cl", b->running.counts.clear);
+ buffer_json_member_add_uint64(wb, "er", b->running.counts.error);
+ buffer_json_member_add_uint64(wb, "running", b->running.total);
+
+ buffer_json_member_add_uint64(wb, "running_silent", b->running.silent);
+
+ if(b->prototypes.available)
+ buffer_json_member_add_uint64(wb, "available", b->prototypes.available);
+ }
+ buffer_json_object_close(wb);
+ }
+ dfe_done(b);
+ }
+ buffer_json_array_close(wb);
+}
+
static void contexts_v2_alert_instances_to_json(BUFFER *wb, const char *key, struct rrdcontext_to_json_v2_data *ctl, bool debug) {
buffer_json_member_add_array(wb, key);
{
@@ -1397,28 +1631,66 @@ static void contexts_v2_alerts_to_json(BUFFER *wb, struct rrdcontext_to_json_v2_
buffer_json_member_add_array(wb, "alerts");
{
struct alert_v2_entry *t;
- dfe_start_read(ctl->alerts.alerts, t)
+ dfe_start_read(ctl->alerts.summary, t)
{
buffer_json_add_array_item_object(wb);
{
buffer_json_member_add_uint64(wb, "ati", t->ati);
+
+ buffer_json_member_add_array(wb, "ni");
+ void *host_guid;
+ dfe_start_read(t->nodes, host_guid) {
+ struct contexts_v2_node *cn = dictionary_get(ctl->nodes.dict,host_guid_dfe.name);
+ buffer_json_add_array_item_int64(wb, (int64_t) cn->ni);
+ }
+ dfe_done(host_guid);
+ buffer_json_array_close(wb);
+
buffer_json_member_add_string(wb, "nm", string2str(t->name));
buffer_json_member_add_string(wb, "sum", string2str(t->summary));
- buffer_json_member_add_uint64(wb, "cr", t->critical);
- buffer_json_member_add_uint64(wb, "wr", t->warning);
- buffer_json_member_add_uint64(wb, "cl", t->clear);
- buffer_json_member_add_uint64(wb, "er", t->error);
+ buffer_json_member_add_uint64(wb, "cr", t->counts.critical);
+ buffer_json_member_add_uint64(wb, "wr", t->counts.warning);
+ buffer_json_member_add_uint64(wb, "cl", t->counts.clear);
+ buffer_json_member_add_uint64(wb, "er", t->counts.error);
buffer_json_member_add_uint64(wb, "in", t->instances);
buffer_json_member_add_uint64(wb, "nd", dictionary_entries(t->nodes));
buffer_json_member_add_uint64(wb, "cfg", dictionary_entries(t->configs));
+
+ buffer_json_member_add_array(wb, "ctx");
+ rrdlabels_key_to_buffer_array_item(t->context, wb);
+ buffer_json_array_close(wb); // ctx
+
+ buffer_json_member_add_array(wb, "cls");
+ rrdlabels_key_to_buffer_array_item(t->classification, wb);
+ buffer_json_array_close(wb); // classification
+
+
+ buffer_json_member_add_array(wb, "cp");
+ rrdlabels_key_to_buffer_array_item(t->component, wb);
+ buffer_json_array_close(wb); // component
+
+ buffer_json_member_add_array(wb, "ty");
+ rrdlabels_key_to_buffer_array_item(t->type, wb);
+ buffer_json_array_close(wb); // type
+
+ buffer_json_member_add_array(wb, "to");
+ rrdlabels_key_to_buffer_array_item(t->recipient, wb);
+ buffer_json_array_close(wb); // recipient
}
buffer_json_object_close(wb); // alert name
}
dfe_done(t);
}
buffer_json_array_close(wb); // alerts
+
+ health_prototype_metadata_foreach(ctl, contexts_v2_alerts_by_x_update_prototypes);
+ contexts_v2_alerts_by_x_to_json(wb, ctl->alerts.by_type, "alerts_by_type");
+ contexts_v2_alerts_by_x_to_json(wb, ctl->alerts.by_component, "alerts_by_component");
+ contexts_v2_alerts_by_x_to_json(wb, ctl->alerts.by_classification, "alerts_by_classification");
+ contexts_v2_alerts_by_x_to_json(wb, ctl->alerts.by_recipient, "alerts_by_recipient");
+ contexts_v2_alerts_by_x_to_json(wb, ctl->alerts.by_module, "alerts_by_module");
}
if(ctl->request->options & (CONTEXT_V2_OPTION_ALERTS_WITH_INSTANCES|CONTEXT_V2_OPTION_ALERTS_WITH_VALUES)) {
@@ -1432,9 +1704,9 @@ static void contexts_v2_alerts_to_json(BUFFER *wb, struct rrdcontext_to_json_v2_
struct sql_alert_transition_fixed_size {
usec_t global_id;
- uuid_t transition_id;
- uuid_t host_id;
- uuid_t config_hash_id;
+ nd_uuid_t transition_id;
+ nd_uuid_t host_id;
+ nd_uuid_t config_hash_id;
uint32_t alarm_id;
char alert_name[SQL_TRANSITION_DATA_SMALL_STRING];
char chart[RRD_ID_LENGTH_MAX];
@@ -1926,12 +2198,42 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE
}
}
- ctl.alerts.alerts = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ ctl.alerts.summary = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
NULL, sizeof(struct alert_v2_entry));
- dictionary_register_insert_callback(ctl.alerts.alerts, alerts_v2_insert_callback, &ctl);
- dictionary_register_conflict_callback(ctl.alerts.alerts, alerts_v2_conflict_callback, &ctl);
- dictionary_register_delete_callback(ctl.alerts.alerts, alerts_v2_delete_callback, &ctl);
+ dictionary_register_insert_callback(ctl.alerts.summary, alerts_v2_insert_callback, &ctl);
+ dictionary_register_conflict_callback(ctl.alerts.summary, alerts_v2_conflict_callback, &ctl);
+ dictionary_register_delete_callback(ctl.alerts.summary, alerts_v2_delete_callback, &ctl);
+
+ ctl.alerts.by_type = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ NULL, sizeof(struct alert_by_x_entry));
+
+ dictionary_register_insert_callback(ctl.alerts.by_type, alerts_by_x_insert_callback, NULL);
+ dictionary_register_conflict_callback(ctl.alerts.by_type, alerts_by_x_conflict_callback, NULL);
+
+ ctl.alerts.by_component = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ NULL, sizeof(struct alert_by_x_entry));
+
+ dictionary_register_insert_callback(ctl.alerts.by_component, alerts_by_x_insert_callback, NULL);
+ dictionary_register_conflict_callback(ctl.alerts.by_component, alerts_by_x_conflict_callback, NULL);
+
+ ctl.alerts.by_classification = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ NULL, sizeof(struct alert_by_x_entry));
+
+ dictionary_register_insert_callback(ctl.alerts.by_classification, alerts_by_x_insert_callback, NULL);
+ dictionary_register_conflict_callback(ctl.alerts.by_classification, alerts_by_x_conflict_callback, NULL);
+
+ ctl.alerts.by_recipient = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ NULL, sizeof(struct alert_by_x_entry));
+
+ dictionary_register_insert_callback(ctl.alerts.by_recipient, alerts_by_x_insert_callback, NULL);
+ dictionary_register_conflict_callback(ctl.alerts.by_recipient, alerts_by_x_conflict_callback, NULL);
+
+ ctl.alerts.by_module = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ NULL, sizeof(struct alert_by_x_entry));
+
+ dictionary_register_insert_callback(ctl.alerts.by_module, alerts_by_x_insert_callback, NULL);
+ dictionary_register_conflict_callback(ctl.alerts.by_module, alerts_by_x_conflict_callback, NULL);
if(ctl.options & (CONTEXT_V2_OPTION_ALERTS_WITH_INSTANCES | CONTEXT_V2_OPTION_ALERTS_WITH_VALUES)) {
ctl.alerts.alert_instances = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
@@ -2062,12 +2364,19 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE
struct function_v2_entry *t;
dfe_start_read(ctl.functions.dict, t) {
buffer_json_add_array_item_object(wb);
- buffer_json_member_add_string(wb, "name", t_dfe.name);
- buffer_json_member_add_string(wb, "help", string2str(t->help));
- buffer_json_member_add_array(wb, "ni");
- for (size_t i = 0; i < t->used; i++)
- buffer_json_add_array_item_uint64(wb, t->node_ids[i]);
- buffer_json_array_close(wb);
+ {
+ buffer_json_member_add_string(wb, "name", t_dfe.name);
+ buffer_json_member_add_string(wb, "help", string2str(t->help));
+ buffer_json_member_add_array(wb, "ni");
+ {
+ for (size_t i = 0; i < t->used; i++)
+ buffer_json_add_array_item_uint64(wb, t->node_ids[i]);
+ }
+ buffer_json_array_close(wb);
+ buffer_json_member_add_string(wb, "tags", string2str(t->tags));
+ http_access2buffer_json_array(wb, "access", t->access);
+ buffer_json_member_add_uint64(wb, "priority", t->priority);
+ }
buffer_json_object_close(wb);
}
dfe_done(t);
@@ -2127,8 +2436,13 @@ cleanup:
dictionary_destroy(ctl.nodes.dict);
dictionary_destroy(ctl.contexts.dict);
dictionary_destroy(ctl.functions.dict);
- dictionary_destroy(ctl.alerts.alerts);
+ dictionary_destroy(ctl.alerts.summary);
dictionary_destroy(ctl.alerts.alert_instances);
+ dictionary_destroy(ctl.alerts.by_type);
+ dictionary_destroy(ctl.alerts.by_component);
+ dictionary_destroy(ctl.alerts.by_classification);
+ dictionary_destroy(ctl.alerts.by_recipient);
+ dictionary_destroy(ctl.alerts.by_module);
simple_pattern_free(ctl.nodes.scope_pattern);
simple_pattern_free(ctl.nodes.pattern);
simple_pattern_free(ctl.contexts.pattern);
diff --git a/src/database/contexts/context.c b/src/database/contexts/context.c
new file mode 100644
index 000000000..33037eb93
--- /dev/null
+++ b/src/database/contexts/context.c
@@ -0,0 +1,342 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "internal.h"
+
+inline const char *rrdcontext_acquired_id(RRDCONTEXT_ACQUIRED *rca) {
+ RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+ return string2str(rc->id);
+}
+
+inline bool rrdcontext_acquired_belongs_to_host(RRDCONTEXT_ACQUIRED *rca, RRDHOST *host) {
+ RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+ return rc->rrdhost == host;
+}
+
+// ----------------------------------------------------------------------------
+// RRDCONTEXT
+
+static void rrdcontext_freez(RRDCONTEXT *rc) {
+ string_freez(rc->id);
+ string_freez(rc->title);
+ string_freez(rc->units);
+ string_freez(rc->family);
+}
+
+static void rrdcontext_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost) {
+ RRDHOST *host = (RRDHOST *)rrdhost;
+ RRDCONTEXT *rc = (RRDCONTEXT *)value;
+
+ rc->rrdhost = host;
+ rc->flags = rc->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics at constructor
+
+ if(rc->hub.version) {
+ // we are loading data from the SQL database
+
+ if(rc->version)
+ netdata_log_error("RRDCONTEXT: context '%s' is already initialized with version %"PRIu64", but it is loaded again from SQL with version %"PRIu64"", string2str(rc->id), rc->version, rc->hub.version);
+
+ // IMPORTANT
+ // replace all string pointers in rc->hub with our own versions
+ // the originals are coming from a tmp allocation of sqlite
+
+ string_freez(rc->id);
+ rc->id = string_strdupz(rc->hub.id);
+ rc->hub.id = string2str(rc->id);
+
+ string_freez(rc->title);
+ rc->title = string_strdupz(rc->hub.title);
+ rc->hub.title = string2str(rc->title);
+
+ string_freez(rc->units);
+ rc->units = string_strdupz(rc->hub.units);
+ rc->hub.units = string2str(rc->units);
+
+ string_freez(rc->family);
+ rc->family = string_strdupz(rc->hub.family);
+ rc->hub.family = string2str(rc->family);
+
+ rc->chart_type = rrdset_type_id(rc->hub.chart_type);
+ rc->hub.chart_type = rrdset_type_name(rc->chart_type);
+
+ rc->version = rc->hub.version;
+ rc->priority = rc->hub.priority;
+ rc->first_time_s = (time_t)rc->hub.first_time_s;
+ rc->last_time_s = (time_t)rc->hub.last_time_s;
+
+ if(rc->hub.deleted || !rc->hub.first_time_s)
+ rrd_flag_set_deleted(rc, RRD_FLAG_NONE);
+ else {
+ if (rc->last_time_s == 0)
+ rrd_flag_set_collected(rc);
+ else
+ rrd_flag_set_archived(rc);
+ }
+
+ rc->flags |= RRD_FLAG_UPDATE_REASON_LOAD_SQL; // no need for atomics at constructor
+ }
+ else {
+ // we are adding this context now for the first time
+ rc->version = now_realtime_sec();
+ }
+
+ rrdinstances_create_in_rrdcontext(rc);
+ spinlock_init(&rc->spinlock);
+
+ // signal the react callback to do the job
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_NEW_OBJECT);
+}
+
+static void rrdcontext_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost __maybe_unused) {
+
+ RRDCONTEXT *rc = (RRDCONTEXT *)value;
+
+ rrdinstances_destroy_from_rrdcontext(rc);
+ rrdcontext_freez(rc);
+}
+
+typedef enum __attribute__((packed)) {
+ OLDNEW_KEEP_OLD,
+ OLDNEW_USE_NEW,
+ OLDNEW_MERGE,
+} OLDNEW;
+
+static inline OLDNEW oldnew_decide(bool archived, bool new_archived) {
+ if(archived && !new_archived)
+ return OLDNEW_USE_NEW;
+
+ if(!archived && new_archived)
+ return OLDNEW_KEEP_OLD;
+
+ return OLDNEW_MERGE;
+}
+
+static inline void string_replace(STRING **stringpp, STRING *new_string) {
+ STRING *old = *stringpp;
+ *stringpp = string_dup(new_string);
+ string_freez(old);
+}
+
+static inline void string_merge(STRING **stringpp, STRING *new_string) {
+ STRING *old = *stringpp;
+ *stringpp = string_2way_merge(*stringpp, new_string);
+ string_freez(old);
+}
+
+static void rrdcontext_merge_with(RRDCONTEXT *rc, bool archived, STRING *title, STRING *family, STRING *units, RRDSET_TYPE chart_type, uint32_t priority) {
+ OLDNEW oldnew = oldnew_decide(rrd_flag_is_archived(rc), archived);
+
+ switch(oldnew) {
+ case OLDNEW_KEEP_OLD:
+ break;
+
+ case OLDNEW_USE_NEW:
+ if(rc->title != title) {
+ string_replace(&rc->title, title);
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+ if(rc->family != family) {
+ string_replace(&rc->family, family);
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+ break;
+
+ case OLDNEW_MERGE:
+ if(rc->title != title) {
+ string_merge(&rc->title, title);
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+ if(rc->family != family) {
+ string_merge(&rc->family, family);
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+ break;
+ }
+
+ switch(oldnew) {
+ case OLDNEW_KEEP_OLD:
+ break;
+
+ case OLDNEW_USE_NEW:
+ case OLDNEW_MERGE:
+ if(rc->units != units) {
+ string_replace(&rc->units, units);
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+
+ if(rc->chart_type != chart_type) {
+ rc->chart_type = chart_type;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+
+ if(rc->priority != priority) {
+ rc->priority = priority;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+ break;
+ }
+}
+
+void rrdcontext_update_from_collected_rrdinstance(RRDINSTANCE *ri) {
+ rrdcontext_merge_with(ri->rc, rrd_flag_is_archived(ri),
+ ri->title, ri->family, ri->units, ri->chart_type, ri->priority);
+}
+
+static bool rrdcontext_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *rrdhost __maybe_unused) {
+ RRDCONTEXT *rc = (RRDCONTEXT *)old_value;
+ RRDCONTEXT *rc_new = (RRDCONTEXT *)new_value;
+
+ //current rc is not archived, new_rc is archived, don't merge
+ if (!rrd_flag_is_archived(rc) && rrd_flag_is_archived(rc_new)) {
+ rrdcontext_freez(rc_new);
+ return false;
+ }
+
+ rrdcontext_lock(rc);
+
+ rrdcontext_merge_with(rc, rrd_flag_is_archived(rc_new),
+ rc_new->title, rc_new->family, rc_new->units, rc_new->chart_type, rc_new->priority);
+
+ rrd_flag_set(rc, rc_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no need for atomics on rc_new
+
+ if(rrd_flag_is_collected(rc) && rrd_flag_is_archived(rc))
+ rrd_flag_set_collected(rc);
+
+ if(rrd_flag_is_updated(rc))
+ rrd_flag_set(rc, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT);
+
+ rrdcontext_unlock(rc);
+
+ // free the resources of the new one
+ rrdcontext_freez(rc_new);
+
+ // the react callback will continue from here
+ return rrd_flag_is_updated(rc);
+}
+
+static void rrdcontext_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost __maybe_unused) {
+ RRDCONTEXT *rc = (RRDCONTEXT *)value;
+ rrdcontext_trigger_updates(rc, __FUNCTION__ );
+}
+
+void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function) {
+ if(rrd_flag_is_updated(rc) || !rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION))
+ rrdcontext_queue_for_post_processing(rc, function, rc->flags);
+}
+
+static void rrdcontext_hub_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
+ RRDCONTEXT *rc = context;
+ rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
+ rc->queue.queued_ut = now_realtime_usec();
+ rc->queue.queued_flags = rrd_flags_get(rc);
+}
+
+static void rrdcontext_hub_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
+ RRDCONTEXT *rc = context;
+ rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB);
+}
+
+static bool rrdcontext_hub_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) {
+ // context and new_context are the same
+ // we just need to update the timings
+ RRDCONTEXT *rc = context;
+ rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
+ rc->queue.queued_ut = now_realtime_usec();
+ rc->queue.queued_flags |= rrd_flags_get(rc);
+
+ return true;
+}
+
+static void rrdcontext_post_processing_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
+ RRDCONTEXT *rc = context;
+ rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP);
+ rc->pp.queued_flags = rc->flags;
+ rc->pp.queued_ut = now_realtime_usec();
+}
+
+static void rrdcontext_post_processing_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
+ RRDCONTEXT *rc = context;
+
+ // IMPORTANT:
+ // Do not rely on this flag being absent, because the dictionaries have delayed deletions (garbage collect)
+ // so, this flag may not be deleted immediately from the context.
+ rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_PP);
+ rc->pp.dequeued_ut = now_realtime_usec();
+}
+
+static bool rrdcontext_post_processing_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) {
+ RRDCONTEXT *rc = context;
+ bool changed = false;
+
+ if(!(rc->flags & RRD_FLAG_QUEUED_FOR_PP)) {
+ rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP);
+ changed = true;
+ }
+
+ if(rc->pp.queued_flags != rc->flags) {
+ rc->pp.queued_flags |= rc->flags;
+ changed = true;
+ }
+
+ return changed;
+}
+
+
+void rrdhost_create_rrdcontexts(RRDHOST *host) {
+ if(unlikely(!host)) return;
+ if(likely(host->rrdctx.contexts)) return;
+
+ host->rrdctx.contexts = dictionary_create_advanced(
+ DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ &dictionary_stats_category_rrdcontext, sizeof(RRDCONTEXT));
+
+ dictionary_register_insert_callback(host->rrdctx.contexts, rrdcontext_insert_callback, host);
+ dictionary_register_delete_callback(host->rrdctx.contexts, rrdcontext_delete_callback, host);
+ dictionary_register_conflict_callback(host->rrdctx.contexts, rrdcontext_conflict_callback, host);
+ dictionary_register_react_callback(host->rrdctx.contexts, rrdcontext_react_callback, host);
+
+ host->rrdctx.hub_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0);
+ dictionary_register_insert_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_insert_callback, NULL);
+ dictionary_register_delete_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_delete_callback, NULL);
+ dictionary_register_conflict_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_conflict_callback, NULL);
+
+ host->rrdctx.pp_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0);
+ dictionary_register_insert_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_insert_callback, NULL);
+ dictionary_register_delete_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_delete_callback, NULL);
+ dictionary_register_conflict_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_conflict_callback, NULL);
+}
+
+void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
+ if(unlikely(!host)) return;
+ if(unlikely(!host->rrdctx.contexts)) return;
+
+ DICTIONARY *old;
+
+ if(host->rrdctx.hub_queue) {
+ old = host->rrdctx.hub_queue;
+ host->rrdctx.hub_queue = NULL;
+
+ RRDCONTEXT *rc;
+ dfe_start_write(old, rc) {
+ dictionary_del(old, string2str(rc->id));
+ }
+ dfe_done(rc);
+ dictionary_destroy(old);
+ }
+
+ if(host->rrdctx.pp_queue) {
+ old = host->rrdctx.pp_queue;
+ host->rrdctx.pp_queue = NULL;
+
+ RRDCONTEXT *rc;
+ dfe_start_write(old, rc) {
+ dictionary_del(old, string2str(rc->id));
+ }
+ dfe_done(rc);
+ dictionary_destroy(old);
+ }
+
+ old = host->rrdctx.contexts;
+ host->rrdctx.contexts = NULL;
+ dictionary_destroy(old);
+}
+
diff --git a/database/contexts/instance.c b/src/database/contexts/instance.c
index 39837dbf6..5d841bc82 100644
--- a/database/contexts/instance.c
+++ b/src/database/contexts/instance.c
@@ -137,7 +137,7 @@ static bool rrdinstance_conflict_callback(const DICTIONARY_ITEM *item __maybe_un
"RRDINSTANCE: '%s' cannot change id to '%s'",
string2str(ri->id), string2str(ri_new->id));
- if(uuid_memcmp(&ri->uuid, &ri_new->uuid) != 0) {
+ if(!uuid_eq(ri->uuid, ri_new->uuid)) {
#ifdef NETDATA_INTERNAL_CHECKS
char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN];
uuid_unparse(ri->uuid, uuid1);
@@ -156,7 +156,7 @@ static bool rrdinstance_conflict_callback(const DICTIONARY_ITEM *item __maybe_un
}
#ifdef NETDATA_INTERNAL_CHECKS
- if(ri->rrdset && uuid_memcmp(&ri->uuid, &ri->rrdset->chart_uuid) != 0) {
+ if(ri->rrdset && !uuid_eq(ri->uuid, ri->rrdset->chart_uuid)) {
char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN];
uuid_unparse(ri->uuid, uuid1);
uuid_unparse(ri->rrdset->chart_uuid, uuid2);
@@ -404,7 +404,7 @@ inline void rrdinstance_from_rrdset(RRDSET *st) {
fatal("RRDCONTEXT: cannot switch rrdcontext without switching rrdinstance too");
}
-#define rrdset_get_rrdinstance(st) rrdset_get_rrdinstance_with_trace(st, __FUNCTION__);
+#define rrdset_get_rrdinstance(st) rrdset_get_rrdinstance_with_trace(st, __FUNCTION__)
static inline RRDINSTANCE *rrdset_get_rrdinstance_with_trace(RRDSET *st, const char *function) {
if(unlikely(!st->rrdcontexts.rrdinstance)) {
netdata_log_error("RRDINSTANCE: RRDSET '%s' is not linked to an RRDINSTANCE at %s()", rrdset_id(st), function);
diff --git a/src/database/contexts/internal.h b/src/database/contexts/internal.h
new file mode 100644
index 000000000..270c59649
--- /dev/null
+++ b/src/database/contexts/internal.h
@@ -0,0 +1,388 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDCONTEXT_INTERNAL_H
+#define NETDATA_RRDCONTEXT_INTERNAL_H 1
+
+#include "rrdcontext.h"
+#include "../sqlite/sqlite_context.h"
+#include "../../aclk/schema-wrappers/context.h"
+#include "../../aclk/aclk_contexts_api.h"
+#include "../../aclk/aclk.h"
+#include "../storage_engine.h"
+
+#define MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST 5000
+#define FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS 120
+#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC (1000 * USEC_PER_MS)
+#define RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY 10
+
+#define LOG_TRANSITIONS false
+
+#define WORKER_JOB_HOSTS 1
+#define WORKER_JOB_CHECK 2
+#define WORKER_JOB_SEND 3
+#define WORKER_JOB_DEQUEUE 4
+#define WORKER_JOB_RETENTION 5
+#define WORKER_JOB_QUEUED 6
+#define WORKER_JOB_CLEANUP 7
+#define WORKER_JOB_CLEANUP_DELETE 8
+#define WORKER_JOB_PP_METRIC 9 // post-processing metrics
+#define WORKER_JOB_PP_INSTANCE 10 // post-processing instances
+#define WORKER_JOB_PP_CONTEXT 11 // post-processing contexts
+#define WORKER_JOB_HUB_QUEUE_SIZE 12
+#define WORKER_JOB_PP_QUEUE_SIZE 13
+
+
+typedef enum __attribute__ ((__packed__)) {
+ RRD_FLAG_NONE = 0,
+ RRD_FLAG_DELETED = (1 << 0), // this is a deleted object (metrics, instances, contexts)
+ RRD_FLAG_COLLECTED = (1 << 1), // this object is currently being collected
+ RRD_FLAG_UPDATED = (1 << 2), // this object has updates to propagate
+ RRD_FLAG_ARCHIVED = (1 << 3), // this object is not currently being collected
+ RRD_FLAG_OWN_LABELS = (1 << 4), // this instance has its own labels - not linked to an RRDSET
+ RRD_FLAG_LIVE_RETENTION = (1 << 5), // we have got live retention from the database
+ RRD_FLAG_QUEUED_FOR_HUB = (1 << 6), // this context is currently queued to be dispatched to hub
+ RRD_FLAG_QUEUED_FOR_PP = (1 << 7), // this context is currently queued to be post-processed
+ RRD_FLAG_HIDDEN = (1 << 8), // don't expose this to the hub or the API
+
+ RRD_FLAG_UPDATE_REASON_TRIGGERED = (1 << 9), // the update was triggered by the child object
+ RRD_FLAG_UPDATE_REASON_LOAD_SQL = (1 << 10), // this object has just been loaded from SQL
+ RRD_FLAG_UPDATE_REASON_NEW_OBJECT = (1 << 11), // this object has just been created
+ RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT = (1 << 12), // we received an update on this object
+ RRD_FLAG_UPDATE_REASON_CHANGED_LINKING = (1 << 13), // an instance or a metric switched RRDSET or RRDDIM
+ RRD_FLAG_UPDATE_REASON_CHANGED_METADATA = (1 << 14), // this context or instance changed uuid, name, units, title, family, chart type, priority, update every, rrd changed flags
+ RRD_FLAG_UPDATE_REASON_ZERO_RETENTION = (1 << 15), // this object has no retention
+ RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T = (1 << 16), // this object changed its oldest time in the db
+ RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T = (1 << 17), // this object change its latest time in the db
+ RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED = (1 << 18), // this object has stopped being collected
+ RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED = (1 << 19), // this object has started being collected
+ RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD = (1 << 20), // this context belongs to a host that just disconnected
+ RRD_FLAG_UPDATE_REASON_UNUSED = (1 << 21), // this context is not used anymore
+ RRD_FLAG_UPDATE_REASON_DB_ROTATION = (1 << 22), // this context changed because of a db rotation
+
+ RRD_FLAG_MERGED_COLLECTED_RI_TO_RC = (1 << 29),
+
+ // action to perform on an object
+ RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION = (1 << 30), // this object has to update its retention from the db
+} RRD_FLAGS;
+
+struct rrdcontext_reason {
+ RRD_FLAGS flag;
+ const char *name;
+ usec_t delay_ut;
+};
+
+extern struct rrdcontext_reason rrdcontext_reasons[];
+
+#define RRD_FLAG_ALL_UPDATE_REASONS ( \
+ RRD_FLAG_UPDATE_REASON_TRIGGERED \
+ |RRD_FLAG_UPDATE_REASON_LOAD_SQL \
+ |RRD_FLAG_UPDATE_REASON_NEW_OBJECT \
+ |RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT \
+ |RRD_FLAG_UPDATE_REASON_CHANGED_LINKING \
+ |RRD_FLAG_UPDATE_REASON_CHANGED_METADATA \
+ |RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \
+ |RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T \
+ |RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T \
+ |RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED \
+ |RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED \
+ |RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD \
+ |RRD_FLAG_UPDATE_REASON_DB_ROTATION \
+ |RRD_FLAG_UPDATE_REASON_UNUSED \
+ )
+
+#define RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS ( \
+ RRD_FLAG_ARCHIVED \
+ |RRD_FLAG_HIDDEN \
+ |RRD_FLAG_ALL_UPDATE_REASONS \
+ )
+
+#define RRD_FLAGS_REQUIRED_FOR_DELETIONS ( \
+ RRD_FLAG_DELETED \
+ |RRD_FLAG_LIVE_RETENTION \
+)
+
+#define RRD_FLAGS_PREVENTING_DELETIONS ( \
+ RRD_FLAG_QUEUED_FOR_HUB \
+ |RRD_FLAG_COLLECTED \
+ |RRD_FLAG_QUEUED_FOR_PP \
+)
+
+// get all the flags of an object
+#define rrd_flags_get(obj) __atomic_load_n(&((obj)->flags), __ATOMIC_SEQ_CST)
+
+// check if ANY of the given flags (bits) is set
+#define rrd_flag_check(obj, flag) (rrd_flags_get(obj) & (flag))
+
+// check if ALL the given flags (bits) are set
+#define rrd_flag_check_all(obj, flag) (rrd_flag_check(obj, flag) == (flag))
+
+// set one or more flags (bits)
+#define rrd_flag_set(obj, flag) __atomic_or_fetch(&((obj)->flags), flag, __ATOMIC_SEQ_CST)
+
+// clear one or more flags (bits)
+#define rrd_flag_clear(obj, flag) __atomic_and_fetch(&((obj)->flags), ~(flag), __ATOMIC_SEQ_CST)
+
+// replace the flags of an object, with the supplied ones
+#define rrd_flags_replace(obj, all_flags) __atomic_store_n(&((obj)->flags), all_flags, __ATOMIC_SEQ_CST)
+
+static inline void
+rrd_flag_add_remove_atomic(RRD_FLAGS *flags, RRD_FLAGS check, RRD_FLAGS conditionally_add, RRD_FLAGS always_remove) {
+ RRD_FLAGS expected, desired;
+
+ do {
+ expected = *flags;
+
+ desired = expected;
+ desired &= ~(always_remove);
+
+ if(!(expected & check))
+ desired |= (check | conditionally_add);
+
+ } while(!__atomic_compare_exchange_n(flags, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
+}
+
+#define rrd_flag_set_collected(obj) \
+ rrd_flag_add_remove_atomic(&((obj)->flags) \
+ /* check this flag */ \
+ , RRD_FLAG_COLLECTED \
+ \
+ /* add these flags together with the above, if the above is not already set */ \
+ , RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED | RRD_FLAG_UPDATED \
+ \
+ /* always remove these flags */ \
+ , RRD_FLAG_ARCHIVED \
+ | RRD_FLAG_DELETED \
+ | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED \
+ | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \
+ | RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD \
+ )
+
+#define rrd_flag_set_archived(obj) \
+ rrd_flag_add_remove_atomic(&((obj)->flags) \
+ /* check this flag */ \
+ , RRD_FLAG_ARCHIVED \
+ \
+ /* add these flags together with the above, if the above is not already set */ \
+ , RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED | RRD_FLAG_UPDATED \
+ \
+ /* always remove these flags */ \
+ , RRD_FLAG_COLLECTED \
+ | RRD_FLAG_DELETED \
+ | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED \
+ | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \
+ )
+
+#define rrd_flag_set_deleted(obj, reason) \
+ rrd_flag_add_remove_atomic(&((obj)->flags) \
+ /* check this flag */ \
+ , RRD_FLAG_DELETED \
+ \
+ /* add these flags together with the above, if the above is not already set */ \
+ , RRD_FLAG_UPDATE_REASON_ZERO_RETENTION | RRD_FLAG_UPDATED | (reason) \
+ \
+ /* always remove these flags */ \
+ , RRD_FLAG_ARCHIVED \
+ | RRD_FLAG_COLLECTED \
+ )
+
+#define rrd_flag_is_collected(obj) rrd_flag_check(obj, RRD_FLAG_COLLECTED)
+#define rrd_flag_is_archived(obj) rrd_flag_check(obj, RRD_FLAG_ARCHIVED)
+#define rrd_flag_is_deleted(obj) rrd_flag_check(obj, RRD_FLAG_DELETED)
+#define rrd_flag_is_updated(obj) rrd_flag_check(obj, RRD_FLAG_UPDATED)
+
+// mark an object as updated, providing reasons (additional bits)
+#define rrd_flag_set_updated(obj, reason) rrd_flag_set(obj, RRD_FLAG_UPDATED | (reason))
+
+// clear an object as being updated, clearing also all the reasons
+#define rrd_flag_unset_updated(obj) rrd_flag_clear(obj, RRD_FLAG_UPDATED | RRD_FLAG_ALL_UPDATE_REASONS)
+
+
+typedef struct rrdmetric {
+ nd_uuid_t uuid;
+
+ STRING *id;
+ STRING *name;
+
+ RRDDIM *rrddim;
+
+ time_t first_time_s;
+ time_t last_time_s;
+ RRD_FLAGS flags;
+
+ struct rrdinstance *ri;
+} RRDMETRIC;
+
+typedef struct rrdinstance {
+ nd_uuid_t uuid;
+
+ STRING *id;
+ STRING *name;
+ STRING *title;
+ STRING *units;
+ STRING *family;
+ uint32_t priority:24;
+ RRDSET_TYPE chart_type;
+
+ RRD_FLAGS flags; // flags related to this instance
+ time_t first_time_s;
+ time_t last_time_s;
+
+ time_t update_every_s; // data collection frequency
+ RRDSET *rrdset; // pointer to RRDSET when collected, or NULL
+
+ RRDLABELS *rrdlabels; // linked to RRDSET->chart_labels or own version
+
+ struct rrdcontext *rc;
+ DICTIONARY *rrdmetrics;
+
+ struct {
+ uint32_t collected_metrics_count; // a temporary variable to detect BEGIN/END without SET
+ // don't use it for other purposes
+ // it goes up and then resets to zero, on every iteration
+ } internal;
+} RRDINSTANCE;
+
+typedef struct rrdcontext {
+ uint64_t version;
+
+ STRING *id;
+ STRING *title;
+ STRING *units;
+ STRING *family;
+ uint32_t priority;
+ RRDSET_TYPE chart_type;
+
+ SPINLOCK spinlock;
+
+ RRD_FLAGS flags;
+ time_t first_time_s;
+ time_t last_time_s;
+
+ VERSIONED_CONTEXT_DATA hub;
+
+ DICTIONARY *rrdinstances;
+ RRDHOST *rrdhost;
+
+ struct {
+ RRD_FLAGS queued_flags; // the last flags that triggered the post-processing
+ usec_t queued_ut; // the last time this was queued
+ usec_t dequeued_ut; // the last time we sent (or deduplicated) this context
+ size_t executions; // how many times this context has been processed
+ } pp;
+
+ struct {
+ RRD_FLAGS queued_flags; // the last flags that triggered the queueing
+ usec_t queued_ut; // the last time this was queued
+ usec_t delay_calc_ut; // the last time we calculated the scheduled_dispatched_ut
+ usec_t scheduled_dispatch_ut; // the time it was/is scheduled to be sent
+ usec_t dequeued_ut; // the last time we sent (or deduplicated) this context
+ size_t dispatches; // the number of times this has been dispatched to hub
+ } queue;
+
+ struct {
+ uint32_t metrics; // the number of metrics in this context
+ } stats;
+} RRDCONTEXT;
+
+
+// ----------------------------------------------------------------------------
+// helper one-liners for RRDMETRIC
+
+bool rrdmetric_update_retention(RRDMETRIC *rm);
+
+static inline RRDMETRIC *rrdmetric_acquired_value(RRDMETRIC_ACQUIRED *rma) {
+ return dictionary_acquired_item_value((DICTIONARY_ITEM *)rma);
+}
+
+static inline RRDMETRIC_ACQUIRED *rrdmetric_acquired_dup(RRDMETRIC_ACQUIRED *rma) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+ return (RRDMETRIC_ACQUIRED *)dictionary_acquired_item_dup(rm->ri->rrdmetrics, (DICTIONARY_ITEM *)rma);
+}
+
+static inline void rrdmetric_release(RRDMETRIC_ACQUIRED *rma) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+ dictionary_acquired_item_release(rm->ri->rrdmetrics, (DICTIONARY_ITEM *)rma);
+}
+
+void rrdmetric_rrddim_is_freed(RRDDIM *rd);
+void rrdmetric_updated_rrddim_flags(RRDDIM *rd);
+void rrdmetric_collected_rrddim(RRDDIM *rd);
+
+// ----------------------------------------------------------------------------
+// helper one-liners for RRDINSTANCE
+
+static inline RRDINSTANCE *rrdinstance_acquired_value(RRDINSTANCE_ACQUIRED *ria) {
+ return dictionary_acquired_item_value((DICTIONARY_ITEM *)ria);
+}
+
+static inline RRDINSTANCE_ACQUIRED *rrdinstance_acquired_dup(RRDINSTANCE_ACQUIRED *ria) {
+ RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+ return (RRDINSTANCE_ACQUIRED *)dictionary_acquired_item_dup(ri->rc->rrdinstances, (DICTIONARY_ITEM *)ria);
+}
+
+static inline void rrdinstance_release(RRDINSTANCE_ACQUIRED *ria) {
+ RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+ dictionary_acquired_item_release(ri->rc->rrdinstances, (DICTIONARY_ITEM *)ria);
+}
+
+void rrdinstance_from_rrdset(RRDSET *st);
+void rrdinstance_rrdset_is_freed(RRDSET *st);
+void rrdinstance_rrdset_has_updated_retention(RRDSET *st);
+void rrdinstance_updated_rrdset_name(RRDSET *st);
+void rrdinstance_updated_rrdset_flags_no_action(RRDINSTANCE *ri, RRDSET *st);
+void rrdinstance_updated_rrdset_flags(RRDSET *st);
+void rrdinstance_collected_rrdset(RRDSET *st);
+
+void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function, RRD_FLAGS flags);
+
+// ----------------------------------------------------------------------------
+// helper one-liners for RRDCONTEXT
+
+static inline RRDCONTEXT *rrdcontext_acquired_value(RRDCONTEXT_ACQUIRED *rca) {
+ return dictionary_acquired_item_value((DICTIONARY_ITEM *)rca);
+}
+
+static inline RRDCONTEXT_ACQUIRED *rrdcontext_acquired_dup(RRDCONTEXT_ACQUIRED *rca) {
+ RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+ return (RRDCONTEXT_ACQUIRED *)dictionary_acquired_item_dup(rc->rrdhost->rrdctx.contexts, (DICTIONARY_ITEM *)rca);
+}
+
+static inline void rrdcontext_release(RRDCONTEXT_ACQUIRED *rca) {
+ RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+ dictionary_acquired_item_release(rc->rrdhost->rrdctx.contexts, (DICTIONARY_ITEM *)rca);
+}
+
+// ----------------------------------------------------------------------------
+// Forward definitions
+
+void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, bool worker_jobs);
+void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, bool worker_jobs);
+
+#define rrdcontext_lock(rc) spinlock_lock(&((rc)->spinlock))
+#define rrdcontext_unlock(rc) spinlock_unlock(&((rc)->spinlock))
+
+void rrdinstance_trigger_updates(RRDINSTANCE *ri, const char *function);
+void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function);
+
+void rrdinstances_create_in_rrdcontext(RRDCONTEXT *rc);
+void rrdinstances_destroy_from_rrdcontext(RRDCONTEXT *rc);
+
+void rrdmetrics_destroy_from_rrdinstance(RRDINSTANCE *ri);
+void rrdmetrics_create_in_rrdinstance(RRDINSTANCE *ri);
+
+void rrdmetric_from_rrddim(RRDDIM *rd);
+
+void rrd_reasons_to_buffer_json_array_items(RRD_FLAGS flags, BUFFER *wb);
+
+#define rrdcontext_version_hash(host) rrdcontext_version_hash_with_callback(host, NULL, false, NULL)
+uint64_t rrdcontext_version_hash_with_callback(
+ RRDHOST *host,
+ void (*callback)(RRDCONTEXT *, bool, void *),
+ bool snapshot,
+ void *bundle);
+
+void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused, void *bundle __maybe_unused);
+
+void rrdcontext_update_from_collected_rrdinstance(RRDINSTANCE *ri);
+
+#endif //NETDATA_RRDCONTEXT_INTERNAL_H
diff --git a/src/database/contexts/metric.c b/src/database/contexts/metric.c
new file mode 100644
index 000000000..be29f33ea
--- /dev/null
+++ b/src/database/contexts/metric.c
@@ -0,0 +1,327 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "internal.h"
+
+static void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function);
+
+inline const char *rrdmetric_acquired_id(RRDMETRIC_ACQUIRED *rma) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+ return string2str(rm->id);
+}
+
+inline const char *rrdmetric_acquired_name(RRDMETRIC_ACQUIRED *rma) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+ return string2str(rm->name);
+}
+
+inline bool rrdmetric_acquired_has_name(RRDMETRIC_ACQUIRED *rma) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+ return (rm->name && rm->name != rm->id);
+}
+
+inline STRING *rrdmetric_acquired_id_dup(RRDMETRIC_ACQUIRED *rma) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+ return string_dup(rm->id);
+}
+
+inline STRING *rrdmetric_acquired_name_dup(RRDMETRIC_ACQUIRED *rma) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+ return string_dup(rm->name);
+}
+
+inline NETDATA_DOUBLE rrdmetric_acquired_last_stored_value(RRDMETRIC_ACQUIRED *rma) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+
+ if(rm->rrddim)
+ return rm->rrddim->collector.last_stored_value;
+
+ return NAN;
+}
+
+inline bool rrdmetric_acquired_belongs_to_instance(RRDMETRIC_ACQUIRED *rma, RRDINSTANCE_ACQUIRED *ria) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+ RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+ return rm->ri == ri;
+}
+
+inline time_t rrdmetric_acquired_first_entry(RRDMETRIC_ACQUIRED *rma) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+ return rm->first_time_s;
+}
+
+inline time_t rrdmetric_acquired_last_entry(RRDMETRIC_ACQUIRED *rma) {
+ RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+
+ if(rrd_flag_check(rm, RRD_FLAG_COLLECTED))
+ return 0;
+
+ return rm->last_time_s;
+}
+
+// ----------------------------------------------------------------------------
+// RRDMETRIC
+
+// free the contents of RRDMETRIC.
+// RRDMETRIC itself is managed by DICTIONARY - no need to free it here.
+static void rrdmetric_free(RRDMETRIC *rm) {
+ string_freez(rm->id);
+ string_freez(rm->name);
+
+ rm->id = NULL;
+ rm->name = NULL;
+ rm->ri = NULL;
+}
+
+// called when this rrdmetric is inserted to the rrdmetrics dictionary of a rrdinstance
+// the constructor of the rrdmetric object
+static void rrdmetric_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdinstance) {
+ RRDMETRIC *rm = value;
+
+ // link it to its parent
+ rm->ri = rrdinstance;
+
+ // remove flags that we need to figure out at runtime
+ rm->flags = rm->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics
+
+ // signal the react callback to do the job
+ rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_NEW_OBJECT);
+}
+
+// called when this rrdmetric is deleted from the rrdmetrics dictionary of a rrdinstance
+// the destructor of the rrdmetric object
+static void rrdmetric_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdinstance __maybe_unused) {
+ RRDMETRIC *rm = value;
+
+ internal_error(rm->rrddim, "RRDMETRIC: '%s' is freed but there is a RRDDIM linked to it.", string2str(rm->id));
+
+ // free the resources
+ rrdmetric_free(rm);
+}
+
+// called when the same rrdmetric is inserted again to the rrdmetrics dictionary of a rrdinstance
+// while this is called, the dictionary is write locked, but there may be other users of the object
+static bool rrdmetric_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *rrdinstance __maybe_unused) {
+ RRDMETRIC *rm = old_value;
+ RRDMETRIC *rm_new = new_value;
+
+ internal_error(rm->id != rm_new->id,
+ "RRDMETRIC: '%s' cannot change id to '%s'",
+ string2str(rm->id), string2str(rm_new->id));
+
+ if(!uuid_eq(rm->uuid, rm_new->uuid)) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN];
+ uuid_unparse(rm->uuid, uuid1);
+ uuid_unparse(rm_new->uuid, uuid2);
+
+ time_t old_first_time_s = 0;
+ time_t old_last_time_s = 0;
+ if(rrdmetric_update_retention(rm)) {
+ old_first_time_s = rm->first_time_s;
+ old_last_time_s = rm->last_time_s;
+ }
+
+ uuid_copy(rm->uuid, rm_new->uuid);
+
+ time_t new_first_time_s = 0;
+ time_t new_last_time_s = 0;
+ if(rrdmetric_update_retention(rm)) {
+ new_first_time_s = rm->first_time_s;
+ new_last_time_s = rm->last_time_s;
+ }
+
+ internal_error(true,
+ "RRDMETRIC: '%s' of instance '%s' of host '%s' changed UUID from '%s' (retention %ld to %ld, %ld secs) to '%s' (retention %ld to %ld, %ld secs)"
+ , string2str(rm->id)
+ , string2str(rm->ri->id)
+ , rrdhost_hostname(rm->ri->rc->rrdhost)
+ , uuid1, old_first_time_s, old_last_time_s, old_last_time_s - old_first_time_s
+ , uuid2, new_first_time_s, new_last_time_s, new_last_time_s - new_first_time_s
+ );
+#else
+ uuid_copy(rm->uuid, rm_new->uuid);
+#endif
+ rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+
+ if(rm->rrddim && rm_new->rrddim && rm->rrddim != rm_new->rrddim) {
+ rm->rrddim = rm_new->rrddim;
+ rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LINKING);
+ }
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(rm->rrddim && !uuid_eq(rm->uuid, rm->rrddim->metric_uuid)) {
+ char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN];
+ uuid_unparse(rm->uuid, uuid1);
+ uuid_unparse(rm_new->uuid, uuid2);
+ internal_error(true, "RRDMETRIC: '%s' is linked to RRDDIM '%s' but they have different UUIDs. RRDMETRIC has '%s', RRDDIM has '%s'", string2str(rm->id), rrddim_id(rm->rrddim), uuid1, uuid2);
+ }
+#endif
+
+ if(rm->rrddim != rm_new->rrddim)
+ rm->rrddim = rm_new->rrddim;
+
+ if(rm->name != rm_new->name) {
+ STRING *old = rm->name;
+ rm->name = string_dup(rm_new->name);
+ string_freez(old);
+ rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+
+ if(!rm->first_time_s || (rm_new->first_time_s && rm_new->first_time_s < rm->first_time_s)) {
+ rm->first_time_s = rm_new->first_time_s;
+ rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if(!rm->last_time_s || (rm_new->last_time_s && rm_new->last_time_s > rm->last_time_s)) {
+ rm->last_time_s = rm_new->last_time_s;
+ rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ rrd_flag_set(rm, rm_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no needs for atomics on rm_new
+
+ if(rrd_flag_is_collected(rm) && rrd_flag_is_archived(rm))
+ rrd_flag_set_collected(rm);
+
+ if(rrd_flag_check(rm, RRD_FLAG_UPDATED))
+ rrd_flag_set(rm, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT);
+
+ rrdmetric_free(rm_new);
+
+ // the react callback will continue from here
+ return rrd_flag_is_updated(rm);
+}
+
+// this is called after the insert or the conflict callbacks,
+// but the dictionary is now unlocked
+static void rrdmetric_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdinstance __maybe_unused) {
+ RRDMETRIC *rm = value;
+ rrdmetric_trigger_updates(rm, __FUNCTION__ );
+}
+
+void rrdmetrics_create_in_rrdinstance(RRDINSTANCE *ri) {
+ if(unlikely(!ri)) return;
+ if(likely(ri->rrdmetrics)) return;
+
+ ri->rrdmetrics = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ &dictionary_stats_category_rrdcontext, sizeof(RRDMETRIC));
+
+ dictionary_register_insert_callback(ri->rrdmetrics, rrdmetric_insert_callback, ri);
+ dictionary_register_delete_callback(ri->rrdmetrics, rrdmetric_delete_callback, ri);
+ dictionary_register_conflict_callback(ri->rrdmetrics, rrdmetric_conflict_callback, ri);
+ dictionary_register_react_callback(ri->rrdmetrics, rrdmetric_react_callback, ri);
+}
+
+void rrdmetrics_destroy_from_rrdinstance(RRDINSTANCE *ri) {
+ if(unlikely(!ri || !ri->rrdmetrics)) return;
+ dictionary_destroy(ri->rrdmetrics);
+ ri->rrdmetrics = NULL;
+}
+
+// trigger post-processing of the rrdmetric, escalating changes to the rrdinstance it belongs
+static void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function) {
+ if(unlikely(rrd_flag_is_collected(rm)) && (!rm->rrddim || rrd_flag_check(rm, RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD)))
+ rrd_flag_set_archived(rm);
+
+ if(rrd_flag_is_updated(rm) || !rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION)) {
+ rrd_flag_set_updated(rm->ri, RRD_FLAG_UPDATE_REASON_TRIGGERED);
+ rrdcontext_queue_for_post_processing(rm->ri->rc, function, rm->flags);
+ }
+}
+
+// ----------------------------------------------------------------------------
+// RRDMETRIC HOOKS ON RRDDIM
+
+void rrdmetric_from_rrddim(RRDDIM *rd) {
+ if(unlikely(!rd->rrdset))
+ fatal("RRDMETRIC: rrddim '%s' does not have a rrdset.", rrddim_id(rd));
+
+ if(unlikely(!rd->rrdset->rrdhost))
+ fatal("RRDMETRIC: rrdset '%s' does not have a rrdhost", rrdset_id(rd->rrdset));
+
+ if(unlikely(!rd->rrdset->rrdcontexts.rrdinstance))
+ fatal("RRDMETRIC: rrdset '%s' does not have a rrdinstance", rrdset_id(rd->rrdset));
+
+ RRDINSTANCE *ri = rrdinstance_acquired_value(rd->rrdset->rrdcontexts.rrdinstance);
+
+ RRDMETRIC trm = {
+ .id = string_dup(rd->id),
+ .name = string_dup(rd->name),
+ .flags = RRD_FLAG_NONE, // no need for atomics
+ .rrddim = rd,
+ };
+ uuid_copy(trm.uuid, rd->metric_uuid);
+
+ RRDMETRIC_ACQUIRED *rma = (RRDMETRIC_ACQUIRED *)dictionary_set_and_acquire_item(ri->rrdmetrics, string2str(trm.id), &trm, sizeof(trm));
+
+ if(rd->rrdcontexts.rrdmetric)
+ rrdmetric_release(rd->rrdcontexts.rrdmetric);
+
+ rd->rrdcontexts.rrdmetric = rma;
+ rd->rrdcontexts.collected = false;
+}
+
+#define rrddim_get_rrdmetric(rd) rrddim_get_rrdmetric_with_trace(rd, __FUNCTION__)
+static inline RRDMETRIC *rrddim_get_rrdmetric_with_trace(RRDDIM *rd, const char *function) {
+ if(unlikely(!rd->rrdcontexts.rrdmetric)) {
+ netdata_log_error("RRDMETRIC: RRDDIM '%s' is not linked to an RRDMETRIC at %s()", rrddim_id(rd), function);
+ return NULL;
+ }
+
+ RRDMETRIC *rm = rrdmetric_acquired_value(rd->rrdcontexts.rrdmetric);
+ if(unlikely(!rm)) {
+ netdata_log_error("RRDMETRIC: RRDDIM '%s' lost the link to its RRDMETRIC at %s()", rrddim_id(rd), function);
+ return NULL;
+ }
+
+ if(unlikely(rm->rrddim != rd))
+ fatal("RRDMETRIC: '%s' is not linked to RRDDIM '%s' at %s()", string2str(rm->id), rrddim_id(rd), function);
+
+ return rm;
+}
+
+inline void rrdmetric_rrddim_is_freed(RRDDIM *rd) {
+ RRDMETRIC *rm = rrddim_get_rrdmetric(rd);
+ if(unlikely(!rm)) return;
+
+ if(unlikely(rrd_flag_is_collected(rm)))
+ rrd_flag_set_archived(rm);
+
+ rm->rrddim = NULL;
+ rrdmetric_trigger_updates(rm, __FUNCTION__ );
+ rrdmetric_release(rd->rrdcontexts.rrdmetric);
+ rd->rrdcontexts.rrdmetric = NULL;
+ rd->rrdcontexts.collected = false;
+}
+
+inline void rrdmetric_updated_rrddim_flags(RRDDIM *rd) {
+ rd->rrdcontexts.collected = false;
+
+ RRDMETRIC *rm = rrddim_get_rrdmetric(rd);
+ if(unlikely(!rm)) return;
+
+ if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED|RRDDIM_FLAG_OBSOLETE))) {
+ if(unlikely(rrd_flag_is_collected(rm)))
+ rrd_flag_set_archived(rm);
+ }
+
+ rrdmetric_trigger_updates(rm, __FUNCTION__ );
+}
+
+inline void rrdmetric_collected_rrddim(RRDDIM *rd) {
+ if(rd->rrdcontexts.collected)
+ return;
+
+ rd->rrdcontexts.collected = true;
+
+ RRDMETRIC *rm = rrddim_get_rrdmetric(rd);
+ if(unlikely(!rm)) return;
+
+ if(unlikely(!rrd_flag_is_collected(rm)))
+ rrd_flag_set_collected(rm);
+
+ // we use this variable to detect BEGIN/END without SET
+ rm->ri->internal.collected_metrics_count++;
+
+ rrdmetric_trigger_updates(rm, __FUNCTION__ );
+}
diff --git a/database/contexts/query_scope.c b/src/database/contexts/query_scope.c
index f3bcd0b3f..f3bcd0b3f 100644
--- a/database/contexts/query_scope.c
+++ b/src/database/contexts/query_scope.c
diff --git a/database/contexts/query_target.c b/src/database/contexts/query_target.c
index 95abc3e65..29a9c3e59 100644
--- a/database/contexts/query_target.c
+++ b/src/database/contexts/query_target.c
@@ -4,7 +4,7 @@
#define QUERY_TARGET_MAX_REALLOC_INCREASE 500
#define query_target_realloc_size(size, start) \
- (size) ? ((size) < QUERY_TARGET_MAX_REALLOC_INCREASE ? (size) * 2 : (size) + QUERY_TARGET_MAX_REALLOC_INCREASE) : (start);
+ (size) ? ((size) < QUERY_TARGET_MAX_REALLOC_INCREASE ? (size) * 2 : (size) + QUERY_TARGET_MAX_REALLOC_INCREASE) : (start)
static void query_metric_release(QUERY_TARGET *qt, QUERY_METRIC *qm);
static void query_dimension_release(QUERY_DIMENSION *qd);
@@ -82,7 +82,6 @@ void query_target_release(QUERY_TARGET *qt) {
simple_pattern_free(qt->instances.labels_pattern);
qt->instances.labels_pattern = NULL;
-
simple_pattern_free(qt->query.pattern);
qt->query.pattern = NULL;
@@ -221,10 +220,10 @@ static inline void query_metric_release(QUERY_TARGET *qt, QUERY_METRIC *qm) {
// reset the tiers
for(size_t tier = 0; tier < storage_tiers ;tier++) {
- if(qm->tiers[tier].db_metric_handle) {
+ if(qm->tiers[tier].smh) {
STORAGE_ENGINE *eng = query_metric_storage_engine(qt, qm, tier);
- eng->api.metric_release(qm->tiers[tier].db_metric_handle);
- qm->tiers[tier].db_metric_handle = NULL;
+ eng->api.metric_release(qm->tiers[tier].smh);
+ qm->tiers[tier].smh = NULL;
}
}
}
@@ -241,7 +240,7 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON
struct {
STORAGE_ENGINE *eng;
- STORAGE_METRIC_HANDLE *db_metric_handle;
+ STORAGE_METRIC_HANDLE *smh;
time_t db_first_time_s;
time_t db_last_time_s;
time_t db_update_every_s;
@@ -252,14 +251,14 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON
tier_retention[tier].eng = eng;
tier_retention[tier].db_update_every_s = (time_t) (qn->rrdhost->db[tier].tier_grouping * ri->update_every_s);
- if(rm->rrddim && rm->rrddim->tiers[tier].db_metric_handle)
- tier_retention[tier].db_metric_handle = eng->api.metric_dup(rm->rrddim->tiers[tier].db_metric_handle);
+ if(rm->rrddim && rm->rrddim->tiers[tier].smh)
+ tier_retention[tier].smh = eng->api.metric_dup(rm->rrddim->tiers[tier].smh);
else
- tier_retention[tier].db_metric_handle = eng->api.metric_get(qn->rrdhost->db[tier].instance, &rm->uuid);
+ tier_retention[tier].smh = eng->api.metric_get(qn->rrdhost->db[tier].si, &rm->uuid);
- if(tier_retention[tier].db_metric_handle) {
- tier_retention[tier].db_first_time_s = storage_engine_oldest_time_s(tier_retention[tier].eng->backend, tier_retention[tier].db_metric_handle);
- tier_retention[tier].db_last_time_s = storage_engine_latest_time_s(tier_retention[tier].eng->backend, tier_retention[tier].db_metric_handle);
+ if(tier_retention[tier].smh) {
+ tier_retention[tier].db_first_time_s = storage_engine_oldest_time_s(tier_retention[tier].eng->seb, tier_retention[tier].smh);
+ tier_retention[tier].db_last_time_s = storage_engine_latest_time_s(tier_retention[tier].eng->seb, tier_retention[tier].smh);
if(!common_first_time_s)
common_first_time_s = tier_retention[tier].db_first_time_s;
@@ -331,7 +330,7 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON
for (size_t tier = 0; tier < storage_tiers; tier++) {
internal_fatal(tier_retention[tier].eng != query_metric_storage_engine(qt, qm, tier), "QUERY TARGET: storage engine mismatch");
- qm->tiers[tier].db_metric_handle = tier_retention[tier].db_metric_handle;
+ qm->tiers[tier].smh = tier_retention[tier].smh;
qm->tiers[tier].db_first_time_s = tier_retention[tier].db_first_time_s;
qm->tiers[tier].db_last_time_s = tier_retention[tier].db_last_time_s;
qm->tiers[tier].db_update_every_s = tier_retention[tier].db_update_every_s;
@@ -342,8 +341,10 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON
// cleanup anything we allocated to the retention we will not use
for(size_t tier = 0; tier < storage_tiers ;tier++) {
- if (tier_retention[tier].db_metric_handle)
- tier_retention[tier].eng->api.metric_release(tier_retention[tier].db_metric_handle);
+ if (tier_retention[tier].smh) {
+ tier_retention[tier].eng->api.metric_release(tier_retention[tier].smh);
+ tier_retention[tier].smh = NULL;
+ }
}
return false;
@@ -627,7 +628,7 @@ static bool query_target_match_alert_pattern(RRDINSTANCE_ACQUIRED *ria, SIMPLE_P
rw_spinlock_read_lock(&st->alerts.spinlock);
if (st->alerts.base) {
for (RRDCALC *rc = st->alerts.base; rc; rc = rc->next) {
- SIMPLE_PATTERN_RESULT ret = simple_pattern_matches_string_extract(pattern, rc->name, NULL, 0);
+ SIMPLE_PATTERN_RESULT ret = simple_pattern_matches_string_extract(pattern, rc->config.name, NULL, 0);
if(ret == SP_MATCHED_POSITIVE) {
matched = true;
@@ -641,7 +642,7 @@ static bool query_target_match_alert_pattern(RRDINSTANCE_ACQUIRED *ria, SIMPLE_P
else
buffer_flush(wb);
- buffer_fast_strcat(wb, string2str(rc->name), string_strlen(rc->name));
+ buffer_fast_strcat(wb, string2str(rc->config.name), string_strlen(rc->config.name));
buffer_fast_strcat(wb, ":", 1);
buffer_strcat(wb, rrdcalc_status2string(rc->status));
@@ -725,13 +726,22 @@ static inline SIMPLE_PATTERN_RESULT query_instance_matches(QUERY_INSTANCE *qi,
return ret;
}
-static inline bool query_instance_matches_labels(RRDINSTANCE *ri, SIMPLE_PATTERN *chart_label_key_sp, SIMPLE_PATTERN *labels_sp) {
- if ((chart_label_key_sp && !rrdlabels_match_simple_pattern_parsed(
- ri->rrdlabels, chart_label_key_sp, '\0', NULL)) ||
- (labels_sp && !rrdlabels_match_simple_pattern_parsed(
- ri->rrdlabels, labels_sp, ':', NULL)))
+static inline bool query_instance_matches_labels(
+ RRDINSTANCE *ri,
+ SIMPLE_PATTERN *chart_label_key_sp,
+ SIMPLE_PATTERN *labels_sp)
+{
+
+ if (chart_label_key_sp && rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, chart_label_key_sp, '\0', NULL) != SP_MATCHED_POSITIVE)
return false;
+ if (labels_sp) {
+ struct pattern_array *pa = pattern_array_add_simple_pattern(NULL, labels_sp, ':');
+ bool found = pattern_array_label_match(pa, ri->rrdlabels, ':', NULL);
+ pattern_array_free(pa);
+ return found;
+ }
+
return true;
}
@@ -752,7 +762,10 @@ static bool query_instance_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_C
qi, ri, qt->instances.pattern, qtl->match_ids, qtl->match_names, qt->request.version, qtl->host_node_id_str));
if(queryable_instance)
- queryable_instance = query_instance_matches_labels(ri, qt->instances.chart_label_key_pattern, qt->instances.labels_pattern);
+ queryable_instance = query_instance_matches_labels(
+ ri,
+ qt->instances.chart_label_key_pattern,
+ qt->instances.labels_pattern);
if(queryable_instance) {
if(qt->instances.alerts_pattern && !query_target_match_alert_pattern(ria, qt->instances.alerts_pattern))
@@ -1216,6 +1229,5 @@ ssize_t weights_foreach_rrdmetric_in_context(RRDCONTEXT_ACQUIRED *rca,
break;
}
dfe_done(ri);
-
return count;
}
diff --git a/database/contexts/rrdcontext.c b/src/database/contexts/rrdcontext.c
index 9dee39be2..f755e1f7e 100644
--- a/database/contexts/rrdcontext.c
+++ b/src/database/contexts/rrdcontext.c
@@ -105,7 +105,7 @@ void rrdcontext_db_rotation(void) {
rrdcontext_next_db_rotation_ut = now_realtime_usec() + FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS * USEC_PER_SEC;
}
-int rrdcontext_find_dimension_uuid(RRDSET *st, const char *id, uuid_t *store_uuid) {
+int rrdcontext_find_dimension_uuid(RRDSET *st, const char *id, nd_uuid_t *store_uuid) {
if(!st->rrdhost) return 1;
if(!st->context) return 2;
@@ -139,7 +139,7 @@ int rrdcontext_find_dimension_uuid(RRDSET *st, const char *id, uuid_t *store_uui
return 0;
}
-int rrdcontext_find_chart_uuid(RRDSET *st, uuid_t *store_uuid) {
+int rrdcontext_find_chart_uuid(RRDSET *st, nd_uuid_t *store_uuid) {
if(!st->rrdhost) return 1;
if(!st->context) return 2;
@@ -203,23 +203,6 @@ static bool rrdhost_check_our_claim_id(const char *claim_id) {
return (strcasecmp(claim_id, localhost->aclk_state.claimed_id) == 0) ? true : false;
}
-static RRDHOST *rrdhost_find_by_node_id(const char *node_id) {
- uuid_t uuid;
- if (uuid_parse(node_id, uuid))
- return NULL;
-
- RRDHOST *host = NULL;
- dfe_start_read(rrdhost_root_index, host) {
- if(!host->node_id) continue;
-
- if(uuid_memcmp(&uuid, host->node_id) == 0)
- break;
- }
- dfe_done(host);
-
- return host;
-}
-
void rrdcontext_hub_checkpoint_command(void *ptr) {
struct ctxs_checkpoint *cmd = ptr;
@@ -234,7 +217,7 @@ void rrdcontext_hub_checkpoint_command(void *ptr) {
return;
}
- RRDHOST *host = rrdhost_find_by_node_id(cmd->node_id);
+ RRDHOST *host = find_host_by_node_id(cmd->node_id);
if(!host) {
nd_log(NDLS_DAEMON, NDLP_WARNING,
"RRDCONTEXT: received checkpoint command for claim id '%s', node id '%s', "
@@ -308,7 +291,7 @@ void rrdcontext_hub_stop_streaming_command(void *ptr) {
return;
}
- RRDHOST *host = rrdhost_find_by_node_id(cmd->node_id);
+ RRDHOST *host = find_host_by_node_id(cmd->node_id);
if(!host) {
nd_log(NDLS_DAEMON, NDLP_WARNING,
"RRDCONTEXT: received stop streaming command for claim id '%s', node id '%s', "
diff --git a/database/contexts/rrdcontext.h b/src/database/contexts/rrdcontext.h
index 9c497a5a5..9fea55d38 100644
--- a/database/contexts/rrdcontext.h
+++ b/src/database/contexts/rrdcontext.h
@@ -99,7 +99,7 @@ void rrdcontext_updated_rrddim_multiplier(RRDDIM *rd);
void rrdcontext_updated_rrddim_divisor(RRDDIM *rd);
void rrdcontext_updated_rrddim_flags(RRDDIM *rd);
void rrdcontext_collected_rrddim(RRDDIM *rd);
-int rrdcontext_find_dimension_uuid(RRDSET *st, const char *id, uuid_t *store_uuid);
+int rrdcontext_find_dimension_uuid(RRDSET *st, const char *id, nd_uuid_t *store_uuid);
// ----------------------------------------------------------------------------
// public API for rrdsets
@@ -110,7 +110,7 @@ void rrdcontext_updated_rrdset_name(RRDSET *st);
void rrdcontext_updated_rrdset_flags(RRDSET *st);
void rrdcontext_updated_retention_rrdset(RRDSET *st);
void rrdcontext_collected_rrdset(RRDSET *st);
-int rrdcontext_find_chart_uuid(RRDSET *st, uuid_t *store_uuid);
+int rrdcontext_find_chart_uuid(RRDSET *st, nd_uuid_t *store_uuid);
// ----------------------------------------------------------------------------
// public API for ACLK
@@ -165,7 +165,7 @@ typedef struct query_alerts_counts { // counts the number of alerts related t
size_t other; // number of alerts in any other state
} QUERY_ALERTS_COUNTS;
-typedef struct query_node {
+typedef struct _query_node {
uint32_t slot;
RRDHOST *rrdhost;
char node_id[UUID_STR_LEN];
@@ -177,7 +177,7 @@ typedef struct query_node {
QUERY_ALERTS_COUNTS alerts;
} QUERY_NODE;
-typedef struct query_context {
+typedef struct _query_context {
uint32_t slot;
RRDCONTEXT_ACQUIRED *rca;
@@ -187,7 +187,7 @@ typedef struct query_context {
QUERY_ALERTS_COUNTS alerts;
} QUERY_CONTEXT;
-typedef struct query_instance {
+typedef struct _query_instance {
uint32_t slot;
uint32_t query_host_id;
RRDINSTANCE_ACQUIRED *ria;
@@ -199,18 +199,18 @@ typedef struct query_instance {
QUERY_ALERTS_COUNTS alerts;
} QUERY_INSTANCE;
-typedef struct query_dimension {
+typedef struct _query_dimension {
uint32_t slot;
uint32_t priority;
RRDMETRIC_ACQUIRED *rma;
QUERY_STATUS status;
} QUERY_DIMENSION;
-typedef struct query_metric {
+typedef struct _query_metric {
RRDR_DIMENSION_FLAGS status;
struct query_metric_tier {
- STORAGE_METRIC_HANDLE *db_metric_handle;
+ STORAGE_METRIC_HANDLE *smh;
time_t db_first_time_s; // the oldest timestamp available for this tier
time_t db_last_time_s; // the latest timestamp available for this tier
time_t db_update_every_s; // latest update every for this tier
@@ -299,6 +299,8 @@ typedef struct query_target_request {
qt_interrupt_callback_t interrupt_callback;
void *interrupt_callback_data;
+
+ nd_uuid_t *transaction;
} QUERY_TARGET_REQUEST;
#define GROUP_BY_MAX_LABEL_KEYS 10
@@ -419,9 +421,9 @@ typedef struct query_target {
struct sql_alert_transition_data {
usec_t global_id;
- uuid_t *transition_id;
- uuid_t *host_id;
- uuid_t *config_hash_id;
+ nd_uuid_t *transition_id;
+ nd_uuid_t *host_id;
+ nd_uuid_t *config_hash_id;
uint32_t alarm_id;
const char *alert_name;
const char *chart;
@@ -452,21 +454,16 @@ struct sql_alert_transition_data {
};
struct sql_alert_config_data {
- uuid_t *config_hash_id;
+ nd_uuid_t *config_hash_id;
const char *name;
struct {
const char *on_template;
const char *on_key;
- const char *os;
- const char *hosts;
const char *families;
- const char *plugin;
- const char *module;
const char *host_labels;
const char *chart_labels;
- const char *charts;
} selectors;
const char *info;
@@ -479,6 +476,10 @@ struct sql_alert_config_data {
struct {
const char *dimensions;
const char *method;
+ ALERT_LOOKUP_TIME_GROUP_CONDITION time_group_condition;
+ NETDATA_DOUBLE time_group_value;
+ ALERT_LOOKUP_DIMS_GROUPING dims_group;
+ ALERT_LOOKUP_DATA_SOURCE data_source;
uint32_t options;
int32_t after;
@@ -538,9 +539,9 @@ struct sql_alert_instance_v2_entry {
time_t last_updated;
time_t last_status_change;
NETDATA_DOUBLE last_status_change_value;
- uuid_t config_hash_id;
+ nd_uuid_t config_hash_id;
usec_t global_id;
- uuid_t last_transition_id;
+ nd_uuid_t last_transition_id;
uint32_t alarm_id;
RRDHOST *host;
size_t ni;
diff --git a/database/contexts/worker.c b/src/database/contexts/worker.c
index 9d7c18863..6012c14f5 100644
--- a/database/contexts/worker.c
+++ b/src/database/contexts/worker.c
@@ -239,7 +239,7 @@ bool rrdmetric_update_retention(RRDMETRIC *rm) {
STORAGE_ENGINE *eng = rrdhost->db[tier].eng;
time_t first_time_t, last_time_t;
- if (eng->api.metric_retention_by_uuid(rrdhost->db[tier].instance, &rm->uuid, &first_time_t, &last_time_t)) {
+ if (eng->api.metric_retention_by_uuid(rrdhost->db[tier].si, &rm->uuid, &first_time_t, &last_time_t)) {
if (first_time_t < min_first_time_t)
min_first_time_t = first_time_t;
@@ -753,30 +753,26 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function __maybe_unused, RRD_FLAGS flags __maybe_unused) {
if(unlikely(!rc->rrdhost->rrdctx.pp_queue)) return;
- if(!rrd_flag_check(rc, RRD_FLAG_QUEUED_FOR_PP)) {
- dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.pp_queue,
- string2str(rc->id),
- rc,
- sizeof(*rc));
-
-#if(defined(NETDATA_INTERNAL_CHECKS) && defined(LOG_POST_PROCESSING_QUEUE_INSERTIONS))
- {
- BUFFER *wb_flags = buffer_create(1000);
- rrd_flags_to_buffer(flags, wb_flags);
-
- BUFFER *wb_reasons = buffer_create(1000);
- rrd_reasons_to_buffer(flags, wb_reasons);
-
- internal_error(true, "RRDCONTEXT: '%s' update triggered by function %s(), due to flags: %s, reasons: %s",
- string2str(rc->id), function,
- buffer_tostring(wb_flags),
- buffer_tostring(wb_reasons));
-
- buffer_free(wb_reasons);
- buffer_free(wb_flags);
- }
-#endif
+#if 0
+ if(string_strcmp(rc->id, "system.cpu") == 0) {
+ CLEAN_BUFFER *wb = buffer_create(0, NULL);
+ buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
+ buffer_json_member_add_array(wb, "flags");
+ rrd_flags_to_buffer_json_array_items(rc->flags, wb);
+ buffer_json_array_close(wb);
+ buffer_json_member_add_array(wb, "reasons");
+ rrd_reasons_to_buffer_json_array_items(rc->flags, wb);
+ buffer_json_array_close(wb);
+ buffer_json_finalize(wb);
+ nd_log(NDLS_DAEMON, NDLP_EMERG, "%s() context '%s', triggered: %s",
+ function, string2str(rc->id), buffer_tostring(wb));
}
+#endif
+
+ dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.pp_queue,
+ string2str(rc->id),
+ rc,
+ sizeof(*rc));
}
static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc) {
@@ -960,16 +956,13 @@ static void rrdcontext_dequeue_from_hub_queue(RRDCONTEXT *rc) {
static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now_ut) {
// check if we have received a streaming command for this host
- if(!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS) || !aclk_connected || !host->rrdctx.hub_queue)
+ if(!host->node_id || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS) || !aclk_connected || !host->rrdctx.hub_queue)
return;
// check if there are queued items to send
if(!dictionary_entries(host->rrdctx.hub_queue))
return;
- if(!host->node_id)
- return;
-
size_t messages_added = 0;
contexts_updated_t bundle = NULL;
@@ -1060,8 +1053,10 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now
// ----------------------------------------------------------------------------
// worker thread
-static void rrdcontext_main_cleanup(void *ptr) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+static void rrdcontext_main_cleanup(void *pptr) {
+ struct netdata_static_thread *static_thread = CLEANUP_FUNCTION_GET_PTR(pptr);
+ if(!static_thread) return;
+
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
// custom code
@@ -1071,83 +1066,82 @@ static void rrdcontext_main_cleanup(void *ptr) {
}
void *rrdcontext_main(void *ptr) {
- netdata_thread_cleanup_push(rrdcontext_main_cleanup, ptr);
-
- worker_register("RRDCONTEXT");
- worker_register_job_name(WORKER_JOB_HOSTS, "hosts");
- worker_register_job_name(WORKER_JOB_CHECK, "dedup checks");
- worker_register_job_name(WORKER_JOB_SEND, "sent contexts");
- worker_register_job_name(WORKER_JOB_DEQUEUE, "deduplicated contexts");
- worker_register_job_name(WORKER_JOB_RETENTION, "metrics retention");
- worker_register_job_name(WORKER_JOB_QUEUED, "queued contexts");
- worker_register_job_name(WORKER_JOB_CLEANUP, "cleanups");
- worker_register_job_name(WORKER_JOB_CLEANUP_DELETE, "deletes");
- worker_register_job_name(WORKER_JOB_PP_METRIC, "check metrics");
- worker_register_job_name(WORKER_JOB_PP_INSTANCE, "check instances");
- worker_register_job_name(WORKER_JOB_PP_CONTEXT, "check contexts");
-
- worker_register_job_custom_metric(WORKER_JOB_HUB_QUEUE_SIZE, "hub queue size", "contexts", WORKER_METRIC_ABSOLUTE);
- worker_register_job_custom_metric(WORKER_JOB_PP_QUEUE_SIZE, "post processing queue size", "contexts", WORKER_METRIC_ABSOLUTE);
-
- heartbeat_t hb;
- heartbeat_init(&hb);
- usec_t step = RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC;
-
- while (service_running(SERVICE_CONTEXT)) {
- worker_is_idle();
- heartbeat_next(&hb, step);
-
- if(unlikely(!service_running(SERVICE_CONTEXT))) break;
-
- usec_t now_ut = now_realtime_usec();
-
- if(rrdcontext_next_db_rotation_ut && now_ut > rrdcontext_next_db_rotation_ut) {
- rrdcontext_recalculate_retention_all_hosts();
- rrdcontext_garbage_collect_for_all_hosts();
- rrdcontext_next_db_rotation_ut = 0;
- }
+ CLEANUP_FUNCTION_REGISTER(rrdcontext_main_cleanup) cleanup_ptr = ptr;
+
+ worker_register("RRDCONTEXT");
+ worker_register_job_name(WORKER_JOB_HOSTS, "hosts");
+ worker_register_job_name(WORKER_JOB_CHECK, "dedup checks");
+ worker_register_job_name(WORKER_JOB_SEND, "sent contexts");
+ worker_register_job_name(WORKER_JOB_DEQUEUE, "deduplicated contexts");
+ worker_register_job_name(WORKER_JOB_RETENTION, "metrics retention");
+ worker_register_job_name(WORKER_JOB_QUEUED, "queued contexts");
+ worker_register_job_name(WORKER_JOB_CLEANUP, "cleanups");
+ worker_register_job_name(WORKER_JOB_CLEANUP_DELETE, "deletes");
+ worker_register_job_name(WORKER_JOB_PP_METRIC, "check metrics");
+ worker_register_job_name(WORKER_JOB_PP_INSTANCE, "check instances");
+ worker_register_job_name(WORKER_JOB_PP_CONTEXT, "check contexts");
+
+ worker_register_job_custom_metric(WORKER_JOB_HUB_QUEUE_SIZE, "hub queue size", "contexts", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_PP_QUEUE_SIZE, "post processing queue size", "contexts", WORKER_METRIC_ABSOLUTE);
+
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+ usec_t step = RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC;
+
+ while (service_running(SERVICE_CONTEXT)) {
+ worker_is_idle();
+ heartbeat_next(&hb, step);
+
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ usec_t now_ut = now_realtime_usec();
+
+ if(rrdcontext_next_db_rotation_ut && now_ut > rrdcontext_next_db_rotation_ut) {
+ rrdcontext_recalculate_retention_all_hosts();
+ rrdcontext_garbage_collect_for_all_hosts();
+ rrdcontext_next_db_rotation_ut = 0;
+ }
- size_t hub_queued_contexts_for_all_hosts = 0;
- size_t pp_queued_contexts_for_all_hosts = 0;
+ size_t hub_queued_contexts_for_all_hosts = 0;
+ size_t pp_queued_contexts_for_all_hosts = 0;
- RRDHOST *host;
- dfe_start_reentrant(rrdhost_root_index, host) {
- if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+ RRDHOST *host;
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
- worker_is_busy(WORKER_JOB_HOSTS);
+ worker_is_busy(WORKER_JOB_HOSTS);
- if(host->rrdctx.pp_queue) {
- pp_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.pp_queue);
- rrdcontext_post_process_queued_contexts(host);
- dictionary_garbage_collect(host->rrdctx.pp_queue);
- }
-
- if(host->rrdctx.hub_queue) {
- hub_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.hub_queue);
- rrdcontext_dispatch_queued_contexts_to_hub(host, now_ut);
- dictionary_garbage_collect(host->rrdctx.hub_queue);
- }
+ if(host->rrdctx.pp_queue) {
+ pp_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.pp_queue);
+ rrdcontext_post_process_queued_contexts(host);
+ dictionary_garbage_collect(host->rrdctx.pp_queue);
+ }
- if (host->rrdctx.contexts)
- dictionary_garbage_collect(host->rrdctx.contexts);
+ if(host->rrdctx.hub_queue) {
+ hub_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.hub_queue);
+ rrdcontext_dispatch_queued_contexts_to_hub(host, now_ut);
+ dictionary_garbage_collect(host->rrdctx.hub_queue);
+ }
- // calculate the number of metrics and instances in the host
- RRDCONTEXT *rc;
- uint32_t metrics = 0, instances = 0;
- dfe_start_read(host->rrdctx.contexts, rc) {
- metrics += rc->stats.metrics;
- instances += dictionary_entries(rc->rrdinstances);
- }
- dfe_done(rc);
- host->rrdctx.metrics = metrics;
- host->rrdctx.instances = instances;
- }
- dfe_done(host);
+ if (host->rrdctx.contexts)
+ dictionary_garbage_collect(host->rrdctx.contexts);
- worker_set_metric(WORKER_JOB_HUB_QUEUE_SIZE, (NETDATA_DOUBLE)hub_queued_contexts_for_all_hosts);
- worker_set_metric(WORKER_JOB_PP_QUEUE_SIZE, (NETDATA_DOUBLE)pp_queued_contexts_for_all_hosts);
+ // calculate the number of metrics and instances in the host
+ RRDCONTEXT *rc;
+ uint32_t metrics = 0, instances = 0;
+ dfe_start_read(host->rrdctx.contexts, rc) {
+ metrics += rc->stats.metrics;
+ instances += dictionary_entries(rc->rrdinstances);
}
+ dfe_done(rc);
+ host->rrdctx.metrics = metrics;
+ host->rrdctx.instances = instances;
+ }
+ dfe_done(host);
+
+ worker_set_metric(WORKER_JOB_HUB_QUEUE_SIZE, (NETDATA_DOUBLE)hub_queued_contexts_for_all_hosts);
+ worker_set_metric(WORKER_JOB_PP_QUEUE_SIZE, (NETDATA_DOUBLE)pp_queued_contexts_for_all_hosts);
+ }
- netdata_thread_cleanup_pop(1);
return NULL;
}