diff options
Diffstat (limited to '')
-rw-r--r-- | database/rrdcontext.c | 502 |
1 files changed, 279 insertions, 223 deletions
diff --git a/database/rrdcontext.c b/database/rrdcontext.c index 3413d1ea8..9fc605f32 100644 --- a/database/rrdcontext.c +++ b/database/rrdcontext.c @@ -221,8 +221,8 @@ typedef struct rrdmetric { RRDDIM *rrddim; - time_t first_time_t; - time_t last_time_t; + time_t first_time_s; + time_t last_time_s; RRD_FLAGS flags; struct rrdinstance *ri; @@ -240,10 +240,10 @@ typedef struct rrdinstance { RRDSET_TYPE chart_type; RRD_FLAGS flags; // flags related to this instance - time_t first_time_t; - time_t last_time_t; + time_t first_time_s; + time_t last_time_s; - int update_every; // data collection frequency + time_t update_every_s; // data collection frequency RRDSET *rrdset; // pointer to RRDSET when collected, or NULL DICTIONARY *rrdlabels; // linked to RRDSET->chart_labels or own version @@ -269,8 +269,8 @@ typedef struct rrdcontext { RRDSET_TYPE chart_type; RRD_FLAGS flags; - time_t first_time_t; - time_t last_time_t; + time_t first_time_s; + time_t last_time_s; VERSIONED_CONTEXT_DATA hub; @@ -522,20 +522,20 @@ static bool rrdmetric_conflict_callback(const DICTIONARY_ITEM *item __maybe_unus uuid_unparse(rm->uuid, uuid1); uuid_unparse(rm_new->uuid, uuid2); - time_t old_first_time_t = 0; - time_t old_last_time_t = 0; + time_t old_first_time_s = 0; + time_t old_last_time_s = 0; if(rrdmetric_update_retention(rm)) { - old_first_time_t = rm->first_time_t; - old_last_time_t = rm->last_time_t; + old_first_time_s = rm->first_time_s; + old_last_time_s = rm->last_time_s; } uuid_copy(rm->uuid, rm_new->uuid); - time_t new_first_time_t = 0; - time_t new_last_time_t = 0; + time_t new_first_time_s = 0; + time_t new_last_time_s = 0; if(rrdmetric_update_retention(rm)) { - new_first_time_t = rm->first_time_t; - new_last_time_t = rm->last_time_t; + new_first_time_s = rm->first_time_s; + new_last_time_s = rm->last_time_s; } internal_error(true, @@ -543,8 +543,8 @@ static bool rrdmetric_conflict_callback(const DICTIONARY_ITEM *item __maybe_unus , string2str(rm->id) , string2str(rm->ri->id) , rrdhost_hostname(rm->ri->rc->rrdhost) - , uuid1, old_first_time_t, old_last_time_t, old_last_time_t - old_first_time_t - , uuid2, new_first_time_t, new_last_time_t, new_last_time_t - new_first_time_t + , uuid1, old_first_time_s, old_last_time_s, old_last_time_s - old_first_time_s + , uuid2, new_first_time_s, new_last_time_s, new_last_time_s - new_first_time_s ); #else uuid_copy(rm->uuid, rm_new->uuid); @@ -576,13 +576,13 @@ static bool rrdmetric_conflict_callback(const DICTIONARY_ITEM *item __maybe_unus rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); } - if(!rm->first_time_t || (rm_new->first_time_t && rm_new->first_time_t < rm->first_time_t)) { - rm->first_time_t = rm_new->first_time_t; + if(!rm->first_time_s || (rm_new->first_time_s && rm_new->first_time_s < rm->first_time_s)) { + rm->first_time_s = rm_new->first_time_s; rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); } - if(!rm->last_time_t || (rm_new->last_time_t && rm_new->last_time_t > rm->last_time_t)) { - rm->last_time_t = rm_new->last_time_t; + if(!rm->last_time_s || (rm_new->last_time_s && rm_new->last_time_s > rm->last_time_s)) { + rm->last_time_s = rm_new->last_time_s; rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); } @@ -611,7 +611,9 @@ static void rrdmetrics_create_in_rrdinstance(RRDINSTANCE *ri) { if(unlikely(!ri)) return; if(likely(ri->rrdmetrics)) return; - ri->rrdmetrics = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + ri->rrdmetrics = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + &dictionary_stats_category_rrdcontext, sizeof(RRDMETRIC)); + dictionary_register_insert_callback(ri->rrdmetrics, rrdmetric_insert_callback, ri); dictionary_register_delete_callback(ri->rrdmetrics, rrdmetric_delete_callback, ri); dictionary_register_conflict_callback(ri->rrdmetrics, rrdmetric_conflict_callback, ri); @@ -750,11 +752,6 @@ static void rrdinstance_free(RRDINSTANCE *ri) { } static void rrdinstance_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdcontext) { - static STRING *ml_anomaly_rates_id = NULL; - - if(unlikely(!ml_anomaly_rates_id)) - ml_anomaly_rates_id = string_strdupz(ML_ANOMALY_RATES_CHART_ID); - RRDINSTANCE *ri = value; // link it to its parent @@ -781,10 +778,6 @@ static void rrdinstance_insert_callback(const DICTIONARY_ITEM *item __maybe_unus ri->flags &= ~RRD_FLAG_HIDDEN; // no need of atomics at the constructor } - // we need this when loading from SQL - if(unlikely(ri->id == ml_anomaly_rates_id)) - ri->flags |= RRD_FLAG_HIDDEN; // no need of atomics at the constructor - rrdmetrics_create_in_rrdinstance(ri); // signal the react callback to do the job @@ -872,8 +865,8 @@ static bool rrdinstance_conflict_callback(const DICTIONARY_ITEM *item __maybe_un rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); } - if(ri->update_every != ri_new->update_every) { - ri->update_every = ri_new->update_every; + if(ri->update_every_s != ri_new->update_every_s) { + ri->update_every_s = ri_new->update_every_s; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); } @@ -923,7 +916,9 @@ static void rrdinstance_react_callback(const DICTIONARY_ITEM *item __maybe_unuse void rrdinstances_create_in_rrdcontext(RRDCONTEXT *rc) { if(unlikely(!rc || rc->rrdinstances)) return; - rc->rrdinstances = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + rc->rrdinstances = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + &dictionary_stats_category_rrdcontext, sizeof(RRDINSTANCE)); + dictionary_register_insert_callback(rc->rrdinstances, rrdinstance_insert_callback, rc); dictionary_register_delete_callback(rc->rrdinstances, rrdinstance_delete_callback, rc); dictionary_register_conflict_callback(rc->rrdinstances, rrdinstance_conflict_callback, rc); @@ -945,8 +940,8 @@ static void rrdinstance_trigger_updates(RRDINSTANCE *ri, const char *function) { ri->priority = st->priority; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); } - if(unlikely(st->update_every != ri->update_every)) { - ri->update_every = st->update_every; + if(unlikely(st->update_every != ri->update_every_s)) { + ri->update_every_s = st->update_every; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); } } @@ -989,7 +984,7 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) { .title = string_dup(st->title), .chart_type = st->chart_type, .priority = st->priority, - .update_every = st->update_every, + .update_every_s = st->update_every, .flags = RRD_FLAG_NONE, // no need for atomics .rrdset = st, }; @@ -1027,8 +1022,8 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) { RRDMETRIC *rm_old = rrdmetric_acquired_value(rd->rrdmetric); rrd_flags_replace(rm_old, RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); rm_old->rrddim = NULL; - rm_old->first_time_t = 0; - rm_old->last_time_t = 0; + rm_old->first_time_s = 0; + rm_old->last_time_s = 0; rrdmetric_release(rd->rrdmetric); rd->rrdmetric = NULL; @@ -1043,8 +1038,8 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) { rrd_flags_replace(ri_old, RRD_FLAG_OWN_LABELS|RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); ri_old->rrdset = NULL; - ri_old->first_time_t = 0; - ri_old->last_time_t = 0; + ri_old->first_time_s = 0; + ri_old->last_time_s = 0; rrdinstance_trigger_updates(ri_old, __FUNCTION__ ); rrdinstance_release(ria_old); @@ -1054,8 +1049,8 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) { if(!dictionary_entries(rc_old->rrdinstances) && !dictionary_stats_referenced_items(rc_old->rrdinstances)) { rrdcontext_lock(rc_old); rc_old->flags = ((rc_old->flags & RRD_FLAG_QUEUED)?RRD_FLAG_QUEUED:RRD_FLAG_NONE)|RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION; - rc_old->first_time_t = 0; - rc_old->last_time_t = 0; + rc_old->first_time_s = 0; + rc_old->last_time_s = 0; rrdcontext_unlock(rc_old); rrdcontext_trigger_updates(rc_old, __FUNCTION__ ); } @@ -1233,13 +1228,13 @@ static void rrdcontext_insert_callback(const DICTIONARY_ITEM *item __maybe_unuse rc->version = rc->hub.version; rc->priority = rc->hub.priority; - rc->first_time_t = (time_t)rc->hub.first_time_t; - rc->last_time_t = (time_t)rc->hub.last_time_t; + rc->first_time_s = (time_t)rc->hub.first_time_s; + rc->last_time_s = (time_t)rc->hub.last_time_s; - if(rc->hub.deleted || !rc->hub.first_time_t) + if(rc->hub.deleted || !rc->hub.first_time_s) rrd_flag_set_deleted(rc, RRD_FLAG_NONE); else { - if (rc->last_time_t == 0) + if (rc->last_time_s == 0) rrd_flag_set_collected(rc); else rrd_flag_set_archived(rc); @@ -1401,18 +1396,20 @@ void rrdhost_create_rrdcontexts(RRDHOST *host) { if(unlikely(!host)) return; if(likely(host->rrdctx)) return; - host->rrdctx = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + host->rrdctx = (RRDCONTEXTS *)dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + &dictionary_stats_category_rrdcontext, sizeof(RRDCONTEXT)); + dictionary_register_insert_callback((DICTIONARY *)host->rrdctx, rrdcontext_insert_callback, host); dictionary_register_delete_callback((DICTIONARY *)host->rrdctx, rrdcontext_delete_callback, host); dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx, rrdcontext_conflict_callback, host); dictionary_register_react_callback((DICTIONARY *)host->rrdctx, rrdcontext_react_callback, host); - host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE); + host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0); dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_insert_callback, NULL); dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_delete_callback, NULL); dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_conflict_callback, NULL); - host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE); + host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0); dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_insert_callback, NULL); dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_delete_callback, NULL); dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_conflict_callback, NULL); @@ -1743,8 +1740,8 @@ struct rrdcontext_to_json { SIMPLE_PATTERN *chart_dimensions; size_t written; time_t now; - time_t combined_first_time_t; - time_t combined_last_time_t; + time_t combined_first_time_s; + time_t combined_last_time_s; RRD_FLAGS combined_flags; }; @@ -1760,10 +1757,10 @@ static inline int rrdmetric_to_json_callback(const DICTIONARY_ITEM *item, void * if(unlikely(rrd_flag_is_deleted(rm) && !(options & RRDCONTEXT_OPTION_SHOW_DELETED))) return 0; - if(after && (!rm->last_time_t || after > rm->last_time_t)) + if(after && (!rm->last_time_s || after > rm->last_time_s)) return 0; - if(before && (!rm->first_time_t || before < rm->first_time_t)) + if(before && (!rm->first_time_s || before < rm->first_time_s)) return 0; if(t->chart_dimensions @@ -1773,14 +1770,14 @@ static inline int rrdmetric_to_json_callback(const DICTIONARY_ITEM *item, void * if(t->written) { buffer_strcat(wb, ",\n"); - t->combined_first_time_t = MIN(t->combined_first_time_t, rm->first_time_t); - t->combined_last_time_t = MAX(t->combined_last_time_t, rm->last_time_t); + t->combined_first_time_s = MIN(t->combined_first_time_s, rm->first_time_s); + t->combined_last_time_s = MAX(t->combined_last_time_s, rm->last_time_s); t->combined_flags |= rrd_flags_get(rm); } else { buffer_strcat(wb, "\n"); - t->combined_first_time_t = rm->first_time_t; - t->combined_last_time_t = rm->last_time_t; + t->combined_first_time_s = rm->first_time_s; + t->combined_last_time_s = rm->last_time_s; t->combined_flags = rrd_flags_get(rm); } @@ -1798,8 +1795,8 @@ static inline int rrdmetric_to_json_callback(const DICTIONARY_ITEM *item, void * ",\n\t\t\t\t\t\t\t\"last_time_t\":%lld" ",\n\t\t\t\t\t\t\t\"collected\":%s" , string2str(rm->name) - , (long long)rm->first_time_t - , rrd_flag_is_collected(rm) ? (long long)t->now : (long long)rm->last_time_t + , (long long)rm->first_time_s + , rrd_flag_is_collected(rm) ? (long long)t->now : (long long)rm->last_time_s , rrd_flag_is_collected(rm) ? "true" : "false" ); @@ -1835,10 +1832,10 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void if(unlikely(rrd_flag_is_deleted(ri) && !(options & RRDCONTEXT_OPTION_SHOW_DELETED))) return 0; - if(after && (!ri->last_time_t || after > ri->last_time_t)) + if(after && (!ri->last_time_s || after > ri->last_time_s)) return 0; - if(before && (!ri->first_time_t || before < ri->first_time_t)) + if(before && (!ri->first_time_s || before < ri->first_time_s)) return 0; if(t_parent->chart_label_key && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_label_key, '\0')) @@ -1847,14 +1844,14 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void if(t_parent->chart_labels_filter && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_labels_filter, ':')) return 0; - time_t first_time_t = ri->first_time_t; - time_t last_time_t = ri->last_time_t; + time_t first_time_s = ri->first_time_s; + time_t last_time_s = ri->last_time_s; RRD_FLAGS flags = rrd_flags_get(ri); BUFFER *wb_metrics = NULL; if(options & RRDCONTEXT_OPTION_SHOW_METRICS || t_parent->chart_dimensions) { - wb_metrics = buffer_create(4096); + wb_metrics = buffer_create(4096, &netdata_buffers_statistics.buffers_api); struct rrdcontext_to_json t_metrics = { .wb = wb_metrics, @@ -1874,21 +1871,21 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void return 0; } - first_time_t = t_metrics.combined_first_time_t; - last_time_t = t_metrics.combined_last_time_t; + first_time_s = t_metrics.combined_first_time_s; + last_time_s = t_metrics.combined_last_time_s; flags = t_metrics.combined_flags; } if(t_parent->written) { buffer_strcat(wb, ",\n"); - t_parent->combined_first_time_t = MIN(t_parent->combined_first_time_t, first_time_t); - t_parent->combined_last_time_t = MAX(t_parent->combined_last_time_t, last_time_t); + t_parent->combined_first_time_s = MIN(t_parent->combined_first_time_s, first_time_s); + t_parent->combined_last_time_s = MAX(t_parent->combined_last_time_s, last_time_s); t_parent->combined_flags |= flags; } else { buffer_strcat(wb, "\n"); - t_parent->combined_first_time_t = first_time_t; - t_parent->combined_last_time_t = last_time_t; + t_parent->combined_first_time_s = first_time_s; + t_parent->combined_last_time_s = last_time_s; t_parent->combined_flags = flags; } @@ -1908,7 +1905,7 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void ",\n\t\t\t\t\t\"family\":\"%s\"" ",\n\t\t\t\t\t\"chart_type\":\"%s\"" ",\n\t\t\t\t\t\"priority\":%u" - ",\n\t\t\t\t\t\"update_every\":%d" + ",\n\t\t\t\t\t\"update_every\":%ld" ",\n\t\t\t\t\t\"first_time_t\":%lld" ",\n\t\t\t\t\t\"last_time_t\":%lld" ",\n\t\t\t\t\t\"collected\":%s" @@ -1919,9 +1916,9 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void , string2str(ri->family) , rrdset_type_name(ri->chart_type) , ri->priority - , ri->update_every - , (long long)first_time_t - , (flags & RRD_FLAG_COLLECTED) ? (long long)t_parent->now : (long long)last_time_t + , ri->update_every_s + , (long long)first_time_s + , (flags & RRD_FLAG_COLLECTED) ? (long long)t_parent->now : (long long)last_time_s , (flags & RRD_FLAG_COLLECTED) ? "true" : "false" ); @@ -1976,14 +1973,14 @@ static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void if(options & RRDCONTEXT_OPTION_DEEPSCAN) rrdcontext_recalculate_context_retention(rc, RRD_FLAG_NONE, false); - if(after && (!rc->last_time_t || after > rc->last_time_t)) + if(after && (!rc->last_time_s || after > rc->last_time_s)) return 0; - if(before && (!rc->first_time_t || before < rc->first_time_t)) + if(before && (!rc->first_time_s || before < rc->first_time_s)) return 0; - time_t first_time_t = rc->first_time_t; - time_t last_time_t = rc->last_time_t; + time_t first_time_s = rc->first_time_s; + time_t last_time_s = rc->last_time_s; RRD_FLAGS flags = rrd_flags_get(rc); BUFFER *wb_instances = NULL; @@ -1992,7 +1989,7 @@ static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void || t_parent->chart_labels_filter || t_parent->chart_dimensions) { - wb_instances = buffer_create(4096); + wb_instances = buffer_create(4096, &netdata_buffers_statistics.buffers_api); struct rrdcontext_to_json t_instances = { .wb = wb_instances, @@ -2012,8 +2009,8 @@ static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void return 0; } - first_time_t = t_instances.combined_first_time_t; - last_time_t = t_instances.combined_last_time_t; + first_time_s = t_instances.combined_first_time_s; + last_time_s = t_instances.combined_last_time_s; flags = t_instances.combined_flags; } @@ -2043,8 +2040,8 @@ static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void , string2str(rc->family) , rrdset_type_name(rc->chart_type) , rc->priority - , (long long)first_time_t - , (flags & RRD_FLAG_COLLECTED) ? (long long)t_parent->now : (long long)last_time_t + , (long long)first_time_s + , (flags & RRD_FLAG_COLLECTED) ? (long long)t_parent->now : (long long)last_time_s , (flags & RRD_FLAG_COLLECTED) ? "true" : "false" ); @@ -2220,7 +2217,7 @@ DICTIONARY *rrdcontext_all_metrics_to_dict(RRDHOST *host, SIMPLE_PATTERN *contex if(!host || !host->rrdctx) return NULL; - DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_DONT_OVERWRITE_VALUE); + DICTIONARY *dict = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_rrdcontext, 0); dictionary_register_insert_callback(dict, metric_entry_insert_callback, NULL); dictionary_register_delete_callback(dict, metric_entry_delete_callback, NULL); dictionary_register_conflict_callback(dict, metric_entry_conflict_callback, NULL); @@ -2328,11 +2325,25 @@ void query_target_release(QUERY_TARGET *qt) { string_freez(qt->query.array[i].chart.name); qt->query.array[i].chart.name = NULL; + // reset the plans + for(size_t p = 0; p < qt->query.array[i].plan.used; p++) { + internal_fatal(qt->query.array[i].plan.array[p].initialized && + !qt->query.array[i].plan.array[p].finalized, + "QUERY: left-over initialized plan"); + + qt->query.array[i].plan.array[p].initialized = false; + qt->query.array[i].plan.array[p].finalized = false; + } + qt->query.array[i].plan.used = 0; + + // reset the tiers for(size_t tier = 0; tier < storage_tiers ;tier++) { if(qt->query.array[i].tiers[tier].db_metric_handle) { STORAGE_ENGINE *eng = qt->query.array[i].tiers[tier].eng; eng->api.metric_release(qt->query.array[i].tiers[tier].db_metric_handle); qt->query.array[i].tiers[tier].db_metric_handle = NULL; + qt->query.array[i].tiers[tier].weight = 0; + qt->query.array[i].tiers[tier].eng = NULL; } } } @@ -2366,37 +2377,44 @@ void query_target_release(QUERY_TARGET *qt) { qt->contexts.used = 0; qt->hosts.used = 0; - qt->db.minimum_latest_update_every = 0; - qt->db.first_time_t = 0; - qt->db.last_time_t = 0; + qt->db.minimum_latest_update_every_s = 0; + qt->db.first_time_s = 0; + qt->db.last_time_s = 0; qt->id[0] = '\0'; qt->used = false; } void query_target_free(void) { - if(thread_query_target.used) - query_target_release(&thread_query_target); + QUERY_TARGET *qt = &thread_query_target; - freez(thread_query_target.query.array); - thread_query_target.query.array = NULL; - thread_query_target.query.size = 0; + if(qt->used) + query_target_release(qt); - freez(thread_query_target.metrics.array); - thread_query_target.metrics.array = NULL; - thread_query_target.metrics.size = 0; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->query.size * sizeof(QUERY_METRIC), __ATOMIC_RELAXED); + freez(qt->query.array); + qt->query.array = NULL; + qt->query.size = 0; - freez(thread_query_target.instances.array); - thread_query_target.instances.array = NULL; - thread_query_target.instances.size = 0; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *), __ATOMIC_RELAXED); + freez(qt->metrics.array); + qt->metrics.array = NULL; + qt->metrics.size = 0; - freez(thread_query_target.contexts.array); - thread_query_target.contexts.array = NULL; - thread_query_target.contexts.size = 0; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *), __ATOMIC_RELAXED); + freez(qt->instances.array); + qt->instances.array = NULL; + qt->instances.size = 0; - freez(thread_query_target.hosts.array); - thread_query_target.hosts.array = NULL; - thread_query_target.hosts.size = 0; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *), __ATOMIC_RELAXED); + freez(qt->contexts.array); + qt->contexts.array = NULL; + qt->contexts.size = 0; + + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->hosts.size * sizeof(RRDHOST *), __ATOMIC_RELAXED); + freez(qt->hosts.array); + qt->hosts.array = NULL; + qt->hosts.size = 0; } static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED *rma, RRDINSTANCE *ri, @@ -2408,69 +2426,73 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED return; if(qt->metrics.used == qt->metrics.size) { + size_t old_mem = qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *); qt->metrics.size = (qt->metrics.size) ? qt->metrics.size * 2 : 1; - qt->metrics.array = reallocz(qt->metrics.array, qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *)); + size_t new_mem = qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *); + qt->metrics.array = reallocz(qt->metrics.array, new_mem); + + __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED); } qt->metrics.array[qt->metrics.used++] = rrdmetric_acquired_dup(rma); if(!queryable_instance) return; - time_t common_first_time_t = 0; - time_t common_last_time_t = 0; - time_t common_update_every = 0; + time_t common_first_time_s = 0; + time_t common_last_time_s = 0; + time_t common_update_every_s = 0; size_t tiers_added = 0; struct { STORAGE_ENGINE *eng; STORAGE_METRIC_HANDLE *db_metric_handle; - time_t db_first_time_t; - time_t db_last_time_t; - time_t db_update_every; + time_t db_first_time_s; + time_t db_last_time_s; + time_t db_update_every_s; } tier_retention[storage_tiers]; for (size_t tier = 0; tier < storage_tiers; tier++) { STORAGE_ENGINE *eng = qtl->host->db[tier].eng; tier_retention[tier].eng = eng; - tier_retention[tier].db_update_every = (time_t) (qtl->host->db[tier].tier_grouping * ri->update_every); + tier_retention[tier].db_update_every_s = (time_t) (qtl->host->db[tier].tier_grouping * ri->update_every_s); - if(rm->rrddim && rm->rrddim->tiers[tier] && rm->rrddim->tiers[tier]->db_metric_handle) - tier_retention[tier].db_metric_handle = eng->api.metric_dup(rm->rrddim->tiers[tier]->db_metric_handle); + if(rm->rrddim && rm->rrddim->tiers[tier].db_metric_handle) + tier_retention[tier].db_metric_handle = eng->api.metric_dup(rm->rrddim->tiers[tier].db_metric_handle); else tier_retention[tier].db_metric_handle = eng->api.metric_get(qtl->host->db[tier].instance, &rm->uuid); if(tier_retention[tier].db_metric_handle) { - tier_retention[tier].db_first_time_t = tier_retention[tier].eng->api.query_ops.oldest_time(tier_retention[tier].db_metric_handle); - tier_retention[tier].db_last_time_t = tier_retention[tier].eng->api.query_ops.latest_time(tier_retention[tier].db_metric_handle); + tier_retention[tier].db_first_time_s = tier_retention[tier].eng->api.query_ops.oldest_time_s(tier_retention[tier].db_metric_handle); + tier_retention[tier].db_last_time_s = tier_retention[tier].eng->api.query_ops.latest_time_s(tier_retention[tier].db_metric_handle); - if(!common_first_time_t) - common_first_time_t = tier_retention[tier].db_first_time_t; - else if(tier_retention[tier].db_first_time_t) - common_first_time_t = MIN(common_first_time_t, tier_retention[tier].db_first_time_t); + if(!common_first_time_s) + common_first_time_s = tier_retention[tier].db_first_time_s; + else if(tier_retention[tier].db_first_time_s) + common_first_time_s = MIN(common_first_time_s, tier_retention[tier].db_first_time_s); - if(!common_last_time_t) - common_last_time_t = tier_retention[tier].db_last_time_t; + if(!common_last_time_s) + common_last_time_s = tier_retention[tier].db_last_time_s; else - common_last_time_t = MAX(common_last_time_t, tier_retention[tier].db_last_time_t); + common_last_time_s = MAX(common_last_time_s, tier_retention[tier].db_last_time_s); - if(!common_update_every) - common_update_every = tier_retention[tier].db_update_every; - else if(tier_retention[tier].db_update_every) - common_update_every = MIN(common_update_every, tier_retention[tier].db_update_every); + if(!common_update_every_s) + common_update_every_s = tier_retention[tier].db_update_every_s; + else if(tier_retention[tier].db_update_every_s) + common_update_every_s = MIN(common_update_every_s, tier_retention[tier].db_update_every_s); tiers_added++; } else { - tier_retention[tier].db_first_time_t = 0; - tier_retention[tier].db_last_time_t = 0; - tier_retention[tier].db_update_every = 0; + tier_retention[tier].db_first_time_s = 0; + tier_retention[tier].db_last_time_s = 0; + tier_retention[tier].db_update_every_s = 0; } } bool release_retention = true; bool timeframe_matches = (tiers_added - && (common_first_time_t - common_update_every * 2) <= qt->window.before - && (common_last_time_t + common_update_every * 2) >= qt->window.after + && (common_first_time_s - common_update_every_s * 2) <= qt->window.before + && (common_last_time_s + common_update_every_s * 2) >= qt->window.after ) ? true : false; if(timeframe_matches) { @@ -2515,14 +2537,19 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED // let's add it to the query metrics if(ri->rrdset) - ri->rrdset->last_accessed_time = qtl->start_s; + ri->rrdset->last_accessed_time_s = qtl->start_s; if (qt->query.used == qt->query.size) { + size_t old_mem = qt->query.size * sizeof(QUERY_METRIC); qt->query.size = (qt->query.size) ? qt->query.size * 2 : 1; - qt->query.array = reallocz(qt->query.array, qt->query.size * sizeof(QUERY_METRIC)); + size_t new_mem = qt->query.size * sizeof(QUERY_METRIC); + qt->query.array = reallocz(qt->query.array, new_mem); + + __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED); } QUERY_METRIC *qm = &qt->query.array[qt->query.used++]; + qm->plan.used = 0; qm->dimension.options = options; qm->link.host = qtl->host; @@ -2536,18 +2563,18 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED qm->dimension.id = string_dup(rm->id); qm->dimension.name = string_dup(rm->name); - if (!qt->db.first_time_t || common_first_time_t < qt->db.first_time_t) - qt->db.first_time_t = common_first_time_t; + if (!qt->db.first_time_s || common_first_time_s < qt->db.first_time_s) + qt->db.first_time_s = common_first_time_s; - if (!qt->db.last_time_t || common_last_time_t > qt->db.last_time_t) - qt->db.last_time_t = common_last_time_t; + if (!qt->db.last_time_s || common_last_time_s > qt->db.last_time_s) + qt->db.last_time_s = common_last_time_s; for (size_t tier = 0; tier < storage_tiers; tier++) { qm->tiers[tier].eng = tier_retention[tier].eng; qm->tiers[tier].db_metric_handle = tier_retention[tier].db_metric_handle; - qm->tiers[tier].db_first_time_t = tier_retention[tier].db_first_time_t; - qm->tiers[tier].db_last_time_t = tier_retention[tier].db_last_time_t; - qm->tiers[tier].db_update_every = tier_retention[tier].db_update_every; + qm->tiers[tier].db_first_time_s = tier_retention[tier].db_first_time_s; + qm->tiers[tier].db_last_time_s = tier_retention[tier].db_last_time_s; + qm->tiers[tier].db_update_every_s = tier_retention[tier].db_update_every_s; } release_retention = false; } @@ -2572,14 +2599,18 @@ static void query_target_add_instance(QUERY_TARGET_LOCALS *qtl, RRDINSTANCE_ACQU return; if(qt->instances.used == qt->instances.size) { + size_t old_mem = qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *); qt->instances.size = (qt->instances.size) ? qt->instances.size * 2 : 1; - qt->instances.array = reallocz(qt->instances.array, qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *)); + size_t new_mem = qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *); + qt->instances.array = reallocz(qt->instances.array, new_mem); + + __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED); } qtl->ria = qt->instances.array[qt->instances.used++] = rrdinstance_acquired_dup(ria); - if(qt->db.minimum_latest_update_every == 0 || ri->update_every < qt->db.minimum_latest_update_every) - qt->db.minimum_latest_update_every = ri->update_every; + if(qt->db.minimum_latest_update_every_s == 0 || ri->update_every_s < qt->db.minimum_latest_update_every_s) + qt->db.minimum_latest_update_every_s = ri->update_every_s; if(queryable_instance) { if ((qt->instances.chart_label_key_pattern && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, qt->instances.chart_label_key_pattern, ':')) || @@ -2616,8 +2647,12 @@ static void query_target_add_context(QUERY_TARGET_LOCALS *qtl, RRDCONTEXT_ACQUIR return; if(qt->contexts.used == qt->contexts.size) { + size_t old_mem = qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *); qt->contexts.size = (qt->contexts.size) ? qt->contexts.size * 2 : 1; - qt->contexts.array = reallocz(qt->contexts.array, qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *)); + size_t new_mem = qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *); + qt->contexts.array = reallocz(qt->contexts.array, new_mem); + + __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED); } qtl->rca = qt->contexts.array[qt->contexts.used++] = rrdcontext_acquired_dup(rca); @@ -2656,8 +2691,12 @@ static void query_target_add_host(QUERY_TARGET_LOCALS *qtl, RRDHOST *host) { QUERY_TARGET *qt = qtl->qt; if(qt->hosts.used == qt->hosts.size) { + size_t old_mem = qt->hosts.size * sizeof(RRDHOST *); qt->hosts.size = (qt->hosts.size) ? qt->hosts.size * 2 : 1; - qt->hosts.array = reallocz(qt->hosts.array, qt->hosts.size * sizeof(RRDHOST *)); + size_t new_mem = qt->hosts.size * sizeof(RRDHOST *); + qt->hosts.array = reallocz(qt->hosts.array, new_mem); + + __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED); } qtl->host = qt->hosts.array[qt->hosts.used++] = host; @@ -2770,6 +2809,9 @@ void query_target_generate_name(QUERY_TARGET *qt) { } QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) { + if(!service_running(ABILITY_DATA_QUERIES)) + return NULL; + QUERY_TARGET *qt = &thread_query_target; if(qt->used) @@ -2800,7 +2842,7 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) { .charts_labels_filter = qt->request.charts_labels_filter, }; - qt->db.minimum_latest_update_every = 0; // it will be updated by query_target_add_query() + qt->db.minimum_latest_update_every_s = 0; // it will be updated by query_target_add_query() // prepare all the patterns qt->hosts.pattern = is_valid_sp(qtl.hosts) ? simple_pattern_create(qtl.hosts, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL; @@ -2922,7 +2964,7 @@ static void rrdinstance_load_chart_callback(SQL_CHART_DATA *sc, void *data) { .family = string_strdupz(sc->family), .chart_type = sc->chart_type, .priority = sc->priority, - .update_every = sc->update_every, + .update_every_s = sc->update_every, .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomics }; uuid_copy(tri.uuid, sc->chart_id); @@ -3012,7 +3054,7 @@ static uint64_t rrdcontext_version_hash_with_callback( // when the context is being collected, // rc->hub.last_time_t is already zero - hash += rc->hub.version + rc->hub.last_time_t - rc->hub.first_time_t; + hash += rc->hub.version + rc->hub.last_time_s - rc->hub.first_time_s; rrdcontext_unlock(rc); @@ -3057,17 +3099,16 @@ static bool rrdmetric_update_retention(RRDMETRIC *rm) { time_t min_first_time_t = LONG_MAX, max_last_time_t = 0; if(rm->rrddim) { - min_first_time_t = rrddim_first_entry_t(rm->rrddim); - max_last_time_t = rrddim_last_entry_t(rm->rrddim); + min_first_time_t = rrddim_first_entry_s(rm->rrddim); + max_last_time_t = rrddim_last_entry_s(rm->rrddim); } -#ifdef ENABLE_DBENGINE - else if (dbengine_enabled) { + else { RRDHOST *rrdhost = rm->ri->rc->rrdhost; for (size_t tier = 0; tier < storage_tiers; tier++) { - if(!rrdhost->db[tier].instance) continue; + STORAGE_ENGINE *eng = rrdhost->db[tier].eng; time_t first_time_t, last_time_t; - if (rrdeng_metric_retention_by_uuid(rrdhost->db[tier].instance, &rm->uuid, &first_time_t, &last_time_t) == 0) { + if (eng->api.metric_retention_by_uuid(rrdhost->db[tier].instance, &rm->uuid, &first_time_t, &last_time_t)) { if (first_time_t < min_first_time_t) min_first_time_t = first_time_t; @@ -3076,17 +3117,15 @@ static bool rrdmetric_update_retention(RRDMETRIC *rm) { } } } - else { - // cannot get retention + + if((min_first_time_t == LONG_MAX || min_first_time_t == 0) && max_last_time_t == 0) return false; - } -#endif if(min_first_time_t == LONG_MAX) min_first_time_t = 0; if(min_first_time_t > max_last_time_t) { - internal_error(true, "RRDMETRIC: retention of '%s' is flipped", string2str(rm->id)); + internal_error(true, "RRDMETRIC: retention of '%s' is flipped, first_time_t = %ld, last_time_t = %ld", string2str(rm->id), min_first_time_t, max_last_time_t); time_t tmp = min_first_time_t; min_first_time_t = max_last_time_t; max_last_time_t = tmp; @@ -3094,17 +3133,17 @@ static bool rrdmetric_update_retention(RRDMETRIC *rm) { // check if retention changed - if (min_first_time_t != rm->first_time_t) { - rm->first_time_t = min_first_time_t; + if (min_first_time_t != rm->first_time_s) { + rm->first_time_s = min_first_time_t; rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); } - if (max_last_time_t != rm->last_time_t) { - rm->last_time_t = max_last_time_t; + if (max_last_time_t != rm->last_time_s) { + rm->last_time_s = max_last_time_t; rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); } - if(unlikely(!rm->first_time_t && !rm->last_time_t)) + if(unlikely(!rm->first_time_s && !rm->last_time_s)) rrd_flag_set_deleted(rm, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); rrd_flag_set(rm, RRD_FLAG_LIVE_RETENTION); @@ -3123,7 +3162,7 @@ static inline bool rrdmetric_should_be_deleted(RRDMETRIC *rm) { return false; rrdmetric_update_retention(rm); - if(rm->first_time_t || rm->last_time_t) + if(rm->first_time_s || rm->last_time_s) return false; return true; @@ -3145,7 +3184,7 @@ static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) { if(unlikely(dictionary_entries(ri->rrdmetrics) != 0)) return false; - if(ri->first_time_t || ri->last_time_t) + if(ri->first_time_s || ri->last_time_s) return false; return true; @@ -3164,7 +3203,7 @@ static inline bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) { if(unlikely(dictionary_entries(rc->rrdinstances) != 0)) return false; - if(unlikely(rc->first_time_t || rc->last_time_t)) + if(unlikely(rc->first_time_s || rc->last_time_s)) return false; return true; @@ -3189,7 +3228,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo RRDCONTEXT *rc; dfe_start_reentrant((DICTIONARY *)host->rrdctx, rc) { - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_CONTEXT))) break; if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP); @@ -3197,7 +3236,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo RRDINSTANCE *ri; dfe_start_reentrant(rc->rrdinstances, ri) { - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_CONTEXT))) break; RRDMETRIC *rm; dfe_start_write(ri->rrdmetrics, rm) { @@ -3313,7 +3352,7 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL if(dictionary_entries(ri->rrdmetrics) > 0) { RRDMETRIC *rm; dfe_start_read((DICTIONARY *)ri->rrdmetrics, rm) { - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_CONTEXT))) break; RRD_FLAGS reason_to_pass = reason; if(rrd_flag_check(ri, RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION)) @@ -3329,16 +3368,16 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL continue; } - if(!currently_collected && rrd_flag_check(rm, RRD_FLAG_COLLECTED) && rm->first_time_t) + if(!currently_collected && rrd_flag_check(rm, RRD_FLAG_COLLECTED) && rm->first_time_s) currently_collected = true; metrics_active++; - if (rm->first_time_t && rm->first_time_t < min_first_time_t) - min_first_time_t = rm->first_time_t; + if (rm->first_time_s && rm->first_time_s < min_first_time_t) + min_first_time_t = rm->first_time_s; - if (rm->last_time_t && rm->last_time_t > max_last_time_t) - max_last_time_t = rm->last_time_t; + if (rm->last_time_s && rm->last_time_s > max_last_time_t) + max_last_time_t = rm->last_time_s; } dfe_done(rm); } @@ -3351,13 +3390,13 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL if(unlikely(!metrics_active)) { // no metrics available - if(ri->first_time_t) { - ri->first_time_t = 0; + if(ri->first_time_s) { + ri->first_time_s = 0; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); } - if(ri->last_time_t) { - ri->last_time_t = 0; + if(ri->last_time_s) { + ri->last_time_s = 0; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); } @@ -3370,13 +3409,13 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL min_first_time_t = 0; if (unlikely(min_first_time_t == 0 || max_last_time_t == 0)) { - if(ri->first_time_t) { - ri->first_time_t = 0; + if(ri->first_time_s) { + ri->first_time_s = 0; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); } - if(ri->last_time_t) { - ri->last_time_t = 0; + if(ri->last_time_s) { + ri->last_time_s = 0; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); } @@ -3386,13 +3425,13 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL else { rrd_flag_clear(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); - if (unlikely(ri->first_time_t != min_first_time_t)) { - ri->first_time_t = min_first_time_t; + if (unlikely(ri->first_time_s != min_first_time_t)) { + ri->first_time_s = min_first_time_t; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); } - if (unlikely(ri->last_time_t != max_last_time_t)) { - ri->last_time_t = max_last_time_t; + if (unlikely(ri->last_time_s != max_last_time_t)) { + ri->last_time_s = max_last_time_t; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); } @@ -3413,6 +3452,8 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG if(worker_jobs) worker_is_busy(WORKER_JOB_PP_CONTEXT); + size_t min_priority_collected = LONG_MAX; + size_t min_priority_not_collected = LONG_MAX; size_t min_priority = LONG_MAX; time_t min_first_time_t = LONG_MAX, max_last_time_t = 0; size_t instances_active = 0, instances_deleted = 0; @@ -3420,7 +3461,7 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG if(dictionary_entries(rc->rrdinstances) > 0) { RRDINSTANCE *ri; dfe_start_reentrant(rc->rrdinstances, ri) { - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_CONTEXT))) break; RRD_FLAGS reason_to_pass = reason; if(rrd_flag_check(rc, RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION)) @@ -3439,7 +3480,7 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG continue; } - if(unlikely(!currently_collected && rrd_flag_is_collected(ri) && ri->first_time_t)) + if(unlikely(!currently_collected && rrd_flag_is_collected(ri) && ri->first_time_s)) currently_collected = true; internal_error(rc->units != ri->units, @@ -3449,16 +3490,31 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG instances_active++; - if (ri->priority >= RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY && ri->priority < min_priority) - min_priority = ri->priority; + if (ri->priority >= RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY) { + if(rrd_flag_check(ri, RRD_FLAG_COLLECTED)) { + if(ri->priority < min_priority_collected) + min_priority_collected = ri->priority; + } + else { + if(ri->priority < min_priority_not_collected) + min_priority_not_collected = ri->priority; + } + } - if (ri->first_time_t && ri->first_time_t < min_first_time_t) - min_first_time_t = ri->first_time_t; + if (ri->first_time_s && ri->first_time_s < min_first_time_t) + min_first_time_t = ri->first_time_s; - if (ri->last_time_t && ri->last_time_t > max_last_time_t) - max_last_time_t = ri->last_time_t; + if (ri->last_time_s && ri->last_time_s > max_last_time_t) + max_last_time_t = ri->last_time_s; } dfe_done(ri); + + if(min_priority_collected != LONG_MAX) + // use the collected priority + min_priority = min_priority_collected; + else + // use the non-collected priority + min_priority = min_priority_not_collected; } { @@ -3485,13 +3541,13 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG if(unlikely(!instances_active)) { // we had some instances, but they are gone now... - if(rc->first_time_t) { - rc->first_time_t = 0; + if(rc->first_time_s) { + rc->first_time_s = 0; rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); } - if(rc->last_time_t) { - rc->last_time_t = 0; + if(rc->last_time_s) { + rc->last_time_s = 0; rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); } @@ -3504,13 +3560,13 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG min_first_time_t = 0; if (unlikely(min_first_time_t == 0 && max_last_time_t == 0)) { - if(rc->first_time_t) { - rc->first_time_t = 0; + if(rc->first_time_s) { + rc->first_time_s = 0; rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); } - if(rc->last_time_t) { - rc->last_time_t = 0; + if(rc->last_time_s) { + rc->last_time_s = 0; rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); } @@ -3519,13 +3575,13 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG else { rrd_flag_clear(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); - if (unlikely(rc->first_time_t != min_first_time_t)) { - rc->first_time_t = min_first_time_t; + if (unlikely(rc->first_time_s != min_first_time_t)) { + rc->first_time_s = min_first_time_t; rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); } - if (rc->last_time_t != max_last_time_t) { - rc->last_time_t = max_last_time_t; + if (rc->last_time_s != max_last_time_t) { + rc->last_time_s = max_last_time_t; rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); } @@ -3592,7 +3648,7 @@ static void rrdcontext_post_process_queued_contexts(RRDHOST *host) { RRDCONTEXT *rc; dfe_start_reentrant((DICTIONARY *)host->rrdctx_post_processing_queue, rc) { - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_CONTEXT))) break; rrdcontext_dequeue_from_post_processing(rc); rrdcontext_post_process_updates(rc, false, RRD_FLAG_NONE, true); @@ -3621,8 +3677,8 @@ static void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe rc->hub.family = string2str(rc->family); rc->hub.chart_type = rrdset_type_name(rc->chart_type); rc->hub.priority = rc->priority; - rc->hub.first_time_t = rc->first_time_t; - rc->hub.last_time_t = rrd_flag_is_collected(rc) ? 0 : rc->last_time_t; + rc->hub.first_time_s = rc->first_time_s; + rc->hub.last_time_s = rrd_flag_is_collected(rc) ? 0 : rc->last_time_s; rc->hub.deleted = rrd_flag_is_deleted(rc) ? true : false; #ifdef ENABLE_ACLK @@ -3634,8 +3690,8 @@ static void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe .family = rc->hub.family, .chart_type = rc->hub.chart_type, .priority = rc->hub.priority, - .first_entry = rc->hub.first_time_t, - .last_entry = rc->hub.last_time_t, + .first_entry = rc->hub.first_time_s, + .last_entry = rc->hub.last_time_s, .deleted = rc->hub.deleted, }; @@ -3689,10 +3745,10 @@ static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending _ if(unlikely(rc->priority != rc->hub.priority)) priority_changed = true; - if(unlikely((uint64_t)rc->first_time_t != rc->hub.first_time_t)) + if(unlikely((uint64_t)rc->first_time_s != rc->hub.first_time_s)) first_time_changed = true; - if(unlikely((uint64_t)((flags & RRD_FLAG_COLLECTED) ? 0 : rc->last_time_t) != rc->hub.last_time_t)) + if(unlikely((uint64_t)((flags & RRD_FLAG_COLLECTED) ? 0 : rc->last_time_s) != rc->hub.last_time_s)) last_time_changed = true; if(unlikely(((flags & RRD_FLAG_DELETED) ? true : false) != rc->hub.deleted)) @@ -3711,8 +3767,8 @@ static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending _ string2str(rc->family), family_changed ? " (CHANGED)" : "", rrdset_type_name(rc->chart_type), chart_type_changed ? " (CHANGED)" : "", rc->priority, priority_changed ? " (CHANGED)" : "", - rc->first_time_t, first_time_changed ? " (CHANGED)" : "", - (flags & RRD_FLAG_COLLECTED) ? 0 : rc->last_time_t, last_time_changed ? " (CHANGED)" : "", + rc->first_time_s, first_time_changed ? " (CHANGED)" : "", + (flags & RRD_FLAG_COLLECTED) ? 0 : rc->last_time_s, last_time_changed ? " (CHANGED)" : "", (flags & RRD_FLAG_DELETED) ? "true" : "false", deleted_changed ? " (CHANGED)" : "", sending ? (now_realtime_usec() - rc->queue.queued_ut) / USEC_PER_MS : 0, sending ? (rc->queue.scheduled_dispatch_ut - rc->queue.queued_ut) / USEC_PER_MS : 0 @@ -3773,7 +3829,7 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now RRDCONTEXT *rc; dfe_start_reentrant((DICTIONARY *)host->rrdctx_hub_queue, rc) { - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_CONTEXT))) break; if(unlikely(messages_added >= MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST)) break; @@ -3840,7 +3896,7 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now dfe_done(rc); #ifdef ENABLE_ACLK - if(!netdata_exit && bundle) { + if(service_running(SERVICE_CONTEXT) && bundle) { // we have a bundle to send messages // update the version hash @@ -3891,11 +3947,11 @@ void *rrdcontext_main(void *ptr) { heartbeat_init(&hb); usec_t step = RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC; - while (!netdata_exit) { + while (service_running(SERVICE_CONTEXT)) { worker_is_idle(); heartbeat_next(&hb, step); - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_CONTEXT))) break; usec_t now_ut = now_realtime_usec(); @@ -3911,7 +3967,7 @@ void *rrdcontext_main(void *ptr) { rrd_rdlock(); RRDHOST *host; rrdhost_foreach_read(host) { - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_CONTEXT))) break; worker_is_busy(WORKER_JOB_HOSTS); |