diff options
Diffstat (limited to '')
-rw-r--r-- | src/database/contexts/README.md (renamed from database/contexts/README.md) | 0 | ||||
-rw-r--r-- | src/database/contexts/api_v1.c (renamed from database/contexts/api_v1.c) | 8 | ||||
-rw-r--r-- | src/database/contexts/api_v2.c (renamed from database/contexts/api_v2.c) | 442 | ||||
-rw-r--r-- | src/database/contexts/context.c | 342 | ||||
-rw-r--r-- | src/database/contexts/instance.c (renamed from database/contexts/instance.c) | 6 | ||||
-rw-r--r-- | src/database/contexts/internal.h | 388 | ||||
-rw-r--r-- | src/database/contexts/metric.c | 327 | ||||
-rw-r--r-- | src/database/contexts/query_scope.c (renamed from database/contexts/query_scope.c) | 0 | ||||
-rw-r--r-- | src/database/contexts/query_target.c (renamed from database/contexts/query_target.c) | 60 | ||||
-rw-r--r-- | src/database/contexts/rrdcontext.c (renamed from database/contexts/rrdcontext.c) | 25 | ||||
-rw-r--r-- | src/database/contexts/rrdcontext.h (renamed from database/contexts/rrdcontext.h) | 39 | ||||
-rw-r--r-- | src/database/contexts/worker.c (renamed from database/contexts/worker.c) | 192 |
12 files changed, 1595 insertions, 234 deletions
diff --git a/database/contexts/README.md b/src/database/contexts/README.md index e69de29bb..e69de29bb 100644 --- a/database/contexts/README.md +++ b/src/database/contexts/README.md diff --git a/database/contexts/api_v1.c b/src/database/contexts/api_v1.c index f144e6f7b..355aaf91a 100644 --- a/database/contexts/api_v1.c +++ b/src/database/contexts/api_v1.c @@ -131,13 +131,13 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void if(before && (!ri->first_time_s || before < ri->first_time_s)) return 0; - if(t_parent->chart_label_key && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_label_key, - '\0', NULL)) + if(t_parent->chart_label_key && rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_label_key, + '\0', NULL) != SP_MATCHED_POSITIVE) return 0; - if(t_parent->chart_labels_filter && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, + if(t_parent->chart_labels_filter && rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_labels_filter, ':', - NULL)) + NULL) != SP_MATCHED_POSITIVE) return 0; time_t first_time_s = ri->first_time_s; diff --git a/database/contexts/api_v2.c b/src/database/contexts/api_v2.c index 3ca49a319..07cd3ac83 100644 --- a/database/contexts/api_v2.c +++ b/src/database/contexts/api_v2.c @@ -167,6 +167,9 @@ struct function_v2_entry { size_t used; size_t *node_ids; STRING *help; + STRING *tags; + HTTP_ACCESS access; + int priority; }; struct context_v2_entry { @@ -180,24 +183,45 @@ struct context_v2_entry { FTS_MATCH match; }; +struct alert_counts { + size_t critical; + size_t warning; + size_t clear; + size_t error; +}; + struct alert_v2_entry { RRDCALC *tmp; STRING *name; STRING *summary; + RRDLABELS *recipient; + RRDLABELS *classification; + RRDLABELS *context; + RRDLABELS *component; + RRDLABELS *type; size_t ati; - size_t critical; - size_t warning; - size_t clear; - size_t error; + struct alert_counts counts; size_t instances; DICTIONARY *nodes; DICTIONARY *configs; }; +struct alert_by_x_entry { + struct { + struct alert_counts counts; + size_t silent; + size_t total; + } running; + + struct { + size_t available; + } prototypes; +}; + typedef struct full_text_search_index { size_t searches; size_t string_searches; @@ -251,8 +275,14 @@ struct rrdcontext_to_json_v2_data { size_t ati; - DICTIONARY *alerts; + DICTIONARY *summary; DICTIONARY *alert_instances; + + DICTIONARY *by_type; + DICTIONARY *by_component; + DICTIONARY *by_classification; + DICTIONARY *by_recipient; + DICTIONARY *by_module; } alerts; struct { @@ -276,9 +306,7 @@ struct rrdcontext_to_json_v2_data { struct query_timings timings; }; -static void alerts_v2_add(struct alert_v2_entry *t, RRDCALC *rc) { - t->instances++; - +static void alert_counts_add(struct alert_counts *t, RRDCALC *rc) { switch(rc->status) { case RRDCALC_STATUS_CRITICAL: t->critical++; @@ -303,20 +331,66 @@ static void alerts_v2_add(struct alert_v2_entry *t, RRDCALC *rc) { break; } +} + +static void alerts_v2_add(struct alert_v2_entry *t, RRDCALC *rc) { + t->instances++; + + alert_counts_add(&t->counts, rc); dictionary_set(t->nodes, rc->rrdset->rrdhost->machine_guid, NULL, 0); char key[UUID_STR_LEN + 1]; - uuid_unparse_lower(rc->config_hash_id, key); + uuid_unparse_lower(rc->config.hash_id, key); dictionary_set(t->configs, key, NULL, 0); } +static void alerts_by_x_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data) { + static STRING *silent = NULL; + if(unlikely(!silent)) silent = string_strdupz("silent"); + + struct alert_by_x_entry *b = value; + RRDCALC *rc = data; + if(!rc) { + // prototype + b->prototypes.available++; + } + else { + alert_counts_add(&b->running.counts, rc); + + b->running.total++; + + if (rc->config.recipient == silent) + b->running.silent++; + } +} + +static bool alerts_by_x_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value __maybe_unused, void *data __maybe_unused) { + alerts_by_x_insert_callback(item, old_value, data); + return false; +} + static void alerts_v2_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data) { struct rrdcontext_to_json_v2_data *ctl = data; struct alert_v2_entry *t = value; RRDCALC *rc = t->tmp; - t->name = rc->name; - t->summary = rc->summary; + t->name = rc->config.name; + t->summary = rc->config.summary; // the original summary + t->context = rrdlabels_create(); + t->recipient = rrdlabels_create(); + t->classification = rrdlabels_create(); + t->component = rrdlabels_create(); + t->type = rrdlabels_create(); + if (string_strlen(rc->rrdset->context)) + rrdlabels_add(t->context, string2str(rc->rrdset->context), "yes", RRDLABEL_SRC_AUTO); + if (string_strlen(rc->config.recipient)) + rrdlabels_add(t->recipient, string2str(rc->config.recipient), "yes", RRDLABEL_SRC_AUTO); + if (string_strlen(rc->config.classification)) + rrdlabels_add(t->classification, string2str(rc->config.classification), "yes", RRDLABEL_SRC_AUTO); + if (string_strlen(rc->config.component)) + rrdlabels_add(t->component, string2str(rc->config.component), "yes", RRDLABEL_SRC_AUTO); + if (string_strlen(rc->config.type)) + rrdlabels_add(t->type, string2str(rc->config.type), "yes", RRDLABEL_SRC_AUTO); t->ati = ctl->alerts.ati++; t->nodes = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_VALUE_LINK_DONT_CLONE|DICT_OPTION_NAME_LINK_DONT_CLONE); @@ -328,12 +402,29 @@ static void alerts_v2_insert_callback(const DICTIONARY_ITEM *item __maybe_unused static bool alerts_v2_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *data __maybe_unused) { struct alert_v2_entry *t = old_value, *n = new_value; RRDCALC *rc = n->tmp; + if (string_strlen(rc->rrdset->context)) + rrdlabels_add(t->context, string2str(rc->rrdset->context), "yes", RRDLABEL_SRC_AUTO); + if (string_strlen(rc->config.recipient)) + rrdlabels_add(t->recipient, string2str(rc->config.recipient), "yes", RRDLABEL_SRC_AUTO); + if (string_strlen(rc->config.classification)) + rrdlabels_add(t->classification, string2str(rc->config.classification), "yes", RRDLABEL_SRC_AUTO); + if (string_strlen(rc->config.component)) + rrdlabels_add(t->component, string2str(rc->config.component), "yes", RRDLABEL_SRC_AUTO); + if (string_strlen(rc->config.type)) + rrdlabels_add(t->type, string2str(rc->config.type), "yes", RRDLABEL_SRC_AUTO); alerts_v2_add(t, rc); return true; } static void alerts_v2_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { struct alert_v2_entry *t = value; + + rrdlabels_destroy(t->context); + rrdlabels_destroy(t->recipient); + rrdlabels_destroy(t->classification); + rrdlabels_destroy(t->component); + rrdlabels_destroy(t->type); + dictionary_destroy(t->nodes); dictionary_destroy(t->configs); } @@ -347,16 +438,16 @@ static void alert_instances_v2_insert_callback(const DICTIONARY_ITEM *item __may t->chart_id = rc->rrdset->id; t->chart_name = rc->rrdset->name; t->family = rc->rrdset->family; - t->units = rc->units; - t->classification = rc->classification; - t->type = rc->type; - t->recipient = rc->recipient; - t->component = rc->component; - t->name = rc->name; - t->source = rc->source; + t->units = rc->config.units; + t->classification = rc->config.classification; + t->type = rc->config.type; + t->recipient = rc->config.recipient; + t->component = rc->config.component; + t->name = rc->config.name; + t->source = rc->config.source; t->status = rc->status; t->flags = rc->run_flags; - t->info = rc->info; + t->info = rc->config.info; t->summary = rc->summary; t->value = rc->value; t->last_updated = rc->last_updated; @@ -365,12 +456,9 @@ static void alert_instances_v2_insert_callback(const DICTIONARY_ITEM *item __may t->host = rc->rrdset->rrdhost; t->alarm_id = rc->id; t->ni = ctl->nodes.ni; - t->global_id = rc->ae ? rc->ae->global_id : 0; - t->name = rc->name; - uuid_copy(t->config_hash_id, rc->config_hash_id); - if(rc->ae) - uuid_copy(t->last_transition_id, rc->ae->transition_id); + uuid_copy(t->config_hash_id, rc->config.hash_id); + health_alarm_log_get_global_id_and_transition_id_for_rrdcalc(rc, &t->global_id, &t->last_transition_id); } static bool alert_instances_v2_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value __maybe_unused, void *new_value __maybe_unused, void *data __maybe_unused) { @@ -422,7 +510,7 @@ static FTS_MATCH rrdcontext_to_json_v2_full_text_search(struct rrdcontext_to_jso size_t label_searches = 0; if(unlikely(ri->rrdlabels && rrdlabels_entries(ri->rrdlabels) && - rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, q, ':', &label_searches))) { + rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, q, ':', &label_searches) == SP_MATCHED_POSITIVE)) { ctl->q.fts.searches += label_searches; ctl->q.fts.char_searches += label_searches; matched = FTS_MATCHED_LABEL; @@ -435,12 +523,12 @@ static FTS_MATCH rrdcontext_to_json_v2_full_text_search(struct rrdcontext_to_jso RRDSET *st = ri->rrdset; rw_spinlock_read_lock(&st->alerts.spinlock); for (RRDCALC *rcl = st->alerts.base; rcl; rcl = rcl->next) { - if(unlikely(full_text_search_string(&ctl->q.fts, q, rcl->name))) { + if(unlikely(full_text_search_string(&ctl->q.fts, q, rcl->config.name))) { matched = FTS_MATCHED_ALERT; break; } - if(unlikely(full_text_search_string(&ctl->q.fts, q, rcl->info))) { + if(unlikely(full_text_search_string(&ctl->q.fts, q, rcl->config.info))) { matched = FTS_MATCHED_ALERT_INFO; break; } @@ -460,7 +548,7 @@ static bool rrdcontext_matches_alert(struct rrdcontext_to_json_v2_data *ctl, RRD RRDSET *st = ri->rrdset; rw_spinlock_read_lock(&st->alerts.spinlock); for (RRDCALC *rcl = st->alerts.base; rcl; rcl = rcl->next) { - if(ctl->alerts.alert_name_pattern && !simple_pattern_matches_string(ctl->alerts.alert_name_pattern, rcl->name)) + if(ctl->alerts.alert_name_pattern && !simple_pattern_matches_string(ctl->alerts.alert_name_pattern, rcl->config.name)) continue; if(ctl->alerts.alarm_id_filter && ctl->alerts.alarm_id_filter != rcl->id) @@ -500,11 +588,51 @@ static bool rrdcontext_matches_alert(struct rrdcontext_to_json_v2_data *ctl, RRD struct alert_v2_entry t = { .tmp = rcl, }; - struct alert_v2_entry *a2e = dictionary_set(ctl->alerts.alerts, string2str(rcl->name), &t, - sizeof(struct alert_v2_entry)); + struct alert_v2_entry *a2e = + dictionary_set(ctl->alerts.summary, string2str(rcl->config.name), + &t, sizeof(struct alert_v2_entry)); size_t ati = a2e->ati; matches++; + dictionary_set_advanced(ctl->alerts.by_type, + string2str(rcl->config.type), + (ssize_t)string_strlen(rcl->config.type), + NULL, + sizeof(struct alert_by_x_entry), + rcl); + + dictionary_set_advanced(ctl->alerts.by_component, + string2str(rcl->config.component), + (ssize_t)string_strlen(rcl->config.component), + NULL, + sizeof(struct alert_by_x_entry), + rcl); + + dictionary_set_advanced(ctl->alerts.by_classification, + string2str(rcl->config.classification), + (ssize_t)string_strlen(rcl->config.classification), + NULL, + sizeof(struct alert_by_x_entry), + rcl); + + dictionary_set_advanced(ctl->alerts.by_recipient, + string2str(rcl->config.recipient), + (ssize_t)string_strlen(rcl->config.recipient), + NULL, + sizeof(struct alert_by_x_entry), + rcl); + + char *module = NULL; + rrdlabels_get_value_strdup_or_null(st->rrdlabels, &module, "_collect_module"); + if(!module || !*module) module = "[unset]"; + + dictionary_set_advanced(ctl->alerts.by_module, + module, + -1, + NULL, + sizeof(struct alert_by_x_entry), + rcl); + if (ctl->options & (CONTEXT_V2_OPTION_ALERTS_WITH_INSTANCES | CONTEXT_V2_OPTION_ALERTS_WITH_VALUES)) { char key[20 + 1]; snprintfz(key, sizeof(key) - 1, "%p", rcl); @@ -726,6 +854,15 @@ static void agent_capabilities_to_json(BUFFER *wb, RRDHOST *host, const char *ke freez(capas); } +static inline void host_dyncfg_to_json_v2(BUFFER *wb, const char *key, RRDHOST_STATUS *s) { + buffer_json_member_add_object(wb, key); + { + buffer_json_member_add_string(wb, "status", rrdhost_dyncfg_status_to_string(s->dyncfg.status)); + } + buffer_json_object_close(wb); // health + +} + static inline void rrdhost_health_to_json_v2(BUFFER *wb, const char *key, RRDHOST_STATUS *s) { buffer_json_member_add_object(wb, key); { @@ -838,6 +975,8 @@ static void rrdcontext_to_json_v2_rrdhost(BUFFER *wb, RRDHOST *host, struct rrdc host_functions2json(host, wb); // functions agent_capabilities_to_json(wb, host, "capabilities"); + + host_dyncfg_to_json_v2(wb, "dyncfg", &s); } buffer_json_object_close(wb); // this instance buffer_json_array_close(wb); // instances @@ -913,8 +1052,11 @@ static ssize_t rrdcontext_to_json_v2_add_host(void *data, RRDHOST *host, bool qu .size = 1, .node_ids = &ctl->nodes.ni, .help = NULL, + .tags = NULL, + .access = HTTP_ACCESS_ALL, + .priority = RRDFUNCTIONS_PRIORITY_DEFAULT, }; - host_functions_to_dict(host, ctl->functions.dict, &t, sizeof(t), &t.help); + host_functions_to_dict(host, ctl->functions.dict, &t, sizeof(t), &t.help, &t.tags, &t.access, &t.priority); } if(ctl->mode & CONTEXTS_V2_NODES) { @@ -982,6 +1124,30 @@ void buffer_json_query_timings(BUFFER *wb, const char *key, struct query_timings void build_info_to_json_object(BUFFER *b); +static void convert_seconds_to_dhms(time_t seconds, char *result, int result_size) { + int days, hours, minutes; + + days = (int) (seconds / (24 * 3600)); + seconds = (int) (seconds % (24 * 3600)); + hours = (int) (seconds / 3600); + seconds %= 3600; + minutes = (int) (seconds / 60); + seconds %= 60; + + // Format the result into the provided string buffer + BUFFER *buf = buffer_create(128, NULL); + if (days) + buffer_sprintf(buf,"%d day%s%s", days, days==1 ? "" : "s", hours || minutes ? ", " : ""); + if (hours) + buffer_sprintf(buf,"%d hour%s%s", hours, hours==1 ? "" : "s", minutes ? ", " : ""); + if (minutes) + buffer_sprintf(buf,"%d minute%s%s", minutes, minutes==1 ? "" : "s", seconds ? ", " : ""); + if (seconds) + buffer_sprintf(buf,"%d second%s", (int) seconds, seconds==1 ? "" : "s"); + strncpyz(result, buffer_tostring(buf), result_size); + buffer_free(buf); +} + void buffer_json_agents_v2(BUFFER *wb, struct query_timings *timings, time_t now_s, bool info, bool array) { if(!now_s) now_s = now_realtime_sec(); @@ -1009,14 +1175,22 @@ void buffer_json_agents_v2(BUFFER *wb, struct query_timings *timings, time_t now buffer_json_cloud_status(wb, now_s); buffer_json_member_add_array(wb, "db_size"); + size_t group_seconds = localhost->rrd_update_every; for (size_t tier = 0; tier < storage_tiers; tier++) { STORAGE_ENGINE *eng = localhost->db[tier].eng; if (!eng) continue; - uint64_t max = storage_engine_disk_space_max(eng->backend, localhost->db[tier].instance); - uint64_t used = storage_engine_disk_space_used(eng->backend, localhost->db[tier].instance); - time_t first_time_s = storage_engine_global_first_time_s(eng->backend, localhost->db[tier].instance); - size_t currently_collected_metrics = storage_engine_collected_metrics(eng->backend, localhost->db[tier].instance); + group_seconds *= storage_tiers_grouping_iterations[tier]; + uint64_t max = storage_engine_disk_space_max(eng->seb, localhost->db[tier].si); + uint64_t used = storage_engine_disk_space_used(eng->seb, localhost->db[tier].si); +#ifdef ENABLE_DBENGINE + if (!max && eng->seb == STORAGE_ENGINE_BACKEND_DBENGINE) { + max = get_directory_free_bytes_space(multidb_ctx[tier]); + max += used; + } +#endif + time_t first_time_s = storage_engine_global_first_time_s(eng->seb, localhost->db[tier].si); + size_t currently_collected_metrics = storage_engine_collected_metrics(eng->seb, localhost->db[tier].si); NETDATA_DOUBLE percent; if (used && max) @@ -1026,6 +1200,12 @@ void buffer_json_agents_v2(BUFFER *wb, struct query_timings *timings, time_t now buffer_json_add_array_item_object(wb); buffer_json_member_add_uint64(wb, "tier", tier); + char human_retention[128]; + convert_seconds_to_dhms((time_t) group_seconds, human_retention, sizeof(human_retention) - 1); + buffer_json_member_add_string(wb, "point_every", human_retention); + + buffer_json_member_add_uint64(wb, "metrics", storage_engine_metrics(eng->seb, localhost->db[tier].si)); + buffer_json_member_add_uint64(wb, "samples", storage_engine_samples(eng->seb, localhost->db[tier].si)); if(used || max) { buffer_json_member_add_uint64(wb, "disk_used", used); @@ -1034,13 +1214,33 @@ void buffer_json_agents_v2(BUFFER *wb, struct query_timings *timings, time_t now } if(first_time_s) { + time_t retention = now_s - first_time_s; + buffer_json_member_add_time_t(wb, "from", first_time_s); buffer_json_member_add_time_t(wb, "to", now_s); - buffer_json_member_add_time_t(wb, "retention", now_s - first_time_s); + buffer_json_member_add_time_t(wb, "retention", retention); + + convert_seconds_to_dhms(retention, human_retention, sizeof(human_retention) - 1); + buffer_json_member_add_string(wb, "retention_human", human_retention); + + if(used || max) { // we have disk space information + time_t time_retention = 0; +#ifdef ENABLE_DBENGINE + time_retention = multidb_ctx[tier]->config.max_retention_s; +#endif + time_t space_retention = (time_t)((NETDATA_DOUBLE)(now_s - first_time_s) * 100.0 / percent); + time_t actual_retention = MIN(space_retention, time_retention ? time_retention : space_retention); + + if (time_retention) { + convert_seconds_to_dhms(time_retention, human_retention, sizeof(human_retention) - 1); + buffer_json_member_add_time_t(wb, "requested_retention", time_retention); + buffer_json_member_add_string(wb, "requested_retention_human", human_retention); + } - if(used || max) // we have disk space information - buffer_json_member_add_time_t(wb, "expected_retention", - (time_t) ((NETDATA_DOUBLE) (now_s - first_time_s) * 100.0 / percent)); + convert_seconds_to_dhms(actual_retention, human_retention, sizeof(human_retention) - 1); + buffer_json_member_add_time_t(wb, "expected_retention", actual_retention); + buffer_json_member_add_string(wb, "expected_retention_human", human_retention); + } } if(currently_collected_metrics) @@ -1212,14 +1412,9 @@ static void contexts_v2_alert_config_to_json_from_sql_alert_config_data(struct s buffer_json_member_add_string(wb, "type", is_template ? "template" : "alarm"); buffer_json_member_add_string(wb, "on", is_template ? t->selectors.on_template : t->selectors.on_key); - buffer_json_member_add_string(wb, "os", t->selectors.os); - buffer_json_member_add_string(wb, "hosts", t->selectors.hosts); buffer_json_member_add_string(wb, "families", t->selectors.families); - buffer_json_member_add_string(wb, "plugin", t->selectors.plugin); - buffer_json_member_add_string(wb, "module", t->selectors.module); buffer_json_member_add_string(wb, "host_labels", t->selectors.host_labels); buffer_json_member_add_string(wb, "chart_labels", t->selectors.chart_labels); - buffer_json_member_add_string(wb, "charts", t->selectors.charts); } buffer_json_object_close(wb); // selectors @@ -1236,9 +1431,13 @@ static void contexts_v2_alert_config_to_json_from_sql_alert_config_data(struct s buffer_json_member_add_time_t(wb, "after", t->value.db.after); buffer_json_member_add_time_t(wb, "before", t->value.db.before); + buffer_json_member_add_string(wb, "time_group_condition", alerts_group_conditions_id2txt(t->value.db.time_group_condition)); + buffer_json_member_add_double(wb, "time_group_value", t->value.db.time_group_value); + buffer_json_member_add_string(wb, "dims_group", alerts_dims_grouping_id2group(t->value.db.dims_group)); + buffer_json_member_add_string(wb, "data_source", alerts_data_source_id2source(t->value.db.data_source)); buffer_json_member_add_string(wb, "method", t->value.db.method); buffer_json_member_add_string(wb, "dimensions", t->value.db.dimensions); - web_client_api_request_v1_data_options_to_buffer_json_array(wb, "options",(RRDR_OPTIONS) t->value.db.options); + rrdr_options_to_buffer_json_array(wb, "options", (RRDR_OPTIONS)t->value.db.options); } buffer_json_object_close(wb); // db } @@ -1378,6 +1577,41 @@ static int contexts_v2_alert_instance_to_json_callback(const DICTIONARY_ITEM *it return 1; } +static void contexts_v2_alerts_by_x_update_prototypes(void *data, STRING *type, STRING *component, STRING *classification, STRING *recipient) { + struct rrdcontext_to_json_v2_data *ctl = data; + + dictionary_set_advanced(ctl->alerts.by_type, string2str(type), (ssize_t)string_strlen(type), NULL, sizeof(struct alert_by_x_entry), NULL); + dictionary_set_advanced(ctl->alerts.by_component, string2str(component), (ssize_t)string_strlen(component), NULL, sizeof(struct alert_by_x_entry), NULL); + dictionary_set_advanced(ctl->alerts.by_classification, string2str(classification), (ssize_t)string_strlen(classification), NULL, sizeof(struct alert_by_x_entry), NULL); + dictionary_set_advanced(ctl->alerts.by_recipient, string2str(recipient), (ssize_t)string_strlen(recipient), NULL, sizeof(struct alert_by_x_entry), NULL); +} + +static void contexts_v2_alerts_by_x_to_json(BUFFER *wb, DICTIONARY *dict, const char *key) { + buffer_json_member_add_array(wb, key); + { + struct alert_by_x_entry *b; + dfe_start_read(dict, b) { + buffer_json_add_array_item_object(wb); + { + buffer_json_member_add_string(wb, "name", b_dfe.name); + buffer_json_member_add_uint64(wb, "cr", b->running.counts.critical); + buffer_json_member_add_uint64(wb, "wr", b->running.counts.warning); + buffer_json_member_add_uint64(wb, "cl", b->running.counts.clear); + buffer_json_member_add_uint64(wb, "er", b->running.counts.error); + buffer_json_member_add_uint64(wb, "running", b->running.total); + + buffer_json_member_add_uint64(wb, "running_silent", b->running.silent); + + if(b->prototypes.available) + buffer_json_member_add_uint64(wb, "available", b->prototypes.available); + } + buffer_json_object_close(wb); + } + dfe_done(b); + } + buffer_json_array_close(wb); +} + static void contexts_v2_alert_instances_to_json(BUFFER *wb, const char *key, struct rrdcontext_to_json_v2_data *ctl, bool debug) { buffer_json_member_add_array(wb, key); { @@ -1397,28 +1631,66 @@ static void contexts_v2_alerts_to_json(BUFFER *wb, struct rrdcontext_to_json_v2_ buffer_json_member_add_array(wb, "alerts"); { struct alert_v2_entry *t; - dfe_start_read(ctl->alerts.alerts, t) + dfe_start_read(ctl->alerts.summary, t) { buffer_json_add_array_item_object(wb); { buffer_json_member_add_uint64(wb, "ati", t->ati); + + buffer_json_member_add_array(wb, "ni"); + void *host_guid; + dfe_start_read(t->nodes, host_guid) { + struct contexts_v2_node *cn = dictionary_get(ctl->nodes.dict,host_guid_dfe.name); + buffer_json_add_array_item_int64(wb, (int64_t) cn->ni); + } + dfe_done(host_guid); + buffer_json_array_close(wb); + buffer_json_member_add_string(wb, "nm", string2str(t->name)); buffer_json_member_add_string(wb, "sum", string2str(t->summary)); - buffer_json_member_add_uint64(wb, "cr", t->critical); - buffer_json_member_add_uint64(wb, "wr", t->warning); - buffer_json_member_add_uint64(wb, "cl", t->clear); - buffer_json_member_add_uint64(wb, "er", t->error); + buffer_json_member_add_uint64(wb, "cr", t->counts.critical); + buffer_json_member_add_uint64(wb, "wr", t->counts.warning); + buffer_json_member_add_uint64(wb, "cl", t->counts.clear); + buffer_json_member_add_uint64(wb, "er", t->counts.error); buffer_json_member_add_uint64(wb, "in", t->instances); buffer_json_member_add_uint64(wb, "nd", dictionary_entries(t->nodes)); buffer_json_member_add_uint64(wb, "cfg", dictionary_entries(t->configs)); + + buffer_json_member_add_array(wb, "ctx"); + rrdlabels_key_to_buffer_array_item(t->context, wb); + buffer_json_array_close(wb); // ctx + + buffer_json_member_add_array(wb, "cls"); + rrdlabels_key_to_buffer_array_item(t->classification, wb); + buffer_json_array_close(wb); // classification + + + buffer_json_member_add_array(wb, "cp"); + rrdlabels_key_to_buffer_array_item(t->component, wb); + buffer_json_array_close(wb); // component + + buffer_json_member_add_array(wb, "ty"); + rrdlabels_key_to_buffer_array_item(t->type, wb); + buffer_json_array_close(wb); // type + + buffer_json_member_add_array(wb, "to"); + rrdlabels_key_to_buffer_array_item(t->recipient, wb); + buffer_json_array_close(wb); // recipient } buffer_json_object_close(wb); // alert name } dfe_done(t); } buffer_json_array_close(wb); // alerts + + health_prototype_metadata_foreach(ctl, contexts_v2_alerts_by_x_update_prototypes); + contexts_v2_alerts_by_x_to_json(wb, ctl->alerts.by_type, "alerts_by_type"); + contexts_v2_alerts_by_x_to_json(wb, ctl->alerts.by_component, "alerts_by_component"); + contexts_v2_alerts_by_x_to_json(wb, ctl->alerts.by_classification, "alerts_by_classification"); + contexts_v2_alerts_by_x_to_json(wb, ctl->alerts.by_recipient, "alerts_by_recipient"); + contexts_v2_alerts_by_x_to_json(wb, ctl->alerts.by_module, "alerts_by_module"); } if(ctl->request->options & (CONTEXT_V2_OPTION_ALERTS_WITH_INSTANCES|CONTEXT_V2_OPTION_ALERTS_WITH_VALUES)) { @@ -1432,9 +1704,9 @@ static void contexts_v2_alerts_to_json(BUFFER *wb, struct rrdcontext_to_json_v2_ struct sql_alert_transition_fixed_size { usec_t global_id; - uuid_t transition_id; - uuid_t host_id; - uuid_t config_hash_id; + nd_uuid_t transition_id; + nd_uuid_t host_id; + nd_uuid_t config_hash_id; uint32_t alarm_id; char alert_name[SQL_TRANSITION_DATA_SMALL_STRING]; char chart[RRD_ID_LENGTH_MAX]; @@ -1926,12 +2198,42 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE } } - ctl.alerts.alerts = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + ctl.alerts.summary = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct alert_v2_entry)); - dictionary_register_insert_callback(ctl.alerts.alerts, alerts_v2_insert_callback, &ctl); - dictionary_register_conflict_callback(ctl.alerts.alerts, alerts_v2_conflict_callback, &ctl); - dictionary_register_delete_callback(ctl.alerts.alerts, alerts_v2_delete_callback, &ctl); + dictionary_register_insert_callback(ctl.alerts.summary, alerts_v2_insert_callback, &ctl); + dictionary_register_conflict_callback(ctl.alerts.summary, alerts_v2_conflict_callback, &ctl); + dictionary_register_delete_callback(ctl.alerts.summary, alerts_v2_delete_callback, &ctl); + + ctl.alerts.by_type = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct alert_by_x_entry)); + + dictionary_register_insert_callback(ctl.alerts.by_type, alerts_by_x_insert_callback, NULL); + dictionary_register_conflict_callback(ctl.alerts.by_type, alerts_by_x_conflict_callback, NULL); + + ctl.alerts.by_component = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct alert_by_x_entry)); + + dictionary_register_insert_callback(ctl.alerts.by_component, alerts_by_x_insert_callback, NULL); + dictionary_register_conflict_callback(ctl.alerts.by_component, alerts_by_x_conflict_callback, NULL); + + ctl.alerts.by_classification = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct alert_by_x_entry)); + + dictionary_register_insert_callback(ctl.alerts.by_classification, alerts_by_x_insert_callback, NULL); + dictionary_register_conflict_callback(ctl.alerts.by_classification, alerts_by_x_conflict_callback, NULL); + + ctl.alerts.by_recipient = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct alert_by_x_entry)); + + dictionary_register_insert_callback(ctl.alerts.by_recipient, alerts_by_x_insert_callback, NULL); + dictionary_register_conflict_callback(ctl.alerts.by_recipient, alerts_by_x_conflict_callback, NULL); + + ctl.alerts.by_module = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct alert_by_x_entry)); + + dictionary_register_insert_callback(ctl.alerts.by_module, alerts_by_x_insert_callback, NULL); + dictionary_register_conflict_callback(ctl.alerts.by_module, alerts_by_x_conflict_callback, NULL); if(ctl.options & (CONTEXT_V2_OPTION_ALERTS_WITH_INSTANCES | CONTEXT_V2_OPTION_ALERTS_WITH_VALUES)) { ctl.alerts.alert_instances = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, @@ -2062,12 +2364,19 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE struct function_v2_entry *t; dfe_start_read(ctl.functions.dict, t) { buffer_json_add_array_item_object(wb); - buffer_json_member_add_string(wb, "name", t_dfe.name); - buffer_json_member_add_string(wb, "help", string2str(t->help)); - buffer_json_member_add_array(wb, "ni"); - for (size_t i = 0; i < t->used; i++) - buffer_json_add_array_item_uint64(wb, t->node_ids[i]); - buffer_json_array_close(wb); + { + buffer_json_member_add_string(wb, "name", t_dfe.name); + buffer_json_member_add_string(wb, "help", string2str(t->help)); + buffer_json_member_add_array(wb, "ni"); + { + for (size_t i = 0; i < t->used; i++) + buffer_json_add_array_item_uint64(wb, t->node_ids[i]); + } + buffer_json_array_close(wb); + buffer_json_member_add_string(wb, "tags", string2str(t->tags)); + http_access2buffer_json_array(wb, "access", t->access); + buffer_json_member_add_uint64(wb, "priority", t->priority); + } buffer_json_object_close(wb); } dfe_done(t); @@ -2127,8 +2436,13 @@ cleanup: dictionary_destroy(ctl.nodes.dict); dictionary_destroy(ctl.contexts.dict); dictionary_destroy(ctl.functions.dict); - dictionary_destroy(ctl.alerts.alerts); + dictionary_destroy(ctl.alerts.summary); dictionary_destroy(ctl.alerts.alert_instances); + dictionary_destroy(ctl.alerts.by_type); + dictionary_destroy(ctl.alerts.by_component); + dictionary_destroy(ctl.alerts.by_classification); + dictionary_destroy(ctl.alerts.by_recipient); + dictionary_destroy(ctl.alerts.by_module); simple_pattern_free(ctl.nodes.scope_pattern); simple_pattern_free(ctl.nodes.pattern); simple_pattern_free(ctl.contexts.pattern); diff --git a/src/database/contexts/context.c b/src/database/contexts/context.c new file mode 100644 index 000000000..33037eb93 --- /dev/null +++ b/src/database/contexts/context.c @@ -0,0 +1,342 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "internal.h" + +inline const char *rrdcontext_acquired_id(RRDCONTEXT_ACQUIRED *rca) { + RRDCONTEXT *rc = rrdcontext_acquired_value(rca); + return string2str(rc->id); +} + +inline bool rrdcontext_acquired_belongs_to_host(RRDCONTEXT_ACQUIRED *rca, RRDHOST *host) { + RRDCONTEXT *rc = rrdcontext_acquired_value(rca); + return rc->rrdhost == host; +} + +// ---------------------------------------------------------------------------- +// RRDCONTEXT + +static void rrdcontext_freez(RRDCONTEXT *rc) { + string_freez(rc->id); + string_freez(rc->title); + string_freez(rc->units); + string_freez(rc->family); +} + +static void rrdcontext_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost) { + RRDHOST *host = (RRDHOST *)rrdhost; + RRDCONTEXT *rc = (RRDCONTEXT *)value; + + rc->rrdhost = host; + rc->flags = rc->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics at constructor + + if(rc->hub.version) { + // we are loading data from the SQL database + + if(rc->version) + netdata_log_error("RRDCONTEXT: context '%s' is already initialized with version %"PRIu64", but it is loaded again from SQL with version %"PRIu64"", string2str(rc->id), rc->version, rc->hub.version); + + // IMPORTANT + // replace all string pointers in rc->hub with our own versions + // the originals are coming from a tmp allocation of sqlite + + string_freez(rc->id); + rc->id = string_strdupz(rc->hub.id); + rc->hub.id = string2str(rc->id); + + string_freez(rc->title); + rc->title = string_strdupz(rc->hub.title); + rc->hub.title = string2str(rc->title); + + string_freez(rc->units); + rc->units = string_strdupz(rc->hub.units); + rc->hub.units = string2str(rc->units); + + string_freez(rc->family); + rc->family = string_strdupz(rc->hub.family); + rc->hub.family = string2str(rc->family); + + rc->chart_type = rrdset_type_id(rc->hub.chart_type); + rc->hub.chart_type = rrdset_type_name(rc->chart_type); + + rc->version = rc->hub.version; + rc->priority = rc->hub.priority; + rc->first_time_s = (time_t)rc->hub.first_time_s; + rc->last_time_s = (time_t)rc->hub.last_time_s; + + if(rc->hub.deleted || !rc->hub.first_time_s) + rrd_flag_set_deleted(rc, RRD_FLAG_NONE); + else { + if (rc->last_time_s == 0) + rrd_flag_set_collected(rc); + else + rrd_flag_set_archived(rc); + } + + rc->flags |= RRD_FLAG_UPDATE_REASON_LOAD_SQL; // no need for atomics at constructor + } + else { + // we are adding this context now for the first time + rc->version = now_realtime_sec(); + } + + rrdinstances_create_in_rrdcontext(rc); + spinlock_init(&rc->spinlock); + + // signal the react callback to do the job + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_NEW_OBJECT); +} + +static void rrdcontext_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost __maybe_unused) { + + RRDCONTEXT *rc = (RRDCONTEXT *)value; + + rrdinstances_destroy_from_rrdcontext(rc); + rrdcontext_freez(rc); +} + +typedef enum __attribute__((packed)) { + OLDNEW_KEEP_OLD, + OLDNEW_USE_NEW, + OLDNEW_MERGE, +} OLDNEW; + +static inline OLDNEW oldnew_decide(bool archived, bool new_archived) { + if(archived && !new_archived) + return OLDNEW_USE_NEW; + + if(!archived && new_archived) + return OLDNEW_KEEP_OLD; + + return OLDNEW_MERGE; +} + +static inline void string_replace(STRING **stringpp, STRING *new_string) { + STRING *old = *stringpp; + *stringpp = string_dup(new_string); + string_freez(old); +} + +static inline void string_merge(STRING **stringpp, STRING *new_string) { + STRING *old = *stringpp; + *stringpp = string_2way_merge(*stringpp, new_string); + string_freez(old); +} + +static void rrdcontext_merge_with(RRDCONTEXT *rc, bool archived, STRING *title, STRING *family, STRING *units, RRDSET_TYPE chart_type, uint32_t priority) { + OLDNEW oldnew = oldnew_decide(rrd_flag_is_archived(rc), archived); + + switch(oldnew) { + case OLDNEW_KEEP_OLD: + break; + + case OLDNEW_USE_NEW: + if(rc->title != title) { + string_replace(&rc->title, title); + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + if(rc->family != family) { + string_replace(&rc->family, family); + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + break; + + case OLDNEW_MERGE: + if(rc->title != title) { + string_merge(&rc->title, title); + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + if(rc->family != family) { + string_merge(&rc->family, family); + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + break; + } + + switch(oldnew) { + case OLDNEW_KEEP_OLD: + break; + + case OLDNEW_USE_NEW: + case OLDNEW_MERGE: + if(rc->units != units) { + string_replace(&rc->units, units); + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + + if(rc->chart_type != chart_type) { + rc->chart_type = chart_type; + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + + if(rc->priority != priority) { + rc->priority = priority; + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + break; + } +} + +void rrdcontext_update_from_collected_rrdinstance(RRDINSTANCE *ri) { + rrdcontext_merge_with(ri->rc, rrd_flag_is_archived(ri), + ri->title, ri->family, ri->units, ri->chart_type, ri->priority); +} + +static bool rrdcontext_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *rrdhost __maybe_unused) { + RRDCONTEXT *rc = (RRDCONTEXT *)old_value; + RRDCONTEXT *rc_new = (RRDCONTEXT *)new_value; + + //current rc is not archived, new_rc is archived, don't merge + if (!rrd_flag_is_archived(rc) && rrd_flag_is_archived(rc_new)) { + rrdcontext_freez(rc_new); + return false; + } + + rrdcontext_lock(rc); + + rrdcontext_merge_with(rc, rrd_flag_is_archived(rc_new), + rc_new->title, rc_new->family, rc_new->units, rc_new->chart_type, rc_new->priority); + + rrd_flag_set(rc, rc_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no need for atomics on rc_new + + if(rrd_flag_is_collected(rc) && rrd_flag_is_archived(rc)) + rrd_flag_set_collected(rc); + + if(rrd_flag_is_updated(rc)) + rrd_flag_set(rc, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT); + + rrdcontext_unlock(rc); + + // free the resources of the new one + rrdcontext_freez(rc_new); + + // the react callback will continue from here + return rrd_flag_is_updated(rc); +} + +static void rrdcontext_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost __maybe_unused) { + RRDCONTEXT *rc = (RRDCONTEXT *)value; + rrdcontext_trigger_updates(rc, __FUNCTION__ ); +} + +void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function) { + if(rrd_flag_is_updated(rc) || !rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION)) + rrdcontext_queue_for_post_processing(rc, function, rc->flags); +} + +static void rrdcontext_hub_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { + RRDCONTEXT *rc = context; + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB); + rc->queue.queued_ut = now_realtime_usec(); + rc->queue.queued_flags = rrd_flags_get(rc); +} + +static void rrdcontext_hub_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { + RRDCONTEXT *rc = context; + rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB); +} + +static bool rrdcontext_hub_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) { + // context and new_context are the same + // we just need to update the timings + RRDCONTEXT *rc = context; + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB); + rc->queue.queued_ut = now_realtime_usec(); + rc->queue.queued_flags |= rrd_flags_get(rc); + + return true; +} + +static void rrdcontext_post_processing_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { + RRDCONTEXT *rc = context; + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP); + rc->pp.queued_flags = rc->flags; + rc->pp.queued_ut = now_realtime_usec(); +} + +static void rrdcontext_post_processing_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { + RRDCONTEXT *rc = context; + + // IMPORTANT: + // Do not rely on this flag being absent, because the dictionaries have delayed deletions (garbage collect) + // so, this flag may not be deleted immediately from the context. + rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_PP); + rc->pp.dequeued_ut = now_realtime_usec(); +} + +static bool rrdcontext_post_processing_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) { + RRDCONTEXT *rc = context; + bool changed = false; + + if(!(rc->flags & RRD_FLAG_QUEUED_FOR_PP)) { + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP); + changed = true; + } + + if(rc->pp.queued_flags != rc->flags) { + rc->pp.queued_flags |= rc->flags; + changed = true; + } + + return changed; +} + + +void rrdhost_create_rrdcontexts(RRDHOST *host) { + if(unlikely(!host)) return; + if(likely(host->rrdctx.contexts)) return; + + host->rrdctx.contexts = dictionary_create_advanced( + DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + &dictionary_stats_category_rrdcontext, sizeof(RRDCONTEXT)); + + dictionary_register_insert_callback(host->rrdctx.contexts, rrdcontext_insert_callback, host); + dictionary_register_delete_callback(host->rrdctx.contexts, rrdcontext_delete_callback, host); + dictionary_register_conflict_callback(host->rrdctx.contexts, rrdcontext_conflict_callback, host); + dictionary_register_react_callback(host->rrdctx.contexts, rrdcontext_react_callback, host); + + host->rrdctx.hub_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0); + dictionary_register_insert_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_insert_callback, NULL); + dictionary_register_delete_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_delete_callback, NULL); + dictionary_register_conflict_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_conflict_callback, NULL); + + host->rrdctx.pp_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0); + dictionary_register_insert_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_insert_callback, NULL); + dictionary_register_delete_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_delete_callback, NULL); + dictionary_register_conflict_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_conflict_callback, NULL); +} + +void rrdhost_destroy_rrdcontexts(RRDHOST *host) { + if(unlikely(!host)) return; + if(unlikely(!host->rrdctx.contexts)) return; + + DICTIONARY *old; + + if(host->rrdctx.hub_queue) { + old = host->rrdctx.hub_queue; + host->rrdctx.hub_queue = NULL; + + RRDCONTEXT *rc; + dfe_start_write(old, rc) { + dictionary_del(old, string2str(rc->id)); + } + dfe_done(rc); + dictionary_destroy(old); + } + + if(host->rrdctx.pp_queue) { + old = host->rrdctx.pp_queue; + host->rrdctx.pp_queue = NULL; + + RRDCONTEXT *rc; + dfe_start_write(old, rc) { + dictionary_del(old, string2str(rc->id)); + } + dfe_done(rc); + dictionary_destroy(old); + } + + old = host->rrdctx.contexts; + host->rrdctx.contexts = NULL; + dictionary_destroy(old); +} + diff --git a/database/contexts/instance.c b/src/database/contexts/instance.c index 39837dbf6..5d841bc82 100644 --- a/database/contexts/instance.c +++ b/src/database/contexts/instance.c @@ -137,7 +137,7 @@ static bool rrdinstance_conflict_callback(const DICTIONARY_ITEM *item __maybe_un "RRDINSTANCE: '%s' cannot change id to '%s'", string2str(ri->id), string2str(ri_new->id)); - if(uuid_memcmp(&ri->uuid, &ri_new->uuid) != 0) { + if(!uuid_eq(ri->uuid, ri_new->uuid)) { #ifdef NETDATA_INTERNAL_CHECKS char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN]; uuid_unparse(ri->uuid, uuid1); @@ -156,7 +156,7 @@ static bool rrdinstance_conflict_callback(const DICTIONARY_ITEM *item __maybe_un } #ifdef NETDATA_INTERNAL_CHECKS - if(ri->rrdset && uuid_memcmp(&ri->uuid, &ri->rrdset->chart_uuid) != 0) { + if(ri->rrdset && !uuid_eq(ri->uuid, ri->rrdset->chart_uuid)) { char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN]; uuid_unparse(ri->uuid, uuid1); uuid_unparse(ri->rrdset->chart_uuid, uuid2); @@ -404,7 +404,7 @@ inline void rrdinstance_from_rrdset(RRDSET *st) { fatal("RRDCONTEXT: cannot switch rrdcontext without switching rrdinstance too"); } -#define rrdset_get_rrdinstance(st) rrdset_get_rrdinstance_with_trace(st, __FUNCTION__); +#define rrdset_get_rrdinstance(st) rrdset_get_rrdinstance_with_trace(st, __FUNCTION__) static inline RRDINSTANCE *rrdset_get_rrdinstance_with_trace(RRDSET *st, const char *function) { if(unlikely(!st->rrdcontexts.rrdinstance)) { netdata_log_error("RRDINSTANCE: RRDSET '%s' is not linked to an RRDINSTANCE at %s()", rrdset_id(st), function); diff --git a/src/database/contexts/internal.h b/src/database/contexts/internal.h new file mode 100644 index 000000000..270c59649 --- /dev/null +++ b/src/database/contexts/internal.h @@ -0,0 +1,388 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDCONTEXT_INTERNAL_H +#define NETDATA_RRDCONTEXT_INTERNAL_H 1 + +#include "rrdcontext.h" +#include "../sqlite/sqlite_context.h" +#include "../../aclk/schema-wrappers/context.h" +#include "../../aclk/aclk_contexts_api.h" +#include "../../aclk/aclk.h" +#include "../storage_engine.h" + +#define MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST 5000 +#define FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS 120 +#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC (1000 * USEC_PER_MS) +#define RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY 10 + +#define LOG_TRANSITIONS false + +#define WORKER_JOB_HOSTS 1 +#define WORKER_JOB_CHECK 2 +#define WORKER_JOB_SEND 3 +#define WORKER_JOB_DEQUEUE 4 +#define WORKER_JOB_RETENTION 5 +#define WORKER_JOB_QUEUED 6 +#define WORKER_JOB_CLEANUP 7 +#define WORKER_JOB_CLEANUP_DELETE 8 +#define WORKER_JOB_PP_METRIC 9 // post-processing metrics +#define WORKER_JOB_PP_INSTANCE 10 // post-processing instances +#define WORKER_JOB_PP_CONTEXT 11 // post-processing contexts +#define WORKER_JOB_HUB_QUEUE_SIZE 12 +#define WORKER_JOB_PP_QUEUE_SIZE 13 + + +typedef enum __attribute__ ((__packed__)) { + RRD_FLAG_NONE = 0, + RRD_FLAG_DELETED = (1 << 0), // this is a deleted object (metrics, instances, contexts) + RRD_FLAG_COLLECTED = (1 << 1), // this object is currently being collected + RRD_FLAG_UPDATED = (1 << 2), // this object has updates to propagate + RRD_FLAG_ARCHIVED = (1 << 3), // this object is not currently being collected + RRD_FLAG_OWN_LABELS = (1 << 4), // this instance has its own labels - not linked to an RRDSET + RRD_FLAG_LIVE_RETENTION = (1 << 5), // we have got live retention from the database + RRD_FLAG_QUEUED_FOR_HUB = (1 << 6), // this context is currently queued to be dispatched to hub + RRD_FLAG_QUEUED_FOR_PP = (1 << 7), // this context is currently queued to be post-processed + RRD_FLAG_HIDDEN = (1 << 8), // don't expose this to the hub or the API + + RRD_FLAG_UPDATE_REASON_TRIGGERED = (1 << 9), // the update was triggered by the child object + RRD_FLAG_UPDATE_REASON_LOAD_SQL = (1 << 10), // this object has just been loaded from SQL + RRD_FLAG_UPDATE_REASON_NEW_OBJECT = (1 << 11), // this object has just been created + RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT = (1 << 12), // we received an update on this object + RRD_FLAG_UPDATE_REASON_CHANGED_LINKING = (1 << 13), // an instance or a metric switched RRDSET or RRDDIM + RRD_FLAG_UPDATE_REASON_CHANGED_METADATA = (1 << 14), // this context or instance changed uuid, name, units, title, family, chart type, priority, update every, rrd changed flags + RRD_FLAG_UPDATE_REASON_ZERO_RETENTION = (1 << 15), // this object has no retention + RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T = (1 << 16), // this object changed its oldest time in the db + RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T = (1 << 17), // this object change its latest time in the db + RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED = (1 << 18), // this object has stopped being collected + RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED = (1 << 19), // this object has started being collected + RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD = (1 << 20), // this context belongs to a host that just disconnected + RRD_FLAG_UPDATE_REASON_UNUSED = (1 << 21), // this context is not used anymore + RRD_FLAG_UPDATE_REASON_DB_ROTATION = (1 << 22), // this context changed because of a db rotation + + RRD_FLAG_MERGED_COLLECTED_RI_TO_RC = (1 << 29), + + // action to perform on an object + RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION = (1 << 30), // this object has to update its retention from the db +} RRD_FLAGS; + +struct rrdcontext_reason { + RRD_FLAGS flag; + const char *name; + usec_t delay_ut; +}; + +extern struct rrdcontext_reason rrdcontext_reasons[]; + +#define RRD_FLAG_ALL_UPDATE_REASONS ( \ + RRD_FLAG_UPDATE_REASON_TRIGGERED \ + |RRD_FLAG_UPDATE_REASON_LOAD_SQL \ + |RRD_FLAG_UPDATE_REASON_NEW_OBJECT \ + |RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT \ + |RRD_FLAG_UPDATE_REASON_CHANGED_LINKING \ + |RRD_FLAG_UPDATE_REASON_CHANGED_METADATA \ + |RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \ + |RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T \ + |RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T \ + |RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED \ + |RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED \ + |RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD \ + |RRD_FLAG_UPDATE_REASON_DB_ROTATION \ + |RRD_FLAG_UPDATE_REASON_UNUSED \ + ) + +#define RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS ( \ + RRD_FLAG_ARCHIVED \ + |RRD_FLAG_HIDDEN \ + |RRD_FLAG_ALL_UPDATE_REASONS \ + ) + +#define RRD_FLAGS_REQUIRED_FOR_DELETIONS ( \ + RRD_FLAG_DELETED \ + |RRD_FLAG_LIVE_RETENTION \ +) + +#define RRD_FLAGS_PREVENTING_DELETIONS ( \ + RRD_FLAG_QUEUED_FOR_HUB \ + |RRD_FLAG_COLLECTED \ + |RRD_FLAG_QUEUED_FOR_PP \ +) + +// get all the flags of an object +#define rrd_flags_get(obj) __atomic_load_n(&((obj)->flags), __ATOMIC_SEQ_CST) + +// check if ANY of the given flags (bits) is set +#define rrd_flag_check(obj, flag) (rrd_flags_get(obj) & (flag)) + +// check if ALL the given flags (bits) are set +#define rrd_flag_check_all(obj, flag) (rrd_flag_check(obj, flag) == (flag)) + +// set one or more flags (bits) +#define rrd_flag_set(obj, flag) __atomic_or_fetch(&((obj)->flags), flag, __ATOMIC_SEQ_CST) + +// clear one or more flags (bits) +#define rrd_flag_clear(obj, flag) __atomic_and_fetch(&((obj)->flags), ~(flag), __ATOMIC_SEQ_CST) + +// replace the flags of an object, with the supplied ones +#define rrd_flags_replace(obj, all_flags) __atomic_store_n(&((obj)->flags), all_flags, __ATOMIC_SEQ_CST) + +static inline void +rrd_flag_add_remove_atomic(RRD_FLAGS *flags, RRD_FLAGS check, RRD_FLAGS conditionally_add, RRD_FLAGS always_remove) { + RRD_FLAGS expected, desired; + + do { + expected = *flags; + + desired = expected; + desired &= ~(always_remove); + + if(!(expected & check)) + desired |= (check | conditionally_add); + + } while(!__atomic_compare_exchange_n(flags, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); +} + +#define rrd_flag_set_collected(obj) \ + rrd_flag_add_remove_atomic(&((obj)->flags) \ + /* check this flag */ \ + , RRD_FLAG_COLLECTED \ + \ + /* add these flags together with the above, if the above is not already set */ \ + , RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED | RRD_FLAG_UPDATED \ + \ + /* always remove these flags */ \ + , RRD_FLAG_ARCHIVED \ + | RRD_FLAG_DELETED \ + | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED \ + | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \ + | RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD \ + ) + +#define rrd_flag_set_archived(obj) \ + rrd_flag_add_remove_atomic(&((obj)->flags) \ + /* check this flag */ \ + , RRD_FLAG_ARCHIVED \ + \ + /* add these flags together with the above, if the above is not already set */ \ + , RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED | RRD_FLAG_UPDATED \ + \ + /* always remove these flags */ \ + , RRD_FLAG_COLLECTED \ + | RRD_FLAG_DELETED \ + | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED \ + | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \ + ) + +#define rrd_flag_set_deleted(obj, reason) \ + rrd_flag_add_remove_atomic(&((obj)->flags) \ + /* check this flag */ \ + , RRD_FLAG_DELETED \ + \ + /* add these flags together with the above, if the above is not already set */ \ + , RRD_FLAG_UPDATE_REASON_ZERO_RETENTION | RRD_FLAG_UPDATED | (reason) \ + \ + /* always remove these flags */ \ + , RRD_FLAG_ARCHIVED \ + | RRD_FLAG_COLLECTED \ + ) + +#define rrd_flag_is_collected(obj) rrd_flag_check(obj, RRD_FLAG_COLLECTED) +#define rrd_flag_is_archived(obj) rrd_flag_check(obj, RRD_FLAG_ARCHIVED) +#define rrd_flag_is_deleted(obj) rrd_flag_check(obj, RRD_FLAG_DELETED) +#define rrd_flag_is_updated(obj) rrd_flag_check(obj, RRD_FLAG_UPDATED) + +// mark an object as updated, providing reasons (additional bits) +#define rrd_flag_set_updated(obj, reason) rrd_flag_set(obj, RRD_FLAG_UPDATED | (reason)) + +// clear an object as being updated, clearing also all the reasons +#define rrd_flag_unset_updated(obj) rrd_flag_clear(obj, RRD_FLAG_UPDATED | RRD_FLAG_ALL_UPDATE_REASONS) + + +typedef struct rrdmetric { + nd_uuid_t uuid; + + STRING *id; + STRING *name; + + RRDDIM *rrddim; + + time_t first_time_s; + time_t last_time_s; + RRD_FLAGS flags; + + struct rrdinstance *ri; +} RRDMETRIC; + +typedef struct rrdinstance { + nd_uuid_t uuid; + + STRING *id; + STRING *name; + STRING *title; + STRING *units; + STRING *family; + uint32_t priority:24; + RRDSET_TYPE chart_type; + + RRD_FLAGS flags; // flags related to this instance + time_t first_time_s; + time_t last_time_s; + + time_t update_every_s; // data collection frequency + RRDSET *rrdset; // pointer to RRDSET when collected, or NULL + + RRDLABELS *rrdlabels; // linked to RRDSET->chart_labels or own version + + struct rrdcontext *rc; + DICTIONARY *rrdmetrics; + + struct { + uint32_t collected_metrics_count; // a temporary variable to detect BEGIN/END without SET + // don't use it for other purposes + // it goes up and then resets to zero, on every iteration + } internal; +} RRDINSTANCE; + +typedef struct rrdcontext { + uint64_t version; + + STRING *id; + STRING *title; + STRING *units; + STRING *family; + uint32_t priority; + RRDSET_TYPE chart_type; + + SPINLOCK spinlock; + + RRD_FLAGS flags; + time_t first_time_s; + time_t last_time_s; + + VERSIONED_CONTEXT_DATA hub; + + DICTIONARY *rrdinstances; + RRDHOST *rrdhost; + + struct { + RRD_FLAGS queued_flags; // the last flags that triggered the post-processing + usec_t queued_ut; // the last time this was queued + usec_t dequeued_ut; // the last time we sent (or deduplicated) this context + size_t executions; // how many times this context has been processed + } pp; + + struct { + RRD_FLAGS queued_flags; // the last flags that triggered the queueing + usec_t queued_ut; // the last time this was queued + usec_t delay_calc_ut; // the last time we calculated the scheduled_dispatched_ut + usec_t scheduled_dispatch_ut; // the time it was/is scheduled to be sent + usec_t dequeued_ut; // the last time we sent (or deduplicated) this context + size_t dispatches; // the number of times this has been dispatched to hub + } queue; + + struct { + uint32_t metrics; // the number of metrics in this context + } stats; +} RRDCONTEXT; + + +// ---------------------------------------------------------------------------- +// helper one-liners for RRDMETRIC + +bool rrdmetric_update_retention(RRDMETRIC *rm); + +static inline RRDMETRIC *rrdmetric_acquired_value(RRDMETRIC_ACQUIRED *rma) { + return dictionary_acquired_item_value((DICTIONARY_ITEM *)rma); +} + +static inline RRDMETRIC_ACQUIRED *rrdmetric_acquired_dup(RRDMETRIC_ACQUIRED *rma) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + return (RRDMETRIC_ACQUIRED *)dictionary_acquired_item_dup(rm->ri->rrdmetrics, (DICTIONARY_ITEM *)rma); +} + +static inline void rrdmetric_release(RRDMETRIC_ACQUIRED *rma) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + dictionary_acquired_item_release(rm->ri->rrdmetrics, (DICTIONARY_ITEM *)rma); +} + +void rrdmetric_rrddim_is_freed(RRDDIM *rd); +void rrdmetric_updated_rrddim_flags(RRDDIM *rd); +void rrdmetric_collected_rrddim(RRDDIM *rd); + +// ---------------------------------------------------------------------------- +// helper one-liners for RRDINSTANCE + +static inline RRDINSTANCE *rrdinstance_acquired_value(RRDINSTANCE_ACQUIRED *ria) { + return dictionary_acquired_item_value((DICTIONARY_ITEM *)ria); +} + +static inline RRDINSTANCE_ACQUIRED *rrdinstance_acquired_dup(RRDINSTANCE_ACQUIRED *ria) { + RRDINSTANCE *ri = rrdinstance_acquired_value(ria); + return (RRDINSTANCE_ACQUIRED *)dictionary_acquired_item_dup(ri->rc->rrdinstances, (DICTIONARY_ITEM *)ria); +} + +static inline void rrdinstance_release(RRDINSTANCE_ACQUIRED *ria) { + RRDINSTANCE *ri = rrdinstance_acquired_value(ria); + dictionary_acquired_item_release(ri->rc->rrdinstances, (DICTIONARY_ITEM *)ria); +} + +void rrdinstance_from_rrdset(RRDSET *st); +void rrdinstance_rrdset_is_freed(RRDSET *st); +void rrdinstance_rrdset_has_updated_retention(RRDSET *st); +void rrdinstance_updated_rrdset_name(RRDSET *st); +void rrdinstance_updated_rrdset_flags_no_action(RRDINSTANCE *ri, RRDSET *st); +void rrdinstance_updated_rrdset_flags(RRDSET *st); +void rrdinstance_collected_rrdset(RRDSET *st); + +void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function, RRD_FLAGS flags); + +// ---------------------------------------------------------------------------- +// helper one-liners for RRDCONTEXT + +static inline RRDCONTEXT *rrdcontext_acquired_value(RRDCONTEXT_ACQUIRED *rca) { + return dictionary_acquired_item_value((DICTIONARY_ITEM *)rca); +} + +static inline RRDCONTEXT_ACQUIRED *rrdcontext_acquired_dup(RRDCONTEXT_ACQUIRED *rca) { + RRDCONTEXT *rc = rrdcontext_acquired_value(rca); + return (RRDCONTEXT_ACQUIRED *)dictionary_acquired_item_dup(rc->rrdhost->rrdctx.contexts, (DICTIONARY_ITEM *)rca); +} + +static inline void rrdcontext_release(RRDCONTEXT_ACQUIRED *rca) { + RRDCONTEXT *rc = rrdcontext_acquired_value(rca); + dictionary_acquired_item_release(rc->rrdhost->rrdctx.contexts, (DICTIONARY_ITEM *)rca); +} + +// ---------------------------------------------------------------------------- +// Forward definitions + +void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, bool worker_jobs); +void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, bool worker_jobs); + +#define rrdcontext_lock(rc) spinlock_lock(&((rc)->spinlock)) +#define rrdcontext_unlock(rc) spinlock_unlock(&((rc)->spinlock)) + +void rrdinstance_trigger_updates(RRDINSTANCE *ri, const char *function); +void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function); + +void rrdinstances_create_in_rrdcontext(RRDCONTEXT *rc); +void rrdinstances_destroy_from_rrdcontext(RRDCONTEXT *rc); + +void rrdmetrics_destroy_from_rrdinstance(RRDINSTANCE *ri); +void rrdmetrics_create_in_rrdinstance(RRDINSTANCE *ri); + +void rrdmetric_from_rrddim(RRDDIM *rd); + +void rrd_reasons_to_buffer_json_array_items(RRD_FLAGS flags, BUFFER *wb); + +#define rrdcontext_version_hash(host) rrdcontext_version_hash_with_callback(host, NULL, false, NULL) +uint64_t rrdcontext_version_hash_with_callback( + RRDHOST *host, + void (*callback)(RRDCONTEXT *, bool, void *), + bool snapshot, + void *bundle); + +void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused, void *bundle __maybe_unused); + +void rrdcontext_update_from_collected_rrdinstance(RRDINSTANCE *ri); + +#endif //NETDATA_RRDCONTEXT_INTERNAL_H diff --git a/src/database/contexts/metric.c b/src/database/contexts/metric.c new file mode 100644 index 000000000..be29f33ea --- /dev/null +++ b/src/database/contexts/metric.c @@ -0,0 +1,327 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "internal.h" + +static void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function); + +inline const char *rrdmetric_acquired_id(RRDMETRIC_ACQUIRED *rma) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + return string2str(rm->id); +} + +inline const char *rrdmetric_acquired_name(RRDMETRIC_ACQUIRED *rma) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + return string2str(rm->name); +} + +inline bool rrdmetric_acquired_has_name(RRDMETRIC_ACQUIRED *rma) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + return (rm->name && rm->name != rm->id); +} + +inline STRING *rrdmetric_acquired_id_dup(RRDMETRIC_ACQUIRED *rma) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + return string_dup(rm->id); +} + +inline STRING *rrdmetric_acquired_name_dup(RRDMETRIC_ACQUIRED *rma) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + return string_dup(rm->name); +} + +inline NETDATA_DOUBLE rrdmetric_acquired_last_stored_value(RRDMETRIC_ACQUIRED *rma) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + + if(rm->rrddim) + return rm->rrddim->collector.last_stored_value; + + return NAN; +} + +inline bool rrdmetric_acquired_belongs_to_instance(RRDMETRIC_ACQUIRED *rma, RRDINSTANCE_ACQUIRED *ria) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + RRDINSTANCE *ri = rrdinstance_acquired_value(ria); + return rm->ri == ri; +} + +inline time_t rrdmetric_acquired_first_entry(RRDMETRIC_ACQUIRED *rma) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + return rm->first_time_s; +} + +inline time_t rrdmetric_acquired_last_entry(RRDMETRIC_ACQUIRED *rma) { + RRDMETRIC *rm = rrdmetric_acquired_value(rma); + + if(rrd_flag_check(rm, RRD_FLAG_COLLECTED)) + return 0; + + return rm->last_time_s; +} + +// ---------------------------------------------------------------------------- +// RRDMETRIC + +// free the contents of RRDMETRIC. +// RRDMETRIC itself is managed by DICTIONARY - no need to free it here. +static void rrdmetric_free(RRDMETRIC *rm) { + string_freez(rm->id); + string_freez(rm->name); + + rm->id = NULL; + rm->name = NULL; + rm->ri = NULL; +} + +// called when this rrdmetric is inserted to the rrdmetrics dictionary of a rrdinstance +// the constructor of the rrdmetric object +static void rrdmetric_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdinstance) { + RRDMETRIC *rm = value; + + // link it to its parent + rm->ri = rrdinstance; + + // remove flags that we need to figure out at runtime + rm->flags = rm->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics + + // signal the react callback to do the job + rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_NEW_OBJECT); +} + +// called when this rrdmetric is deleted from the rrdmetrics dictionary of a rrdinstance +// the destructor of the rrdmetric object +static void rrdmetric_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdinstance __maybe_unused) { + RRDMETRIC *rm = value; + + internal_error(rm->rrddim, "RRDMETRIC: '%s' is freed but there is a RRDDIM linked to it.", string2str(rm->id)); + + // free the resources + rrdmetric_free(rm); +} + +// called when the same rrdmetric is inserted again to the rrdmetrics dictionary of a rrdinstance +// while this is called, the dictionary is write locked, but there may be other users of the object +static bool rrdmetric_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *rrdinstance __maybe_unused) { + RRDMETRIC *rm = old_value; + RRDMETRIC *rm_new = new_value; + + internal_error(rm->id != rm_new->id, + "RRDMETRIC: '%s' cannot change id to '%s'", + string2str(rm->id), string2str(rm_new->id)); + + if(!uuid_eq(rm->uuid, rm_new->uuid)) { +#ifdef NETDATA_INTERNAL_CHECKS + char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN]; + uuid_unparse(rm->uuid, uuid1); + uuid_unparse(rm_new->uuid, uuid2); + + time_t old_first_time_s = 0; + time_t old_last_time_s = 0; + if(rrdmetric_update_retention(rm)) { + old_first_time_s = rm->first_time_s; + old_last_time_s = rm->last_time_s; + } + + uuid_copy(rm->uuid, rm_new->uuid); + + time_t new_first_time_s = 0; + time_t new_last_time_s = 0; + if(rrdmetric_update_retention(rm)) { + new_first_time_s = rm->first_time_s; + new_last_time_s = rm->last_time_s; + } + + internal_error(true, + "RRDMETRIC: '%s' of instance '%s' of host '%s' changed UUID from '%s' (retention %ld to %ld, %ld secs) to '%s' (retention %ld to %ld, %ld secs)" + , string2str(rm->id) + , string2str(rm->ri->id) + , rrdhost_hostname(rm->ri->rc->rrdhost) + , uuid1, old_first_time_s, old_last_time_s, old_last_time_s - old_first_time_s + , uuid2, new_first_time_s, new_last_time_s, new_last_time_s - new_first_time_s + ); +#else + uuid_copy(rm->uuid, rm_new->uuid); +#endif + rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + + if(rm->rrddim && rm_new->rrddim && rm->rrddim != rm_new->rrddim) { + rm->rrddim = rm_new->rrddim; + rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LINKING); + } + +#ifdef NETDATA_INTERNAL_CHECKS + if(rm->rrddim && !uuid_eq(rm->uuid, rm->rrddim->metric_uuid)) { + char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN]; + uuid_unparse(rm->uuid, uuid1); + uuid_unparse(rm_new->uuid, uuid2); + internal_error(true, "RRDMETRIC: '%s' is linked to RRDDIM '%s' but they have different UUIDs. RRDMETRIC has '%s', RRDDIM has '%s'", string2str(rm->id), rrddim_id(rm->rrddim), uuid1, uuid2); + } +#endif + + if(rm->rrddim != rm_new->rrddim) + rm->rrddim = rm_new->rrddim; + + if(rm->name != rm_new->name) { + STRING *old = rm->name; + rm->name = string_dup(rm_new->name); + string_freez(old); + rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + + if(!rm->first_time_s || (rm_new->first_time_s && rm_new->first_time_s < rm->first_time_s)) { + rm->first_time_s = rm_new->first_time_s; + rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); + } + + if(!rm->last_time_s || (rm_new->last_time_s && rm_new->last_time_s > rm->last_time_s)) { + rm->last_time_s = rm_new->last_time_s; + rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); + } + + rrd_flag_set(rm, rm_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no needs for atomics on rm_new + + if(rrd_flag_is_collected(rm) && rrd_flag_is_archived(rm)) + rrd_flag_set_collected(rm); + + if(rrd_flag_check(rm, RRD_FLAG_UPDATED)) + rrd_flag_set(rm, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT); + + rrdmetric_free(rm_new); + + // the react callback will continue from here + return rrd_flag_is_updated(rm); +} + +// this is called after the insert or the conflict callbacks, +// but the dictionary is now unlocked +static void rrdmetric_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdinstance __maybe_unused) { + RRDMETRIC *rm = value; + rrdmetric_trigger_updates(rm, __FUNCTION__ ); +} + +void rrdmetrics_create_in_rrdinstance(RRDINSTANCE *ri) { + if(unlikely(!ri)) return; + if(likely(ri->rrdmetrics)) return; + + ri->rrdmetrics = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + &dictionary_stats_category_rrdcontext, sizeof(RRDMETRIC)); + + dictionary_register_insert_callback(ri->rrdmetrics, rrdmetric_insert_callback, ri); + dictionary_register_delete_callback(ri->rrdmetrics, rrdmetric_delete_callback, ri); + dictionary_register_conflict_callback(ri->rrdmetrics, rrdmetric_conflict_callback, ri); + dictionary_register_react_callback(ri->rrdmetrics, rrdmetric_react_callback, ri); +} + +void rrdmetrics_destroy_from_rrdinstance(RRDINSTANCE *ri) { + if(unlikely(!ri || !ri->rrdmetrics)) return; + dictionary_destroy(ri->rrdmetrics); + ri->rrdmetrics = NULL; +} + +// trigger post-processing of the rrdmetric, escalating changes to the rrdinstance it belongs +static void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function) { + if(unlikely(rrd_flag_is_collected(rm)) && (!rm->rrddim || rrd_flag_check(rm, RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD))) + rrd_flag_set_archived(rm); + + if(rrd_flag_is_updated(rm) || !rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION)) { + rrd_flag_set_updated(rm->ri, RRD_FLAG_UPDATE_REASON_TRIGGERED); + rrdcontext_queue_for_post_processing(rm->ri->rc, function, rm->flags); + } +} + +// ---------------------------------------------------------------------------- +// RRDMETRIC HOOKS ON RRDDIM + +void rrdmetric_from_rrddim(RRDDIM *rd) { + if(unlikely(!rd->rrdset)) + fatal("RRDMETRIC: rrddim '%s' does not have a rrdset.", rrddim_id(rd)); + + if(unlikely(!rd->rrdset->rrdhost)) + fatal("RRDMETRIC: rrdset '%s' does not have a rrdhost", rrdset_id(rd->rrdset)); + + if(unlikely(!rd->rrdset->rrdcontexts.rrdinstance)) + fatal("RRDMETRIC: rrdset '%s' does not have a rrdinstance", rrdset_id(rd->rrdset)); + + RRDINSTANCE *ri = rrdinstance_acquired_value(rd->rrdset->rrdcontexts.rrdinstance); + + RRDMETRIC trm = { + .id = string_dup(rd->id), + .name = string_dup(rd->name), + .flags = RRD_FLAG_NONE, // no need for atomics + .rrddim = rd, + }; + uuid_copy(trm.uuid, rd->metric_uuid); + + RRDMETRIC_ACQUIRED *rma = (RRDMETRIC_ACQUIRED *)dictionary_set_and_acquire_item(ri->rrdmetrics, string2str(trm.id), &trm, sizeof(trm)); + + if(rd->rrdcontexts.rrdmetric) + rrdmetric_release(rd->rrdcontexts.rrdmetric); + + rd->rrdcontexts.rrdmetric = rma; + rd->rrdcontexts.collected = false; +} + +#define rrddim_get_rrdmetric(rd) rrddim_get_rrdmetric_with_trace(rd, __FUNCTION__) +static inline RRDMETRIC *rrddim_get_rrdmetric_with_trace(RRDDIM *rd, const char *function) { + if(unlikely(!rd->rrdcontexts.rrdmetric)) { + netdata_log_error("RRDMETRIC: RRDDIM '%s' is not linked to an RRDMETRIC at %s()", rrddim_id(rd), function); + return NULL; + } + + RRDMETRIC *rm = rrdmetric_acquired_value(rd->rrdcontexts.rrdmetric); + if(unlikely(!rm)) { + netdata_log_error("RRDMETRIC: RRDDIM '%s' lost the link to its RRDMETRIC at %s()", rrddim_id(rd), function); + return NULL; + } + + if(unlikely(rm->rrddim != rd)) + fatal("RRDMETRIC: '%s' is not linked to RRDDIM '%s' at %s()", string2str(rm->id), rrddim_id(rd), function); + + return rm; +} + +inline void rrdmetric_rrddim_is_freed(RRDDIM *rd) { + RRDMETRIC *rm = rrddim_get_rrdmetric(rd); + if(unlikely(!rm)) return; + + if(unlikely(rrd_flag_is_collected(rm))) + rrd_flag_set_archived(rm); + + rm->rrddim = NULL; + rrdmetric_trigger_updates(rm, __FUNCTION__ ); + rrdmetric_release(rd->rrdcontexts.rrdmetric); + rd->rrdcontexts.rrdmetric = NULL; + rd->rrdcontexts.collected = false; +} + +inline void rrdmetric_updated_rrddim_flags(RRDDIM *rd) { + rd->rrdcontexts.collected = false; + + RRDMETRIC *rm = rrddim_get_rrdmetric(rd); + if(unlikely(!rm)) return; + + if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED|RRDDIM_FLAG_OBSOLETE))) { + if(unlikely(rrd_flag_is_collected(rm))) + rrd_flag_set_archived(rm); + } + + rrdmetric_trigger_updates(rm, __FUNCTION__ ); +} + +inline void rrdmetric_collected_rrddim(RRDDIM *rd) { + if(rd->rrdcontexts.collected) + return; + + rd->rrdcontexts.collected = true; + + RRDMETRIC *rm = rrddim_get_rrdmetric(rd); + if(unlikely(!rm)) return; + + if(unlikely(!rrd_flag_is_collected(rm))) + rrd_flag_set_collected(rm); + + // we use this variable to detect BEGIN/END without SET + rm->ri->internal.collected_metrics_count++; + + rrdmetric_trigger_updates(rm, __FUNCTION__ ); +} diff --git a/database/contexts/query_scope.c b/src/database/contexts/query_scope.c index f3bcd0b3f..f3bcd0b3f 100644 --- a/database/contexts/query_scope.c +++ b/src/database/contexts/query_scope.c diff --git a/database/contexts/query_target.c b/src/database/contexts/query_target.c index 95abc3e65..29a9c3e59 100644 --- a/database/contexts/query_target.c +++ b/src/database/contexts/query_target.c @@ -4,7 +4,7 @@ #define QUERY_TARGET_MAX_REALLOC_INCREASE 500 #define query_target_realloc_size(size, start) \ - (size) ? ((size) < QUERY_TARGET_MAX_REALLOC_INCREASE ? (size) * 2 : (size) + QUERY_TARGET_MAX_REALLOC_INCREASE) : (start); + (size) ? ((size) < QUERY_TARGET_MAX_REALLOC_INCREASE ? (size) * 2 : (size) + QUERY_TARGET_MAX_REALLOC_INCREASE) : (start) static void query_metric_release(QUERY_TARGET *qt, QUERY_METRIC *qm); static void query_dimension_release(QUERY_DIMENSION *qd); @@ -82,7 +82,6 @@ void query_target_release(QUERY_TARGET *qt) { simple_pattern_free(qt->instances.labels_pattern); qt->instances.labels_pattern = NULL; - simple_pattern_free(qt->query.pattern); qt->query.pattern = NULL; @@ -221,10 +220,10 @@ static inline void query_metric_release(QUERY_TARGET *qt, QUERY_METRIC *qm) { // reset the tiers for(size_t tier = 0; tier < storage_tiers ;tier++) { - if(qm->tiers[tier].db_metric_handle) { + if(qm->tiers[tier].smh) { STORAGE_ENGINE *eng = query_metric_storage_engine(qt, qm, tier); - eng->api.metric_release(qm->tiers[tier].db_metric_handle); - qm->tiers[tier].db_metric_handle = NULL; + eng->api.metric_release(qm->tiers[tier].smh); + qm->tiers[tier].smh = NULL; } } } @@ -241,7 +240,7 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON struct { STORAGE_ENGINE *eng; - STORAGE_METRIC_HANDLE *db_metric_handle; + STORAGE_METRIC_HANDLE *smh; time_t db_first_time_s; time_t db_last_time_s; time_t db_update_every_s; @@ -252,14 +251,14 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON tier_retention[tier].eng = eng; tier_retention[tier].db_update_every_s = (time_t) (qn->rrdhost->db[tier].tier_grouping * ri->update_every_s); - if(rm->rrddim && rm->rrddim->tiers[tier].db_metric_handle) - tier_retention[tier].db_metric_handle = eng->api.metric_dup(rm->rrddim->tiers[tier].db_metric_handle); + if(rm->rrddim && rm->rrddim->tiers[tier].smh) + tier_retention[tier].smh = eng->api.metric_dup(rm->rrddim->tiers[tier].smh); else - tier_retention[tier].db_metric_handle = eng->api.metric_get(qn->rrdhost->db[tier].instance, &rm->uuid); + tier_retention[tier].smh = eng->api.metric_get(qn->rrdhost->db[tier].si, &rm->uuid); - if(tier_retention[tier].db_metric_handle) { - tier_retention[tier].db_first_time_s = storage_engine_oldest_time_s(tier_retention[tier].eng->backend, tier_retention[tier].db_metric_handle); - tier_retention[tier].db_last_time_s = storage_engine_latest_time_s(tier_retention[tier].eng->backend, tier_retention[tier].db_metric_handle); + if(tier_retention[tier].smh) { + tier_retention[tier].db_first_time_s = storage_engine_oldest_time_s(tier_retention[tier].eng->seb, tier_retention[tier].smh); + tier_retention[tier].db_last_time_s = storage_engine_latest_time_s(tier_retention[tier].eng->seb, tier_retention[tier].smh); if(!common_first_time_s) common_first_time_s = tier_retention[tier].db_first_time_s; @@ -331,7 +330,7 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON for (size_t tier = 0; tier < storage_tiers; tier++) { internal_fatal(tier_retention[tier].eng != query_metric_storage_engine(qt, qm, tier), "QUERY TARGET: storage engine mismatch"); - qm->tiers[tier].db_metric_handle = tier_retention[tier].db_metric_handle; + qm->tiers[tier].smh = tier_retention[tier].smh; qm->tiers[tier].db_first_time_s = tier_retention[tier].db_first_time_s; qm->tiers[tier].db_last_time_s = tier_retention[tier].db_last_time_s; qm->tiers[tier].db_update_every_s = tier_retention[tier].db_update_every_s; @@ -342,8 +341,10 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON // cleanup anything we allocated to the retention we will not use for(size_t tier = 0; tier < storage_tiers ;tier++) { - if (tier_retention[tier].db_metric_handle) - tier_retention[tier].eng->api.metric_release(tier_retention[tier].db_metric_handle); + if (tier_retention[tier].smh) { + tier_retention[tier].eng->api.metric_release(tier_retention[tier].smh); + tier_retention[tier].smh = NULL; + } } return false; @@ -627,7 +628,7 @@ static bool query_target_match_alert_pattern(RRDINSTANCE_ACQUIRED *ria, SIMPLE_P rw_spinlock_read_lock(&st->alerts.spinlock); if (st->alerts.base) { for (RRDCALC *rc = st->alerts.base; rc; rc = rc->next) { - SIMPLE_PATTERN_RESULT ret = simple_pattern_matches_string_extract(pattern, rc->name, NULL, 0); + SIMPLE_PATTERN_RESULT ret = simple_pattern_matches_string_extract(pattern, rc->config.name, NULL, 0); if(ret == SP_MATCHED_POSITIVE) { matched = true; @@ -641,7 +642,7 @@ static bool query_target_match_alert_pattern(RRDINSTANCE_ACQUIRED *ria, SIMPLE_P else buffer_flush(wb); - buffer_fast_strcat(wb, string2str(rc->name), string_strlen(rc->name)); + buffer_fast_strcat(wb, string2str(rc->config.name), string_strlen(rc->config.name)); buffer_fast_strcat(wb, ":", 1); buffer_strcat(wb, rrdcalc_status2string(rc->status)); @@ -725,13 +726,22 @@ static inline SIMPLE_PATTERN_RESULT query_instance_matches(QUERY_INSTANCE *qi, return ret; } -static inline bool query_instance_matches_labels(RRDINSTANCE *ri, SIMPLE_PATTERN *chart_label_key_sp, SIMPLE_PATTERN *labels_sp) { - if ((chart_label_key_sp && !rrdlabels_match_simple_pattern_parsed( - ri->rrdlabels, chart_label_key_sp, '\0', NULL)) || - (labels_sp && !rrdlabels_match_simple_pattern_parsed( - ri->rrdlabels, labels_sp, ':', NULL))) +static inline bool query_instance_matches_labels( + RRDINSTANCE *ri, + SIMPLE_PATTERN *chart_label_key_sp, + SIMPLE_PATTERN *labels_sp) +{ + + if (chart_label_key_sp && rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, chart_label_key_sp, '\0', NULL) != SP_MATCHED_POSITIVE) return false; + if (labels_sp) { + struct pattern_array *pa = pattern_array_add_simple_pattern(NULL, labels_sp, ':'); + bool found = pattern_array_label_match(pa, ri->rrdlabels, ':', NULL); + pattern_array_free(pa); + return found; + } + return true; } @@ -752,7 +762,10 @@ static bool query_instance_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_C qi, ri, qt->instances.pattern, qtl->match_ids, qtl->match_names, qt->request.version, qtl->host_node_id_str)); if(queryable_instance) - queryable_instance = query_instance_matches_labels(ri, qt->instances.chart_label_key_pattern, qt->instances.labels_pattern); + queryable_instance = query_instance_matches_labels( + ri, + qt->instances.chart_label_key_pattern, + qt->instances.labels_pattern); if(queryable_instance) { if(qt->instances.alerts_pattern && !query_target_match_alert_pattern(ria, qt->instances.alerts_pattern)) @@ -1216,6 +1229,5 @@ ssize_t weights_foreach_rrdmetric_in_context(RRDCONTEXT_ACQUIRED *rca, break; } dfe_done(ri); - return count; } diff --git a/database/contexts/rrdcontext.c b/src/database/contexts/rrdcontext.c index 9dee39be2..f755e1f7e 100644 --- a/database/contexts/rrdcontext.c +++ b/src/database/contexts/rrdcontext.c @@ -105,7 +105,7 @@ void rrdcontext_db_rotation(void) { rrdcontext_next_db_rotation_ut = now_realtime_usec() + FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS * USEC_PER_SEC; } -int rrdcontext_find_dimension_uuid(RRDSET *st, const char *id, uuid_t *store_uuid) { +int rrdcontext_find_dimension_uuid(RRDSET *st, const char *id, nd_uuid_t *store_uuid) { if(!st->rrdhost) return 1; if(!st->context) return 2; @@ -139,7 +139,7 @@ int rrdcontext_find_dimension_uuid(RRDSET *st, const char *id, uuid_t *store_uui return 0; } -int rrdcontext_find_chart_uuid(RRDSET *st, uuid_t *store_uuid) { +int rrdcontext_find_chart_uuid(RRDSET *st, nd_uuid_t *store_uuid) { if(!st->rrdhost) return 1; if(!st->context) return 2; @@ -203,23 +203,6 @@ static bool rrdhost_check_our_claim_id(const char *claim_id) { return (strcasecmp(claim_id, localhost->aclk_state.claimed_id) == 0) ? true : false; } -static RRDHOST *rrdhost_find_by_node_id(const char *node_id) { - uuid_t uuid; - if (uuid_parse(node_id, uuid)) - return NULL; - - RRDHOST *host = NULL; - dfe_start_read(rrdhost_root_index, host) { - if(!host->node_id) continue; - - if(uuid_memcmp(&uuid, host->node_id) == 0) - break; - } - dfe_done(host); - - return host; -} - void rrdcontext_hub_checkpoint_command(void *ptr) { struct ctxs_checkpoint *cmd = ptr; @@ -234,7 +217,7 @@ void rrdcontext_hub_checkpoint_command(void *ptr) { return; } - RRDHOST *host = rrdhost_find_by_node_id(cmd->node_id); + RRDHOST *host = find_host_by_node_id(cmd->node_id); if(!host) { nd_log(NDLS_DAEMON, NDLP_WARNING, "RRDCONTEXT: received checkpoint command for claim id '%s', node id '%s', " @@ -308,7 +291,7 @@ void rrdcontext_hub_stop_streaming_command(void *ptr) { return; } - RRDHOST *host = rrdhost_find_by_node_id(cmd->node_id); + RRDHOST *host = find_host_by_node_id(cmd->node_id); if(!host) { nd_log(NDLS_DAEMON, NDLP_WARNING, "RRDCONTEXT: received stop streaming command for claim id '%s', node id '%s', " diff --git a/database/contexts/rrdcontext.h b/src/database/contexts/rrdcontext.h index 9c497a5a5..9fea55d38 100644 --- a/database/contexts/rrdcontext.h +++ b/src/database/contexts/rrdcontext.h @@ -99,7 +99,7 @@ void rrdcontext_updated_rrddim_multiplier(RRDDIM *rd); void rrdcontext_updated_rrddim_divisor(RRDDIM *rd); void rrdcontext_updated_rrddim_flags(RRDDIM *rd); void rrdcontext_collected_rrddim(RRDDIM *rd); -int rrdcontext_find_dimension_uuid(RRDSET *st, const char *id, uuid_t *store_uuid); +int rrdcontext_find_dimension_uuid(RRDSET *st, const char *id, nd_uuid_t *store_uuid); // ---------------------------------------------------------------------------- // public API for rrdsets @@ -110,7 +110,7 @@ void rrdcontext_updated_rrdset_name(RRDSET *st); void rrdcontext_updated_rrdset_flags(RRDSET *st); void rrdcontext_updated_retention_rrdset(RRDSET *st); void rrdcontext_collected_rrdset(RRDSET *st); -int rrdcontext_find_chart_uuid(RRDSET *st, uuid_t *store_uuid); +int rrdcontext_find_chart_uuid(RRDSET *st, nd_uuid_t *store_uuid); // ---------------------------------------------------------------------------- // public API for ACLK @@ -165,7 +165,7 @@ typedef struct query_alerts_counts { // counts the number of alerts related t size_t other; // number of alerts in any other state } QUERY_ALERTS_COUNTS; -typedef struct query_node { +typedef struct _query_node { uint32_t slot; RRDHOST *rrdhost; char node_id[UUID_STR_LEN]; @@ -177,7 +177,7 @@ typedef struct query_node { QUERY_ALERTS_COUNTS alerts; } QUERY_NODE; -typedef struct query_context { +typedef struct _query_context { uint32_t slot; RRDCONTEXT_ACQUIRED *rca; @@ -187,7 +187,7 @@ typedef struct query_context { QUERY_ALERTS_COUNTS alerts; } QUERY_CONTEXT; -typedef struct query_instance { +typedef struct _query_instance { uint32_t slot; uint32_t query_host_id; RRDINSTANCE_ACQUIRED *ria; @@ -199,18 +199,18 @@ typedef struct query_instance { QUERY_ALERTS_COUNTS alerts; } QUERY_INSTANCE; -typedef struct query_dimension { +typedef struct _query_dimension { uint32_t slot; uint32_t priority; RRDMETRIC_ACQUIRED *rma; QUERY_STATUS status; } QUERY_DIMENSION; -typedef struct query_metric { +typedef struct _query_metric { RRDR_DIMENSION_FLAGS status; struct query_metric_tier { - STORAGE_METRIC_HANDLE *db_metric_handle; + STORAGE_METRIC_HANDLE *smh; time_t db_first_time_s; // the oldest timestamp available for this tier time_t db_last_time_s; // the latest timestamp available for this tier time_t db_update_every_s; // latest update every for this tier @@ -299,6 +299,8 @@ typedef struct query_target_request { qt_interrupt_callback_t interrupt_callback; void *interrupt_callback_data; + + nd_uuid_t *transaction; } QUERY_TARGET_REQUEST; #define GROUP_BY_MAX_LABEL_KEYS 10 @@ -419,9 +421,9 @@ typedef struct query_target { struct sql_alert_transition_data { usec_t global_id; - uuid_t *transition_id; - uuid_t *host_id; - uuid_t *config_hash_id; + nd_uuid_t *transition_id; + nd_uuid_t *host_id; + nd_uuid_t *config_hash_id; uint32_t alarm_id; const char *alert_name; const char *chart; @@ -452,21 +454,16 @@ struct sql_alert_transition_data { }; struct sql_alert_config_data { - uuid_t *config_hash_id; + nd_uuid_t *config_hash_id; const char *name; struct { const char *on_template; const char *on_key; - const char *os; - const char *hosts; const char *families; - const char *plugin; - const char *module; const char *host_labels; const char *chart_labels; - const char *charts; } selectors; const char *info; @@ -479,6 +476,10 @@ struct sql_alert_config_data { struct { const char *dimensions; const char *method; + ALERT_LOOKUP_TIME_GROUP_CONDITION time_group_condition; + NETDATA_DOUBLE time_group_value; + ALERT_LOOKUP_DIMS_GROUPING dims_group; + ALERT_LOOKUP_DATA_SOURCE data_source; uint32_t options; int32_t after; @@ -538,9 +539,9 @@ struct sql_alert_instance_v2_entry { time_t last_updated; time_t last_status_change; NETDATA_DOUBLE last_status_change_value; - uuid_t config_hash_id; + nd_uuid_t config_hash_id; usec_t global_id; - uuid_t last_transition_id; + nd_uuid_t last_transition_id; uint32_t alarm_id; RRDHOST *host; size_t ni; diff --git a/database/contexts/worker.c b/src/database/contexts/worker.c index 9d7c18863..6012c14f5 100644 --- a/database/contexts/worker.c +++ b/src/database/contexts/worker.c @@ -239,7 +239,7 @@ bool rrdmetric_update_retention(RRDMETRIC *rm) { STORAGE_ENGINE *eng = rrdhost->db[tier].eng; time_t first_time_t, last_time_t; - if (eng->api.metric_retention_by_uuid(rrdhost->db[tier].instance, &rm->uuid, &first_time_t, &last_time_t)) { + if (eng->api.metric_retention_by_uuid(rrdhost->db[tier].si, &rm->uuid, &first_time_t, &last_time_t)) { if (first_time_t < min_first_time_t) min_first_time_t = first_time_t; @@ -753,30 +753,26 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function __maybe_unused, RRD_FLAGS flags __maybe_unused) { if(unlikely(!rc->rrdhost->rrdctx.pp_queue)) return; - if(!rrd_flag_check(rc, RRD_FLAG_QUEUED_FOR_PP)) { - dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.pp_queue, - string2str(rc->id), - rc, - sizeof(*rc)); - -#if(defined(NETDATA_INTERNAL_CHECKS) && defined(LOG_POST_PROCESSING_QUEUE_INSERTIONS)) - { - BUFFER *wb_flags = buffer_create(1000); - rrd_flags_to_buffer(flags, wb_flags); - - BUFFER *wb_reasons = buffer_create(1000); - rrd_reasons_to_buffer(flags, wb_reasons); - - internal_error(true, "RRDCONTEXT: '%s' update triggered by function %s(), due to flags: %s, reasons: %s", - string2str(rc->id), function, - buffer_tostring(wb_flags), - buffer_tostring(wb_reasons)); - - buffer_free(wb_reasons); - buffer_free(wb_flags); - } -#endif +#if 0 + if(string_strcmp(rc->id, "system.cpu") == 0) { + CLEAN_BUFFER *wb = buffer_create(0, NULL); + buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY); + buffer_json_member_add_array(wb, "flags"); + rrd_flags_to_buffer_json_array_items(rc->flags, wb); + buffer_json_array_close(wb); + buffer_json_member_add_array(wb, "reasons"); + rrd_reasons_to_buffer_json_array_items(rc->flags, wb); + buffer_json_array_close(wb); + buffer_json_finalize(wb); + nd_log(NDLS_DAEMON, NDLP_EMERG, "%s() context '%s', triggered: %s", + function, string2str(rc->id), buffer_tostring(wb)); } +#endif + + dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.pp_queue, + string2str(rc->id), + rc, + sizeof(*rc)); } static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc) { @@ -960,16 +956,13 @@ static void rrdcontext_dequeue_from_hub_queue(RRDCONTEXT *rc) { static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now_ut) { // check if we have received a streaming command for this host - if(!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS) || !aclk_connected || !host->rrdctx.hub_queue) + if(!host->node_id || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS) || !aclk_connected || !host->rrdctx.hub_queue) return; // check if there are queued items to send if(!dictionary_entries(host->rrdctx.hub_queue)) return; - if(!host->node_id) - return; - size_t messages_added = 0; contexts_updated_t bundle = NULL; @@ -1060,8 +1053,10 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now // ---------------------------------------------------------------------------- // worker thread -static void rrdcontext_main_cleanup(void *ptr) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; +static void rrdcontext_main_cleanup(void *pptr) { + struct netdata_static_thread *static_thread = CLEANUP_FUNCTION_GET_PTR(pptr); + if(!static_thread) return; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; // custom code @@ -1071,83 +1066,82 @@ static void rrdcontext_main_cleanup(void *ptr) { } void *rrdcontext_main(void *ptr) { - netdata_thread_cleanup_push(rrdcontext_main_cleanup, ptr); - - worker_register("RRDCONTEXT"); - worker_register_job_name(WORKER_JOB_HOSTS, "hosts"); - worker_register_job_name(WORKER_JOB_CHECK, "dedup checks"); - worker_register_job_name(WORKER_JOB_SEND, "sent contexts"); - worker_register_job_name(WORKER_JOB_DEQUEUE, "deduplicated contexts"); - worker_register_job_name(WORKER_JOB_RETENTION, "metrics retention"); - worker_register_job_name(WORKER_JOB_QUEUED, "queued contexts"); - worker_register_job_name(WORKER_JOB_CLEANUP, "cleanups"); - worker_register_job_name(WORKER_JOB_CLEANUP_DELETE, "deletes"); - worker_register_job_name(WORKER_JOB_PP_METRIC, "check metrics"); - worker_register_job_name(WORKER_JOB_PP_INSTANCE, "check instances"); - worker_register_job_name(WORKER_JOB_PP_CONTEXT, "check contexts"); - - worker_register_job_custom_metric(WORKER_JOB_HUB_QUEUE_SIZE, "hub queue size", "contexts", WORKER_METRIC_ABSOLUTE); - worker_register_job_custom_metric(WORKER_JOB_PP_QUEUE_SIZE, "post processing queue size", "contexts", WORKER_METRIC_ABSOLUTE); - - heartbeat_t hb; - heartbeat_init(&hb); - usec_t step = RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC; - - while (service_running(SERVICE_CONTEXT)) { - worker_is_idle(); - heartbeat_next(&hb, step); - - if(unlikely(!service_running(SERVICE_CONTEXT))) break; - - usec_t now_ut = now_realtime_usec(); - - if(rrdcontext_next_db_rotation_ut && now_ut > rrdcontext_next_db_rotation_ut) { - rrdcontext_recalculate_retention_all_hosts(); - rrdcontext_garbage_collect_for_all_hosts(); - rrdcontext_next_db_rotation_ut = 0; - } + CLEANUP_FUNCTION_REGISTER(rrdcontext_main_cleanup) cleanup_ptr = ptr; + + worker_register("RRDCONTEXT"); + worker_register_job_name(WORKER_JOB_HOSTS, "hosts"); + worker_register_job_name(WORKER_JOB_CHECK, "dedup checks"); + worker_register_job_name(WORKER_JOB_SEND, "sent contexts"); + worker_register_job_name(WORKER_JOB_DEQUEUE, "deduplicated contexts"); + worker_register_job_name(WORKER_JOB_RETENTION, "metrics retention"); + worker_register_job_name(WORKER_JOB_QUEUED, "queued contexts"); + worker_register_job_name(WORKER_JOB_CLEANUP, "cleanups"); + worker_register_job_name(WORKER_JOB_CLEANUP_DELETE, "deletes"); + worker_register_job_name(WORKER_JOB_PP_METRIC, "check metrics"); + worker_register_job_name(WORKER_JOB_PP_INSTANCE, "check instances"); + worker_register_job_name(WORKER_JOB_PP_CONTEXT, "check contexts"); + + worker_register_job_custom_metric(WORKER_JOB_HUB_QUEUE_SIZE, "hub queue size", "contexts", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_JOB_PP_QUEUE_SIZE, "post processing queue size", "contexts", WORKER_METRIC_ABSOLUTE); + + heartbeat_t hb; + heartbeat_init(&hb); + usec_t step = RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC; + + while (service_running(SERVICE_CONTEXT)) { + worker_is_idle(); + heartbeat_next(&hb, step); + + if(unlikely(!service_running(SERVICE_CONTEXT))) break; + + usec_t now_ut = now_realtime_usec(); + + if(rrdcontext_next_db_rotation_ut && now_ut > rrdcontext_next_db_rotation_ut) { + rrdcontext_recalculate_retention_all_hosts(); + rrdcontext_garbage_collect_for_all_hosts(); + rrdcontext_next_db_rotation_ut = 0; + } - size_t hub_queued_contexts_for_all_hosts = 0; - size_t pp_queued_contexts_for_all_hosts = 0; + size_t hub_queued_contexts_for_all_hosts = 0; + size_t pp_queued_contexts_for_all_hosts = 0; - RRDHOST *host; - dfe_start_reentrant(rrdhost_root_index, host) { - if(unlikely(!service_running(SERVICE_CONTEXT))) break; + RRDHOST *host; + dfe_start_reentrant(rrdhost_root_index, host) { + if(unlikely(!service_running(SERVICE_CONTEXT))) break; - worker_is_busy(WORKER_JOB_HOSTS); + worker_is_busy(WORKER_JOB_HOSTS); - if(host->rrdctx.pp_queue) { - pp_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.pp_queue); - rrdcontext_post_process_queued_contexts(host); - dictionary_garbage_collect(host->rrdctx.pp_queue); - } - - if(host->rrdctx.hub_queue) { - hub_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.hub_queue); - rrdcontext_dispatch_queued_contexts_to_hub(host, now_ut); - dictionary_garbage_collect(host->rrdctx.hub_queue); - } + if(host->rrdctx.pp_queue) { + pp_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.pp_queue); + rrdcontext_post_process_queued_contexts(host); + dictionary_garbage_collect(host->rrdctx.pp_queue); + } - if (host->rrdctx.contexts) - dictionary_garbage_collect(host->rrdctx.contexts); + if(host->rrdctx.hub_queue) { + hub_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.hub_queue); + rrdcontext_dispatch_queued_contexts_to_hub(host, now_ut); + dictionary_garbage_collect(host->rrdctx.hub_queue); + } - // calculate the number of metrics and instances in the host - RRDCONTEXT *rc; - uint32_t metrics = 0, instances = 0; - dfe_start_read(host->rrdctx.contexts, rc) { - metrics += rc->stats.metrics; - instances += dictionary_entries(rc->rrdinstances); - } - dfe_done(rc); - host->rrdctx.metrics = metrics; - host->rrdctx.instances = instances; - } - dfe_done(host); + if (host->rrdctx.contexts) + dictionary_garbage_collect(host->rrdctx.contexts); - worker_set_metric(WORKER_JOB_HUB_QUEUE_SIZE, (NETDATA_DOUBLE)hub_queued_contexts_for_all_hosts); - worker_set_metric(WORKER_JOB_PP_QUEUE_SIZE, (NETDATA_DOUBLE)pp_queued_contexts_for_all_hosts); + // calculate the number of metrics and instances in the host + RRDCONTEXT *rc; + uint32_t metrics = 0, instances = 0; + dfe_start_read(host->rrdctx.contexts, rc) { + metrics += rc->stats.metrics; + instances += dictionary_entries(rc->rrdinstances); } + dfe_done(rc); + host->rrdctx.metrics = metrics; + host->rrdctx.instances = instances; + } + dfe_done(host); + + worker_set_metric(WORKER_JOB_HUB_QUEUE_SIZE, (NETDATA_DOUBLE)hub_queued_contexts_for_all_hosts); + worker_set_metric(WORKER_JOB_PP_QUEUE_SIZE, (NETDATA_DOUBLE)pp_queued_contexts_for_all_hosts); + } - netdata_thread_cleanup_pop(1); return NULL; } |