summaryrefslogtreecommitdiffstats
path: root/database/rrdcontext.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/rrdcontext.c502
1 files changed, 279 insertions, 223 deletions
diff --git a/database/rrdcontext.c b/database/rrdcontext.c
index 3413d1ea8..9fc605f32 100644
--- a/database/rrdcontext.c
+++ b/database/rrdcontext.c
@@ -221,8 +221,8 @@ typedef struct rrdmetric {
RRDDIM *rrddim;
- time_t first_time_t;
- time_t last_time_t;
+ time_t first_time_s;
+ time_t last_time_s;
RRD_FLAGS flags;
struct rrdinstance *ri;
@@ -240,10 +240,10 @@ typedef struct rrdinstance {
RRDSET_TYPE chart_type;
RRD_FLAGS flags; // flags related to this instance
- time_t first_time_t;
- time_t last_time_t;
+ time_t first_time_s;
+ time_t last_time_s;
- int update_every; // data collection frequency
+ time_t update_every_s; // data collection frequency
RRDSET *rrdset; // pointer to RRDSET when collected, or NULL
DICTIONARY *rrdlabels; // linked to RRDSET->chart_labels or own version
@@ -269,8 +269,8 @@ typedef struct rrdcontext {
RRDSET_TYPE chart_type;
RRD_FLAGS flags;
- time_t first_time_t;
- time_t last_time_t;
+ time_t first_time_s;
+ time_t last_time_s;
VERSIONED_CONTEXT_DATA hub;
@@ -522,20 +522,20 @@ static bool rrdmetric_conflict_callback(const DICTIONARY_ITEM *item __maybe_unus
uuid_unparse(rm->uuid, uuid1);
uuid_unparse(rm_new->uuid, uuid2);
- time_t old_first_time_t = 0;
- time_t old_last_time_t = 0;
+ time_t old_first_time_s = 0;
+ time_t old_last_time_s = 0;
if(rrdmetric_update_retention(rm)) {
- old_first_time_t = rm->first_time_t;
- old_last_time_t = rm->last_time_t;
+ old_first_time_s = rm->first_time_s;
+ old_last_time_s = rm->last_time_s;
}
uuid_copy(rm->uuid, rm_new->uuid);
- time_t new_first_time_t = 0;
- time_t new_last_time_t = 0;
+ time_t new_first_time_s = 0;
+ time_t new_last_time_s = 0;
if(rrdmetric_update_retention(rm)) {
- new_first_time_t = rm->first_time_t;
- new_last_time_t = rm->last_time_t;
+ new_first_time_s = rm->first_time_s;
+ new_last_time_s = rm->last_time_s;
}
internal_error(true,
@@ -543,8 +543,8 @@ static bool rrdmetric_conflict_callback(const DICTIONARY_ITEM *item __maybe_unus
, string2str(rm->id)
, string2str(rm->ri->id)
, rrdhost_hostname(rm->ri->rc->rrdhost)
- , uuid1, old_first_time_t, old_last_time_t, old_last_time_t - old_first_time_t
- , uuid2, new_first_time_t, new_last_time_t, new_last_time_t - new_first_time_t
+ , uuid1, old_first_time_s, old_last_time_s, old_last_time_s - old_first_time_s
+ , uuid2, new_first_time_s, new_last_time_s, new_last_time_s - new_first_time_s
);
#else
uuid_copy(rm->uuid, rm_new->uuid);
@@ -576,13 +576,13 @@ static bool rrdmetric_conflict_callback(const DICTIONARY_ITEM *item __maybe_unus
rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
}
- if(!rm->first_time_t || (rm_new->first_time_t && rm_new->first_time_t < rm->first_time_t)) {
- rm->first_time_t = rm_new->first_time_t;
+ if(!rm->first_time_s || (rm_new->first_time_s && rm_new->first_time_s < rm->first_time_s)) {
+ rm->first_time_s = rm_new->first_time_s;
rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
}
- if(!rm->last_time_t || (rm_new->last_time_t && rm_new->last_time_t > rm->last_time_t)) {
- rm->last_time_t = rm_new->last_time_t;
+ if(!rm->last_time_s || (rm_new->last_time_s && rm_new->last_time_s > rm->last_time_s)) {
+ rm->last_time_s = rm_new->last_time_s;
rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
}
@@ -611,7 +611,9 @@ static void rrdmetrics_create_in_rrdinstance(RRDINSTANCE *ri) {
if(unlikely(!ri)) return;
if(likely(ri->rrdmetrics)) return;
- ri->rrdmetrics = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ ri->rrdmetrics = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ &dictionary_stats_category_rrdcontext, sizeof(RRDMETRIC));
+
dictionary_register_insert_callback(ri->rrdmetrics, rrdmetric_insert_callback, ri);
dictionary_register_delete_callback(ri->rrdmetrics, rrdmetric_delete_callback, ri);
dictionary_register_conflict_callback(ri->rrdmetrics, rrdmetric_conflict_callback, ri);
@@ -750,11 +752,6 @@ static void rrdinstance_free(RRDINSTANCE *ri) {
}
static void rrdinstance_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdcontext) {
- static STRING *ml_anomaly_rates_id = NULL;
-
- if(unlikely(!ml_anomaly_rates_id))
- ml_anomaly_rates_id = string_strdupz(ML_ANOMALY_RATES_CHART_ID);
-
RRDINSTANCE *ri = value;
// link it to its parent
@@ -781,10 +778,6 @@ static void rrdinstance_insert_callback(const DICTIONARY_ITEM *item __maybe_unus
ri->flags &= ~RRD_FLAG_HIDDEN; // no need of atomics at the constructor
}
- // we need this when loading from SQL
- if(unlikely(ri->id == ml_anomaly_rates_id))
- ri->flags |= RRD_FLAG_HIDDEN; // no need of atomics at the constructor
-
rrdmetrics_create_in_rrdinstance(ri);
// signal the react callback to do the job
@@ -872,8 +865,8 @@ static bool rrdinstance_conflict_callback(const DICTIONARY_ITEM *item __maybe_un
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
}
- if(ri->update_every != ri_new->update_every) {
- ri->update_every = ri_new->update_every;
+ if(ri->update_every_s != ri_new->update_every_s) {
+ ri->update_every_s = ri_new->update_every_s;
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
}
@@ -923,7 +916,9 @@ static void rrdinstance_react_callback(const DICTIONARY_ITEM *item __maybe_unuse
void rrdinstances_create_in_rrdcontext(RRDCONTEXT *rc) {
if(unlikely(!rc || rc->rrdinstances)) return;
- rc->rrdinstances = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ rc->rrdinstances = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ &dictionary_stats_category_rrdcontext, sizeof(RRDINSTANCE));
+
dictionary_register_insert_callback(rc->rrdinstances, rrdinstance_insert_callback, rc);
dictionary_register_delete_callback(rc->rrdinstances, rrdinstance_delete_callback, rc);
dictionary_register_conflict_callback(rc->rrdinstances, rrdinstance_conflict_callback, rc);
@@ -945,8 +940,8 @@ static void rrdinstance_trigger_updates(RRDINSTANCE *ri, const char *function) {
ri->priority = st->priority;
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
}
- if(unlikely(st->update_every != ri->update_every)) {
- ri->update_every = st->update_every;
+ if(unlikely(st->update_every != ri->update_every_s)) {
+ ri->update_every_s = st->update_every;
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
}
}
@@ -989,7 +984,7 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) {
.title = string_dup(st->title),
.chart_type = st->chart_type,
.priority = st->priority,
- .update_every = st->update_every,
+ .update_every_s = st->update_every,
.flags = RRD_FLAG_NONE, // no need for atomics
.rrdset = st,
};
@@ -1027,8 +1022,8 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) {
RRDMETRIC *rm_old = rrdmetric_acquired_value(rd->rrdmetric);
rrd_flags_replace(rm_old, RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
rm_old->rrddim = NULL;
- rm_old->first_time_t = 0;
- rm_old->last_time_t = 0;
+ rm_old->first_time_s = 0;
+ rm_old->last_time_s = 0;
rrdmetric_release(rd->rrdmetric);
rd->rrdmetric = NULL;
@@ -1043,8 +1038,8 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) {
rrd_flags_replace(ri_old, RRD_FLAG_OWN_LABELS|RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
ri_old->rrdset = NULL;
- ri_old->first_time_t = 0;
- ri_old->last_time_t = 0;
+ ri_old->first_time_s = 0;
+ ri_old->last_time_s = 0;
rrdinstance_trigger_updates(ri_old, __FUNCTION__ );
rrdinstance_release(ria_old);
@@ -1054,8 +1049,8 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) {
if(!dictionary_entries(rc_old->rrdinstances) && !dictionary_stats_referenced_items(rc_old->rrdinstances)) {
rrdcontext_lock(rc_old);
rc_old->flags = ((rc_old->flags & RRD_FLAG_QUEUED)?RRD_FLAG_QUEUED:RRD_FLAG_NONE)|RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION;
- rc_old->first_time_t = 0;
- rc_old->last_time_t = 0;
+ rc_old->first_time_s = 0;
+ rc_old->last_time_s = 0;
rrdcontext_unlock(rc_old);
rrdcontext_trigger_updates(rc_old, __FUNCTION__ );
}
@@ -1233,13 +1228,13 @@ static void rrdcontext_insert_callback(const DICTIONARY_ITEM *item __maybe_unuse
rc->version = rc->hub.version;
rc->priority = rc->hub.priority;
- rc->first_time_t = (time_t)rc->hub.first_time_t;
- rc->last_time_t = (time_t)rc->hub.last_time_t;
+ rc->first_time_s = (time_t)rc->hub.first_time_s;
+ rc->last_time_s = (time_t)rc->hub.last_time_s;
- if(rc->hub.deleted || !rc->hub.first_time_t)
+ if(rc->hub.deleted || !rc->hub.first_time_s)
rrd_flag_set_deleted(rc, RRD_FLAG_NONE);
else {
- if (rc->last_time_t == 0)
+ if (rc->last_time_s == 0)
rrd_flag_set_collected(rc);
else
rrd_flag_set_archived(rc);
@@ -1401,18 +1396,20 @@ void rrdhost_create_rrdcontexts(RRDHOST *host) {
if(unlikely(!host)) return;
if(likely(host->rrdctx)) return;
- host->rrdctx = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ host->rrdctx = (RRDCONTEXTS *)dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ &dictionary_stats_category_rrdcontext, sizeof(RRDCONTEXT));
+
dictionary_register_insert_callback((DICTIONARY *)host->rrdctx, rrdcontext_insert_callback, host);
dictionary_register_delete_callback((DICTIONARY *)host->rrdctx, rrdcontext_delete_callback, host);
dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx, rrdcontext_conflict_callback, host);
dictionary_register_react_callback((DICTIONARY *)host->rrdctx, rrdcontext_react_callback, host);
- host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE);
+ host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0);
dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_insert_callback, NULL);
dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_delete_callback, NULL);
dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_conflict_callback, NULL);
- host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE);
+ host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0);
dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_insert_callback, NULL);
dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_delete_callback, NULL);
dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_conflict_callback, NULL);
@@ -1743,8 +1740,8 @@ struct rrdcontext_to_json {
SIMPLE_PATTERN *chart_dimensions;
size_t written;
time_t now;
- time_t combined_first_time_t;
- time_t combined_last_time_t;
+ time_t combined_first_time_s;
+ time_t combined_last_time_s;
RRD_FLAGS combined_flags;
};
@@ -1760,10 +1757,10 @@ static inline int rrdmetric_to_json_callback(const DICTIONARY_ITEM *item, void *
if(unlikely(rrd_flag_is_deleted(rm) && !(options & RRDCONTEXT_OPTION_SHOW_DELETED)))
return 0;
- if(after && (!rm->last_time_t || after > rm->last_time_t))
+ if(after && (!rm->last_time_s || after > rm->last_time_s))
return 0;
- if(before && (!rm->first_time_t || before < rm->first_time_t))
+ if(before && (!rm->first_time_s || before < rm->first_time_s))
return 0;
if(t->chart_dimensions
@@ -1773,14 +1770,14 @@ static inline int rrdmetric_to_json_callback(const DICTIONARY_ITEM *item, void *
if(t->written) {
buffer_strcat(wb, ",\n");
- t->combined_first_time_t = MIN(t->combined_first_time_t, rm->first_time_t);
- t->combined_last_time_t = MAX(t->combined_last_time_t, rm->last_time_t);
+ t->combined_first_time_s = MIN(t->combined_first_time_s, rm->first_time_s);
+ t->combined_last_time_s = MAX(t->combined_last_time_s, rm->last_time_s);
t->combined_flags |= rrd_flags_get(rm);
}
else {
buffer_strcat(wb, "\n");
- t->combined_first_time_t = rm->first_time_t;
- t->combined_last_time_t = rm->last_time_t;
+ t->combined_first_time_s = rm->first_time_s;
+ t->combined_last_time_s = rm->last_time_s;
t->combined_flags = rrd_flags_get(rm);
}
@@ -1798,8 +1795,8 @@ static inline int rrdmetric_to_json_callback(const DICTIONARY_ITEM *item, void *
",\n\t\t\t\t\t\t\t\"last_time_t\":%lld"
",\n\t\t\t\t\t\t\t\"collected\":%s"
, string2str(rm->name)
- , (long long)rm->first_time_t
- , rrd_flag_is_collected(rm) ? (long long)t->now : (long long)rm->last_time_t
+ , (long long)rm->first_time_s
+ , rrd_flag_is_collected(rm) ? (long long)t->now : (long long)rm->last_time_s
, rrd_flag_is_collected(rm) ? "true" : "false"
);
@@ -1835,10 +1832,10 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void
if(unlikely(rrd_flag_is_deleted(ri) && !(options & RRDCONTEXT_OPTION_SHOW_DELETED)))
return 0;
- if(after && (!ri->last_time_t || after > ri->last_time_t))
+ if(after && (!ri->last_time_s || after > ri->last_time_s))
return 0;
- if(before && (!ri->first_time_t || before < ri->first_time_t))
+ if(before && (!ri->first_time_s || before < ri->first_time_s))
return 0;
if(t_parent->chart_label_key && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_label_key, '\0'))
@@ -1847,14 +1844,14 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void
if(t_parent->chart_labels_filter && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_labels_filter, ':'))
return 0;
- time_t first_time_t = ri->first_time_t;
- time_t last_time_t = ri->last_time_t;
+ time_t first_time_s = ri->first_time_s;
+ time_t last_time_s = ri->last_time_s;
RRD_FLAGS flags = rrd_flags_get(ri);
BUFFER *wb_metrics = NULL;
if(options & RRDCONTEXT_OPTION_SHOW_METRICS || t_parent->chart_dimensions) {
- wb_metrics = buffer_create(4096);
+ wb_metrics = buffer_create(4096, &netdata_buffers_statistics.buffers_api);
struct rrdcontext_to_json t_metrics = {
.wb = wb_metrics,
@@ -1874,21 +1871,21 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void
return 0;
}
- first_time_t = t_metrics.combined_first_time_t;
- last_time_t = t_metrics.combined_last_time_t;
+ first_time_s = t_metrics.combined_first_time_s;
+ last_time_s = t_metrics.combined_last_time_s;
flags = t_metrics.combined_flags;
}
if(t_parent->written) {
buffer_strcat(wb, ",\n");
- t_parent->combined_first_time_t = MIN(t_parent->combined_first_time_t, first_time_t);
- t_parent->combined_last_time_t = MAX(t_parent->combined_last_time_t, last_time_t);
+ t_parent->combined_first_time_s = MIN(t_parent->combined_first_time_s, first_time_s);
+ t_parent->combined_last_time_s = MAX(t_parent->combined_last_time_s, last_time_s);
t_parent->combined_flags |= flags;
}
else {
buffer_strcat(wb, "\n");
- t_parent->combined_first_time_t = first_time_t;
- t_parent->combined_last_time_t = last_time_t;
+ t_parent->combined_first_time_s = first_time_s;
+ t_parent->combined_last_time_s = last_time_s;
t_parent->combined_flags = flags;
}
@@ -1908,7 +1905,7 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void
",\n\t\t\t\t\t\"family\":\"%s\""
",\n\t\t\t\t\t\"chart_type\":\"%s\""
",\n\t\t\t\t\t\"priority\":%u"
- ",\n\t\t\t\t\t\"update_every\":%d"
+ ",\n\t\t\t\t\t\"update_every\":%ld"
",\n\t\t\t\t\t\"first_time_t\":%lld"
",\n\t\t\t\t\t\"last_time_t\":%lld"
",\n\t\t\t\t\t\"collected\":%s"
@@ -1919,9 +1916,9 @@ static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void
, string2str(ri->family)
, rrdset_type_name(ri->chart_type)
, ri->priority
- , ri->update_every
- , (long long)first_time_t
- , (flags & RRD_FLAG_COLLECTED) ? (long long)t_parent->now : (long long)last_time_t
+ , ri->update_every_s
+ , (long long)first_time_s
+ , (flags & RRD_FLAG_COLLECTED) ? (long long)t_parent->now : (long long)last_time_s
, (flags & RRD_FLAG_COLLECTED) ? "true" : "false"
);
@@ -1976,14 +1973,14 @@ static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void
if(options & RRDCONTEXT_OPTION_DEEPSCAN)
rrdcontext_recalculate_context_retention(rc, RRD_FLAG_NONE, false);
- if(after && (!rc->last_time_t || after > rc->last_time_t))
+ if(after && (!rc->last_time_s || after > rc->last_time_s))
return 0;
- if(before && (!rc->first_time_t || before < rc->first_time_t))
+ if(before && (!rc->first_time_s || before < rc->first_time_s))
return 0;
- time_t first_time_t = rc->first_time_t;
- time_t last_time_t = rc->last_time_t;
+ time_t first_time_s = rc->first_time_s;
+ time_t last_time_s = rc->last_time_s;
RRD_FLAGS flags = rrd_flags_get(rc);
BUFFER *wb_instances = NULL;
@@ -1992,7 +1989,7 @@ static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void
|| t_parent->chart_labels_filter
|| t_parent->chart_dimensions) {
- wb_instances = buffer_create(4096);
+ wb_instances = buffer_create(4096, &netdata_buffers_statistics.buffers_api);
struct rrdcontext_to_json t_instances = {
.wb = wb_instances,
@@ -2012,8 +2009,8 @@ static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void
return 0;
}
- first_time_t = t_instances.combined_first_time_t;
- last_time_t = t_instances.combined_last_time_t;
+ first_time_s = t_instances.combined_first_time_s;
+ last_time_s = t_instances.combined_last_time_s;
flags = t_instances.combined_flags;
}
@@ -2043,8 +2040,8 @@ static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void
, string2str(rc->family)
, rrdset_type_name(rc->chart_type)
, rc->priority
- , (long long)first_time_t
- , (flags & RRD_FLAG_COLLECTED) ? (long long)t_parent->now : (long long)last_time_t
+ , (long long)first_time_s
+ , (flags & RRD_FLAG_COLLECTED) ? (long long)t_parent->now : (long long)last_time_s
, (flags & RRD_FLAG_COLLECTED) ? "true" : "false"
);
@@ -2220,7 +2217,7 @@ DICTIONARY *rrdcontext_all_metrics_to_dict(RRDHOST *host, SIMPLE_PATTERN *contex
if(!host || !host->rrdctx)
return NULL;
- DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_DONT_OVERWRITE_VALUE);
+ DICTIONARY *dict = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_rrdcontext, 0);
dictionary_register_insert_callback(dict, metric_entry_insert_callback, NULL);
dictionary_register_delete_callback(dict, metric_entry_delete_callback, NULL);
dictionary_register_conflict_callback(dict, metric_entry_conflict_callback, NULL);
@@ -2328,11 +2325,25 @@ void query_target_release(QUERY_TARGET *qt) {
string_freez(qt->query.array[i].chart.name);
qt->query.array[i].chart.name = NULL;
+ // reset the plans
+ for(size_t p = 0; p < qt->query.array[i].plan.used; p++) {
+ internal_fatal(qt->query.array[i].plan.array[p].initialized &&
+ !qt->query.array[i].plan.array[p].finalized,
+ "QUERY: left-over initialized plan");
+
+ qt->query.array[i].plan.array[p].initialized = false;
+ qt->query.array[i].plan.array[p].finalized = false;
+ }
+ qt->query.array[i].plan.used = 0;
+
+ // reset the tiers
for(size_t tier = 0; tier < storage_tiers ;tier++) {
if(qt->query.array[i].tiers[tier].db_metric_handle) {
STORAGE_ENGINE *eng = qt->query.array[i].tiers[tier].eng;
eng->api.metric_release(qt->query.array[i].tiers[tier].db_metric_handle);
qt->query.array[i].tiers[tier].db_metric_handle = NULL;
+ qt->query.array[i].tiers[tier].weight = 0;
+ qt->query.array[i].tiers[tier].eng = NULL;
}
}
}
@@ -2366,37 +2377,44 @@ void query_target_release(QUERY_TARGET *qt) {
qt->contexts.used = 0;
qt->hosts.used = 0;
- qt->db.minimum_latest_update_every = 0;
- qt->db.first_time_t = 0;
- qt->db.last_time_t = 0;
+ qt->db.minimum_latest_update_every_s = 0;
+ qt->db.first_time_s = 0;
+ qt->db.last_time_s = 0;
qt->id[0] = '\0';
qt->used = false;
}
void query_target_free(void) {
- if(thread_query_target.used)
- query_target_release(&thread_query_target);
+ QUERY_TARGET *qt = &thread_query_target;
- freez(thread_query_target.query.array);
- thread_query_target.query.array = NULL;
- thread_query_target.query.size = 0;
+ if(qt->used)
+ query_target_release(qt);
- freez(thread_query_target.metrics.array);
- thread_query_target.metrics.array = NULL;
- thread_query_target.metrics.size = 0;
+ __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->query.size * sizeof(QUERY_METRIC), __ATOMIC_RELAXED);
+ freez(qt->query.array);
+ qt->query.array = NULL;
+ qt->query.size = 0;
- freez(thread_query_target.instances.array);
- thread_query_target.instances.array = NULL;
- thread_query_target.instances.size = 0;
+ __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *), __ATOMIC_RELAXED);
+ freez(qt->metrics.array);
+ qt->metrics.array = NULL;
+ qt->metrics.size = 0;
- freez(thread_query_target.contexts.array);
- thread_query_target.contexts.array = NULL;
- thread_query_target.contexts.size = 0;
+ __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *), __ATOMIC_RELAXED);
+ freez(qt->instances.array);
+ qt->instances.array = NULL;
+ qt->instances.size = 0;
- freez(thread_query_target.hosts.array);
- thread_query_target.hosts.array = NULL;
- thread_query_target.hosts.size = 0;
+ __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *), __ATOMIC_RELAXED);
+ freez(qt->contexts.array);
+ qt->contexts.array = NULL;
+ qt->contexts.size = 0;
+
+ __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->hosts.size * sizeof(RRDHOST *), __ATOMIC_RELAXED);
+ freez(qt->hosts.array);
+ qt->hosts.array = NULL;
+ qt->hosts.size = 0;
}
static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED *rma, RRDINSTANCE *ri,
@@ -2408,69 +2426,73 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
return;
if(qt->metrics.used == qt->metrics.size) {
+ size_t old_mem = qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *);
qt->metrics.size = (qt->metrics.size) ? qt->metrics.size * 2 : 1;
- qt->metrics.array = reallocz(qt->metrics.array, qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *));
+ size_t new_mem = qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *);
+ qt->metrics.array = reallocz(qt->metrics.array, new_mem);
+
+ __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
}
qt->metrics.array[qt->metrics.used++] = rrdmetric_acquired_dup(rma);
if(!queryable_instance)
return;
- time_t common_first_time_t = 0;
- time_t common_last_time_t = 0;
- time_t common_update_every = 0;
+ time_t common_first_time_s = 0;
+ time_t common_last_time_s = 0;
+ time_t common_update_every_s = 0;
size_t tiers_added = 0;
struct {
STORAGE_ENGINE *eng;
STORAGE_METRIC_HANDLE *db_metric_handle;
- time_t db_first_time_t;
- time_t db_last_time_t;
- time_t db_update_every;
+ time_t db_first_time_s;
+ time_t db_last_time_s;
+ time_t db_update_every_s;
} tier_retention[storage_tiers];
for (size_t tier = 0; tier < storage_tiers; tier++) {
STORAGE_ENGINE *eng = qtl->host->db[tier].eng;
tier_retention[tier].eng = eng;
- tier_retention[tier].db_update_every = (time_t) (qtl->host->db[tier].tier_grouping * ri->update_every);
+ tier_retention[tier].db_update_every_s = (time_t) (qtl->host->db[tier].tier_grouping * ri->update_every_s);
- if(rm->rrddim && rm->rrddim->tiers[tier] && rm->rrddim->tiers[tier]->db_metric_handle)
- tier_retention[tier].db_metric_handle = eng->api.metric_dup(rm->rrddim->tiers[tier]->db_metric_handle);
+ if(rm->rrddim && rm->rrddim->tiers[tier].db_metric_handle)
+ tier_retention[tier].db_metric_handle = eng->api.metric_dup(rm->rrddim->tiers[tier].db_metric_handle);
else
tier_retention[tier].db_metric_handle = eng->api.metric_get(qtl->host->db[tier].instance, &rm->uuid);
if(tier_retention[tier].db_metric_handle) {
- tier_retention[tier].db_first_time_t = tier_retention[tier].eng->api.query_ops.oldest_time(tier_retention[tier].db_metric_handle);
- tier_retention[tier].db_last_time_t = tier_retention[tier].eng->api.query_ops.latest_time(tier_retention[tier].db_metric_handle);
+ tier_retention[tier].db_first_time_s = tier_retention[tier].eng->api.query_ops.oldest_time_s(tier_retention[tier].db_metric_handle);
+ tier_retention[tier].db_last_time_s = tier_retention[tier].eng->api.query_ops.latest_time_s(tier_retention[tier].db_metric_handle);
- if(!common_first_time_t)
- common_first_time_t = tier_retention[tier].db_first_time_t;
- else if(tier_retention[tier].db_first_time_t)
- common_first_time_t = MIN(common_first_time_t, tier_retention[tier].db_first_time_t);
+ if(!common_first_time_s)
+ common_first_time_s = tier_retention[tier].db_first_time_s;
+ else if(tier_retention[tier].db_first_time_s)
+ common_first_time_s = MIN(common_first_time_s, tier_retention[tier].db_first_time_s);
- if(!common_last_time_t)
- common_last_time_t = tier_retention[tier].db_last_time_t;
+ if(!common_last_time_s)
+ common_last_time_s = tier_retention[tier].db_last_time_s;
else
- common_last_time_t = MAX(common_last_time_t, tier_retention[tier].db_last_time_t);
+ common_last_time_s = MAX(common_last_time_s, tier_retention[tier].db_last_time_s);
- if(!common_update_every)
- common_update_every = tier_retention[tier].db_update_every;
- else if(tier_retention[tier].db_update_every)
- common_update_every = MIN(common_update_every, tier_retention[tier].db_update_every);
+ if(!common_update_every_s)
+ common_update_every_s = tier_retention[tier].db_update_every_s;
+ else if(tier_retention[tier].db_update_every_s)
+ common_update_every_s = MIN(common_update_every_s, tier_retention[tier].db_update_every_s);
tiers_added++;
}
else {
- tier_retention[tier].db_first_time_t = 0;
- tier_retention[tier].db_last_time_t = 0;
- tier_retention[tier].db_update_every = 0;
+ tier_retention[tier].db_first_time_s = 0;
+ tier_retention[tier].db_last_time_s = 0;
+ tier_retention[tier].db_update_every_s = 0;
}
}
bool release_retention = true;
bool timeframe_matches =
(tiers_added
- && (common_first_time_t - common_update_every * 2) <= qt->window.before
- && (common_last_time_t + common_update_every * 2) >= qt->window.after
+ && (common_first_time_s - common_update_every_s * 2) <= qt->window.before
+ && (common_last_time_s + common_update_every_s * 2) >= qt->window.after
) ? true : false;
if(timeframe_matches) {
@@ -2515,14 +2537,19 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
// let's add it to the query metrics
if(ri->rrdset)
- ri->rrdset->last_accessed_time = qtl->start_s;
+ ri->rrdset->last_accessed_time_s = qtl->start_s;
if (qt->query.used == qt->query.size) {
+ size_t old_mem = qt->query.size * sizeof(QUERY_METRIC);
qt->query.size = (qt->query.size) ? qt->query.size * 2 : 1;
- qt->query.array = reallocz(qt->query.array, qt->query.size * sizeof(QUERY_METRIC));
+ size_t new_mem = qt->query.size * sizeof(QUERY_METRIC);
+ qt->query.array = reallocz(qt->query.array, new_mem);
+
+ __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
}
QUERY_METRIC *qm = &qt->query.array[qt->query.used++];
+ qm->plan.used = 0;
qm->dimension.options = options;
qm->link.host = qtl->host;
@@ -2536,18 +2563,18 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
qm->dimension.id = string_dup(rm->id);
qm->dimension.name = string_dup(rm->name);
- if (!qt->db.first_time_t || common_first_time_t < qt->db.first_time_t)
- qt->db.first_time_t = common_first_time_t;
+ if (!qt->db.first_time_s || common_first_time_s < qt->db.first_time_s)
+ qt->db.first_time_s = common_first_time_s;
- if (!qt->db.last_time_t || common_last_time_t > qt->db.last_time_t)
- qt->db.last_time_t = common_last_time_t;
+ if (!qt->db.last_time_s || common_last_time_s > qt->db.last_time_s)
+ qt->db.last_time_s = common_last_time_s;
for (size_t tier = 0; tier < storage_tiers; tier++) {
qm->tiers[tier].eng = tier_retention[tier].eng;
qm->tiers[tier].db_metric_handle = tier_retention[tier].db_metric_handle;
- qm->tiers[tier].db_first_time_t = tier_retention[tier].db_first_time_t;
- qm->tiers[tier].db_last_time_t = tier_retention[tier].db_last_time_t;
- qm->tiers[tier].db_update_every = tier_retention[tier].db_update_every;
+ qm->tiers[tier].db_first_time_s = tier_retention[tier].db_first_time_s;
+ qm->tiers[tier].db_last_time_s = tier_retention[tier].db_last_time_s;
+ qm->tiers[tier].db_update_every_s = tier_retention[tier].db_update_every_s;
}
release_retention = false;
}
@@ -2572,14 +2599,18 @@ static void query_target_add_instance(QUERY_TARGET_LOCALS *qtl, RRDINSTANCE_ACQU
return;
if(qt->instances.used == qt->instances.size) {
+ size_t old_mem = qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *);
qt->instances.size = (qt->instances.size) ? qt->instances.size * 2 : 1;
- qt->instances.array = reallocz(qt->instances.array, qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *));
+ size_t new_mem = qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *);
+ qt->instances.array = reallocz(qt->instances.array, new_mem);
+
+ __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
}
qtl->ria = qt->instances.array[qt->instances.used++] = rrdinstance_acquired_dup(ria);
- if(qt->db.minimum_latest_update_every == 0 || ri->update_every < qt->db.minimum_latest_update_every)
- qt->db.minimum_latest_update_every = ri->update_every;
+ if(qt->db.minimum_latest_update_every_s == 0 || ri->update_every_s < qt->db.minimum_latest_update_every_s)
+ qt->db.minimum_latest_update_every_s = ri->update_every_s;
if(queryable_instance) {
if ((qt->instances.chart_label_key_pattern && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, qt->instances.chart_label_key_pattern, ':')) ||
@@ -2616,8 +2647,12 @@ static void query_target_add_context(QUERY_TARGET_LOCALS *qtl, RRDCONTEXT_ACQUIR
return;
if(qt->contexts.used == qt->contexts.size) {
+ size_t old_mem = qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *);
qt->contexts.size = (qt->contexts.size) ? qt->contexts.size * 2 : 1;
- qt->contexts.array = reallocz(qt->contexts.array, qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *));
+ size_t new_mem = qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *);
+ qt->contexts.array = reallocz(qt->contexts.array, new_mem);
+
+ __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
}
qtl->rca = qt->contexts.array[qt->contexts.used++] = rrdcontext_acquired_dup(rca);
@@ -2656,8 +2691,12 @@ static void query_target_add_host(QUERY_TARGET_LOCALS *qtl, RRDHOST *host) {
QUERY_TARGET *qt = qtl->qt;
if(qt->hosts.used == qt->hosts.size) {
+ size_t old_mem = qt->hosts.size * sizeof(RRDHOST *);
qt->hosts.size = (qt->hosts.size) ? qt->hosts.size * 2 : 1;
- qt->hosts.array = reallocz(qt->hosts.array, qt->hosts.size * sizeof(RRDHOST *));
+ size_t new_mem = qt->hosts.size * sizeof(RRDHOST *);
+ qt->hosts.array = reallocz(qt->hosts.array, new_mem);
+
+ __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
}
qtl->host = qt->hosts.array[qt->hosts.used++] = host;
@@ -2770,6 +2809,9 @@ void query_target_generate_name(QUERY_TARGET *qt) {
}
QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) {
+ if(!service_running(ABILITY_DATA_QUERIES))
+ return NULL;
+
QUERY_TARGET *qt = &thread_query_target;
if(qt->used)
@@ -2800,7 +2842,7 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) {
.charts_labels_filter = qt->request.charts_labels_filter,
};
- qt->db.minimum_latest_update_every = 0; // it will be updated by query_target_add_query()
+ qt->db.minimum_latest_update_every_s = 0; // it will be updated by query_target_add_query()
// prepare all the patterns
qt->hosts.pattern = is_valid_sp(qtl.hosts) ? simple_pattern_create(qtl.hosts, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
@@ -2922,7 +2964,7 @@ static void rrdinstance_load_chart_callback(SQL_CHART_DATA *sc, void *data) {
.family = string_strdupz(sc->family),
.chart_type = sc->chart_type,
.priority = sc->priority,
- .update_every = sc->update_every,
+ .update_every_s = sc->update_every,
.flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomics
};
uuid_copy(tri.uuid, sc->chart_id);
@@ -3012,7 +3054,7 @@ static uint64_t rrdcontext_version_hash_with_callback(
// when the context is being collected,
// rc->hub.last_time_t is already zero
- hash += rc->hub.version + rc->hub.last_time_t - rc->hub.first_time_t;
+ hash += rc->hub.version + rc->hub.last_time_s - rc->hub.first_time_s;
rrdcontext_unlock(rc);
@@ -3057,17 +3099,16 @@ static bool rrdmetric_update_retention(RRDMETRIC *rm) {
time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
if(rm->rrddim) {
- min_first_time_t = rrddim_first_entry_t(rm->rrddim);
- max_last_time_t = rrddim_last_entry_t(rm->rrddim);
+ min_first_time_t = rrddim_first_entry_s(rm->rrddim);
+ max_last_time_t = rrddim_last_entry_s(rm->rrddim);
}
-#ifdef ENABLE_DBENGINE
- else if (dbengine_enabled) {
+ else {
RRDHOST *rrdhost = rm->ri->rc->rrdhost;
for (size_t tier = 0; tier < storage_tiers; tier++) {
- if(!rrdhost->db[tier].instance) continue;
+ STORAGE_ENGINE *eng = rrdhost->db[tier].eng;
time_t first_time_t, last_time_t;
- if (rrdeng_metric_retention_by_uuid(rrdhost->db[tier].instance, &rm->uuid, &first_time_t, &last_time_t) == 0) {
+ if (eng->api.metric_retention_by_uuid(rrdhost->db[tier].instance, &rm->uuid, &first_time_t, &last_time_t)) {
if (first_time_t < min_first_time_t)
min_first_time_t = first_time_t;
@@ -3076,17 +3117,15 @@ static bool rrdmetric_update_retention(RRDMETRIC *rm) {
}
}
}
- else {
- // cannot get retention
+
+ if((min_first_time_t == LONG_MAX || min_first_time_t == 0) && max_last_time_t == 0)
return false;
- }
-#endif
if(min_first_time_t == LONG_MAX)
min_first_time_t = 0;
if(min_first_time_t > max_last_time_t) {
- internal_error(true, "RRDMETRIC: retention of '%s' is flipped", string2str(rm->id));
+ internal_error(true, "RRDMETRIC: retention of '%s' is flipped, first_time_t = %ld, last_time_t = %ld", string2str(rm->id), min_first_time_t, max_last_time_t);
time_t tmp = min_first_time_t;
min_first_time_t = max_last_time_t;
max_last_time_t = tmp;
@@ -3094,17 +3133,17 @@ static bool rrdmetric_update_retention(RRDMETRIC *rm) {
// check if retention changed
- if (min_first_time_t != rm->first_time_t) {
- rm->first_time_t = min_first_time_t;
+ if (min_first_time_t != rm->first_time_s) {
+ rm->first_time_s = min_first_time_t;
rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
}
- if (max_last_time_t != rm->last_time_t) {
- rm->last_time_t = max_last_time_t;
+ if (max_last_time_t != rm->last_time_s) {
+ rm->last_time_s = max_last_time_t;
rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
}
- if(unlikely(!rm->first_time_t && !rm->last_time_t))
+ if(unlikely(!rm->first_time_s && !rm->last_time_s))
rrd_flag_set_deleted(rm, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
rrd_flag_set(rm, RRD_FLAG_LIVE_RETENTION);
@@ -3123,7 +3162,7 @@ static inline bool rrdmetric_should_be_deleted(RRDMETRIC *rm) {
return false;
rrdmetric_update_retention(rm);
- if(rm->first_time_t || rm->last_time_t)
+ if(rm->first_time_s || rm->last_time_s)
return false;
return true;
@@ -3145,7 +3184,7 @@ static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) {
if(unlikely(dictionary_entries(ri->rrdmetrics) != 0))
return false;
- if(ri->first_time_t || ri->last_time_t)
+ if(ri->first_time_s || ri->last_time_s)
return false;
return true;
@@ -3164,7 +3203,7 @@ static inline bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) {
if(unlikely(dictionary_entries(rc->rrdinstances) != 0))
return false;
- if(unlikely(rc->first_time_t || rc->last_time_t))
+ if(unlikely(rc->first_time_s || rc->last_time_s))
return false;
return true;
@@ -3189,7 +3228,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo
RRDCONTEXT *rc;
dfe_start_reentrant((DICTIONARY *)host->rrdctx, rc) {
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP);
@@ -3197,7 +3236,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo
RRDINSTANCE *ri;
dfe_start_reentrant(rc->rrdinstances, ri) {
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
RRDMETRIC *rm;
dfe_start_write(ri->rrdmetrics, rm) {
@@ -3313,7 +3352,7 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL
if(dictionary_entries(ri->rrdmetrics) > 0) {
RRDMETRIC *rm;
dfe_start_read((DICTIONARY *)ri->rrdmetrics, rm) {
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
RRD_FLAGS reason_to_pass = reason;
if(rrd_flag_check(ri, RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION))
@@ -3329,16 +3368,16 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL
continue;
}
- if(!currently_collected && rrd_flag_check(rm, RRD_FLAG_COLLECTED) && rm->first_time_t)
+ if(!currently_collected && rrd_flag_check(rm, RRD_FLAG_COLLECTED) && rm->first_time_s)
currently_collected = true;
metrics_active++;
- if (rm->first_time_t && rm->first_time_t < min_first_time_t)
- min_first_time_t = rm->first_time_t;
+ if (rm->first_time_s && rm->first_time_s < min_first_time_t)
+ min_first_time_t = rm->first_time_s;
- if (rm->last_time_t && rm->last_time_t > max_last_time_t)
- max_last_time_t = rm->last_time_t;
+ if (rm->last_time_s && rm->last_time_s > max_last_time_t)
+ max_last_time_t = rm->last_time_s;
}
dfe_done(rm);
}
@@ -3351,13 +3390,13 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL
if(unlikely(!metrics_active)) {
// no metrics available
- if(ri->first_time_t) {
- ri->first_time_t = 0;
+ if(ri->first_time_s) {
+ ri->first_time_s = 0;
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
}
- if(ri->last_time_t) {
- ri->last_time_t = 0;
+ if(ri->last_time_s) {
+ ri->last_time_s = 0;
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
}
@@ -3370,13 +3409,13 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL
min_first_time_t = 0;
if (unlikely(min_first_time_t == 0 || max_last_time_t == 0)) {
- if(ri->first_time_t) {
- ri->first_time_t = 0;
+ if(ri->first_time_s) {
+ ri->first_time_s = 0;
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
}
- if(ri->last_time_t) {
- ri->last_time_t = 0;
+ if(ri->last_time_s) {
+ ri->last_time_s = 0;
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
}
@@ -3386,13 +3425,13 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL
else {
rrd_flag_clear(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
- if (unlikely(ri->first_time_t != min_first_time_t)) {
- ri->first_time_t = min_first_time_t;
+ if (unlikely(ri->first_time_s != min_first_time_t)) {
+ ri->first_time_s = min_first_time_t;
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
}
- if (unlikely(ri->last_time_t != max_last_time_t)) {
- ri->last_time_t = max_last_time_t;
+ if (unlikely(ri->last_time_s != max_last_time_t)) {
+ ri->last_time_s = max_last_time_t;
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
}
@@ -3413,6 +3452,8 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
if(worker_jobs)
worker_is_busy(WORKER_JOB_PP_CONTEXT);
+ size_t min_priority_collected = LONG_MAX;
+ size_t min_priority_not_collected = LONG_MAX;
size_t min_priority = LONG_MAX;
time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
size_t instances_active = 0, instances_deleted = 0;
@@ -3420,7 +3461,7 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
if(dictionary_entries(rc->rrdinstances) > 0) {
RRDINSTANCE *ri;
dfe_start_reentrant(rc->rrdinstances, ri) {
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
RRD_FLAGS reason_to_pass = reason;
if(rrd_flag_check(rc, RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION))
@@ -3439,7 +3480,7 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
continue;
}
- if(unlikely(!currently_collected && rrd_flag_is_collected(ri) && ri->first_time_t))
+ if(unlikely(!currently_collected && rrd_flag_is_collected(ri) && ri->first_time_s))
currently_collected = true;
internal_error(rc->units != ri->units,
@@ -3449,16 +3490,31 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
instances_active++;
- if (ri->priority >= RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY && ri->priority < min_priority)
- min_priority = ri->priority;
+ if (ri->priority >= RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY) {
+ if(rrd_flag_check(ri, RRD_FLAG_COLLECTED)) {
+ if(ri->priority < min_priority_collected)
+ min_priority_collected = ri->priority;
+ }
+ else {
+ if(ri->priority < min_priority_not_collected)
+ min_priority_not_collected = ri->priority;
+ }
+ }
- if (ri->first_time_t && ri->first_time_t < min_first_time_t)
- min_first_time_t = ri->first_time_t;
+ if (ri->first_time_s && ri->first_time_s < min_first_time_t)
+ min_first_time_t = ri->first_time_s;
- if (ri->last_time_t && ri->last_time_t > max_last_time_t)
- max_last_time_t = ri->last_time_t;
+ if (ri->last_time_s && ri->last_time_s > max_last_time_t)
+ max_last_time_t = ri->last_time_s;
}
dfe_done(ri);
+
+ if(min_priority_collected != LONG_MAX)
+ // use the collected priority
+ min_priority = min_priority_collected;
+ else
+ // use the non-collected priority
+ min_priority = min_priority_not_collected;
}
{
@@ -3485,13 +3541,13 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
if(unlikely(!instances_active)) {
// we had some instances, but they are gone now...
- if(rc->first_time_t) {
- rc->first_time_t = 0;
+ if(rc->first_time_s) {
+ rc->first_time_s = 0;
rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
}
- if(rc->last_time_t) {
- rc->last_time_t = 0;
+ if(rc->last_time_s) {
+ rc->last_time_s = 0;
rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
}
@@ -3504,13 +3560,13 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
min_first_time_t = 0;
if (unlikely(min_first_time_t == 0 && max_last_time_t == 0)) {
- if(rc->first_time_t) {
- rc->first_time_t = 0;
+ if(rc->first_time_s) {
+ rc->first_time_s = 0;
rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
}
- if(rc->last_time_t) {
- rc->last_time_t = 0;
+ if(rc->last_time_s) {
+ rc->last_time_s = 0;
rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
}
@@ -3519,13 +3575,13 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
else {
rrd_flag_clear(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
- if (unlikely(rc->first_time_t != min_first_time_t)) {
- rc->first_time_t = min_first_time_t;
+ if (unlikely(rc->first_time_s != min_first_time_t)) {
+ rc->first_time_s = min_first_time_t;
rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
}
- if (rc->last_time_t != max_last_time_t) {
- rc->last_time_t = max_last_time_t;
+ if (rc->last_time_s != max_last_time_t) {
+ rc->last_time_s = max_last_time_t;
rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
}
@@ -3592,7 +3648,7 @@ static void rrdcontext_post_process_queued_contexts(RRDHOST *host) {
RRDCONTEXT *rc;
dfe_start_reentrant((DICTIONARY *)host->rrdctx_post_processing_queue, rc) {
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
rrdcontext_dequeue_from_post_processing(rc);
rrdcontext_post_process_updates(rc, false, RRD_FLAG_NONE, true);
@@ -3621,8 +3677,8 @@ static void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe
rc->hub.family = string2str(rc->family);
rc->hub.chart_type = rrdset_type_name(rc->chart_type);
rc->hub.priority = rc->priority;
- rc->hub.first_time_t = rc->first_time_t;
- rc->hub.last_time_t = rrd_flag_is_collected(rc) ? 0 : rc->last_time_t;
+ rc->hub.first_time_s = rc->first_time_s;
+ rc->hub.last_time_s = rrd_flag_is_collected(rc) ? 0 : rc->last_time_s;
rc->hub.deleted = rrd_flag_is_deleted(rc) ? true : false;
#ifdef ENABLE_ACLK
@@ -3634,8 +3690,8 @@ static void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe
.family = rc->hub.family,
.chart_type = rc->hub.chart_type,
.priority = rc->hub.priority,
- .first_entry = rc->hub.first_time_t,
- .last_entry = rc->hub.last_time_t,
+ .first_entry = rc->hub.first_time_s,
+ .last_entry = rc->hub.last_time_s,
.deleted = rc->hub.deleted,
};
@@ -3689,10 +3745,10 @@ static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending _
if(unlikely(rc->priority != rc->hub.priority))
priority_changed = true;
- if(unlikely((uint64_t)rc->first_time_t != rc->hub.first_time_t))
+ if(unlikely((uint64_t)rc->first_time_s != rc->hub.first_time_s))
first_time_changed = true;
- if(unlikely((uint64_t)((flags & RRD_FLAG_COLLECTED) ? 0 : rc->last_time_t) != rc->hub.last_time_t))
+ if(unlikely((uint64_t)((flags & RRD_FLAG_COLLECTED) ? 0 : rc->last_time_s) != rc->hub.last_time_s))
last_time_changed = true;
if(unlikely(((flags & RRD_FLAG_DELETED) ? true : false) != rc->hub.deleted))
@@ -3711,8 +3767,8 @@ static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending _
string2str(rc->family), family_changed ? " (CHANGED)" : "",
rrdset_type_name(rc->chart_type), chart_type_changed ? " (CHANGED)" : "",
rc->priority, priority_changed ? " (CHANGED)" : "",
- rc->first_time_t, first_time_changed ? " (CHANGED)" : "",
- (flags & RRD_FLAG_COLLECTED) ? 0 : rc->last_time_t, last_time_changed ? " (CHANGED)" : "",
+ rc->first_time_s, first_time_changed ? " (CHANGED)" : "",
+ (flags & RRD_FLAG_COLLECTED) ? 0 : rc->last_time_s, last_time_changed ? " (CHANGED)" : "",
(flags & RRD_FLAG_DELETED) ? "true" : "false", deleted_changed ? " (CHANGED)" : "",
sending ? (now_realtime_usec() - rc->queue.queued_ut) / USEC_PER_MS : 0,
sending ? (rc->queue.scheduled_dispatch_ut - rc->queue.queued_ut) / USEC_PER_MS : 0
@@ -3773,7 +3829,7 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now
RRDCONTEXT *rc;
dfe_start_reentrant((DICTIONARY *)host->rrdctx_hub_queue, rc) {
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
if(unlikely(messages_added >= MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST))
break;
@@ -3840,7 +3896,7 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now
dfe_done(rc);
#ifdef ENABLE_ACLK
- if(!netdata_exit && bundle) {
+ if(service_running(SERVICE_CONTEXT) && bundle) {
// we have a bundle to send messages
// update the version hash
@@ -3891,11 +3947,11 @@ void *rrdcontext_main(void *ptr) {
heartbeat_init(&hb);
usec_t step = RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC;
- while (!netdata_exit) {
+ while (service_running(SERVICE_CONTEXT)) {
worker_is_idle();
heartbeat_next(&hb, step);
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
usec_t now_ut = now_realtime_usec();
@@ -3911,7 +3967,7 @@ void *rrdcontext_main(void *ptr) {
rrd_rdlock();
RRDHOST *host;
rrdhost_foreach_read(host) {
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
worker_is_busy(WORKER_JOB_HOSTS);