summaryrefslogtreecommitdiffstats
path: root/database/rrdset.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/rrdset.c417
1 files changed, 219 insertions, 198 deletions
diff --git a/database/rrdset.c b/database/rrdset.c
index 6eb3c7105..57f962cd6 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -28,6 +28,8 @@ static inline void rrdset_index_del_name(RRDHOST *host, RRDSET *st) {
}
static inline RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name) {
+ if (unlikely(!host->rrdset_root_index_name))
+ return NULL;
return dictionary_get(host->rrdset_root_index_name, name);
}
@@ -126,15 +128,15 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
st->module_name = rrd_string_strdupz(ctr->module);
st->priority = ctr->priority;
- st->cache_dir = rrdset_cache_dir(host, chart_full_id);
st->entries = (ctr->memory_mode != RRD_MEMORY_MODE_DBENGINE) ? align_entries_to_pagesize(ctr->memory_mode, ctr->history_entries) : 5;
st->update_every = ctr->update_every;
st->rrd_memory_mode = ctr->memory_mode;
st->chart_type = ctr->chart_type;
- st->gap_when_lost_iterations_above = (int) (gap_when_lost_iterations_above + 2);
st->rrdhost = host;
+ netdata_spinlock_init(&st->data_collection_lock);
+
st->flags = RRDSET_FLAG_SYNC_CLOCK
| RRDSET_FLAG_INDEXED_ID
| RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED
@@ -165,7 +167,7 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
// chart variables - we need this for data collection to work (collector given chart variables) - not only health
rrdsetvar_index_init(st);
- if (host->health_enabled) {
+ if (host->health.health_enabled) {
st->rrdfamily = rrdfamily_add_and_acquire(host, rrdset_family(st));
st->rrdvars = rrdvariables_create();
rrddimvar_index_init(st);
@@ -178,6 +180,31 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
st->red = NAN;
ctr->react_action = RRDSET_REACT_NEW;
+
+ ml_chart_new(st);
+}
+
+void rrdset_finalize_collection(RRDSET *st, bool dimensions_too) {
+ RRDHOST *host = st->rrdhost;
+
+ rrdset_flag_set(st, RRDSET_FLAG_COLLECTION_FINISHED);
+
+ if(dimensions_too) {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st)
+ rrddim_finalize_collection_and_check_retention(rd);
+ rrddim_foreach_done(rd);
+ }
+
+ for(size_t tier = 0; tier < storage_tiers ; tier++) {
+ STORAGE_ENGINE *eng = st->rrdhost->db[tier].eng;
+ if(!eng) continue;
+
+ if(st->storage_metrics_groups[tier]) {
+ eng->api.collect_ops.metrics_group_release(host->db[tier].instance, st->storage_metrics_groups[tier]);
+ st->storage_metrics_groups[tier] = NULL;
+ }
+ }
}
// the destructor - the dictionary is write locked while this runs
@@ -187,15 +214,7 @@ static void rrdset_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v
rrdset_flag_clear(st, RRDSET_FLAG_INDEXED_ID);
- // cleanup storage engines
- {
- for(size_t tier = 0; tier < storage_tiers ; tier++) {
- STORAGE_ENGINE *eng = st->rrdhost->db[tier].eng;
- if(!eng) continue;
-
- eng->api.collect_ops.metrics_group_release(host->db[tier].instance, st->storage_metrics_groups[tier]);
- }
- }
+ rrdset_finalize_collection(st, false);
// remove it from the name index
rrdset_index_del_name(host, st);
@@ -232,6 +251,9 @@ static void rrdset_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v
// 7. destroy the chart labels
rrdlabels_destroy(st->rrdlabels); // destroy the labels, after letting the contexts know
+ // 8. destroy the ml handle
+ ml_chart_delete(st);
+
rrdset_memory_file_free(st); // remove files of db mode save and map
// ------------------------------------------------------------------------
@@ -282,7 +304,7 @@ static bool rrdset_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused,
}
if (unlikely(st->update_every != ctr->update_every)) {
- rrdset_set_update_every(st, ctr->update_every);
+ rrdset_set_update_every_s(st, ctr->update_every);
ctr->react_action |= RRDSET_REACT_UPDATED;
}
@@ -356,33 +378,17 @@ static void rrdset_react_callback(const DICTIONARY_ITEM *item __maybe_unused, vo
RRDSET *st = rrdset;
RRDHOST *host = st->rrdhost;
- st->last_accessed_time = now_realtime_sec();
+ st->last_accessed_time_s = now_realtime_sec();
- if(host->health_enabled && (ctr->react_action & (RRDSET_REACT_NEW | RRDSET_REACT_CHART_ACTIVATED))) {
+ if(host->health.health_enabled && (ctr->react_action & (RRDSET_REACT_NEW | RRDSET_REACT_CHART_ACTIVATED))) {
rrdset_flag_set(st, RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION);
rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION);
}
if(ctr->react_action & (RRDSET_REACT_NEW | RRDSET_REACT_PLUGIN_UPDATED | RRDSET_REACT_MODULE_UPDATED)) {
if (ctr->react_action & RRDSET_REACT_NEW) {
- if(unlikely(rrdcontext_find_chart_uuid(st, &st->chart_uuid))) {
+ if(unlikely(rrdcontext_find_chart_uuid(st, &st->chart_uuid)))
uuid_generate(st->chart_uuid);
- bool found_in_sql = false; (void)found_in_sql;
-
-// bool found_in_sql = true;
-// if(unlikely(sql_find_chart_uuid(host, st, &st->chart_uuid))) {
-// uuid_generate(st->chart_uuid);
-// found_in_sql = false;
-// }
-
-#ifdef NETDATA_INTERNAL_CHECKS
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower(st->chart_uuid, uuid_str);
- error_report("Chart UUID for host %s chart [%s] not found in context. It is now set to %s (%s)",
- string2str(host->hostname),
- string2str(st->name), uuid_str, found_in_sql ? "found in sqlite" : "newly generated");
-#endif
- }
}
rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
@@ -393,7 +399,8 @@ static void rrdset_react_callback(const DICTIONARY_ITEM *item __maybe_unused, vo
void rrdset_index_init(RRDHOST *host) {
if(!host->rrdset_root_index) {
- host->rrdset_root_index = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ host->rrdset_root_index = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ &dictionary_stats_category_rrdset_rrddim, sizeof(RRDSET));
dictionary_register_insert_callback(host->rrdset_root_index, rrdset_insert_callback, NULL);
dictionary_register_conflict_callback(host->rrdset_root_index, rrdset_conflict_callback, NULL);
@@ -402,8 +409,9 @@ void rrdset_index_init(RRDHOST *host) {
}
if(!host->rrdset_root_index_name) {
- host->rrdset_root_index_name = dictionary_create(
- DICT_OPTION_NAME_LINK_DONT_CLONE | DICT_OPTION_VALUE_LINK_DONT_CLONE | DICT_OPTION_DONT_OVERWRITE_VALUE);
+ host->rrdset_root_index_name = dictionary_create_advanced(
+ DICT_OPTION_NAME_LINK_DONT_CLONE | DICT_OPTION_VALUE_LINK_DONT_CLONE | DICT_OPTION_DONT_OVERWRITE_VALUE,
+ &dictionary_stats_category_rrdset_rrddim, 0);
dictionary_register_insert_callback(host->rrdset_root_index_name, rrdset_name_insert_callback, host);
dictionary_register_delete_callback(host->rrdset_root_index_name, rrdset_name_delete_callback, host);
@@ -431,6 +439,8 @@ static inline void rrdset_index_del(RRDHOST *host, RRDSET *st) {
static RRDSET *rrdset_index_find(RRDHOST *host, const char *id) {
// TODO - the name index should have an acquired dictionary item, not just a pointer to RRDSET
+ if (unlikely(!host->rrdset_root_index))
+ return NULL;
return dictionary_get(host->rrdset_root_index, id);
}
@@ -442,7 +452,7 @@ inline RRDSET *rrdset_find(RRDHOST *host, const char *id) {
RRDSET *st = rrdset_index_find(host, id);
if(st)
- st->last_accessed_time = now_realtime_sec();
+ st->last_accessed_time_s = now_realtime_sec();
return(st);
}
@@ -521,51 +531,106 @@ int rrdset_reset_name(RRDSET *st, const char *name) {
}
// get the timestamp of the last entry in the round-robin database
-time_t rrdset_last_entry_t(RRDSET *st) {
+time_t rrdset_last_entry_s(RRDSET *st) {
RRDDIM *rd;
- time_t last_entry_t = 0;
+ time_t last_entry_s = 0;
rrddim_foreach_read(rd, st) {
- time_t t = rrddim_last_entry_t(rd);
- if(t > last_entry_t) last_entry_t = t;
+ time_t t = rrddim_last_entry_s(rd);
+ if(t > last_entry_s) last_entry_s = t;
}
rrddim_foreach_done(rd);
- return last_entry_t;
+ return last_entry_s;
+}
+
+time_t rrdset_last_entry_s_of_tier(RRDSET *st, size_t tier) {
+ RRDDIM *rd;
+ time_t last_entry_s = 0;
+
+ rrddim_foreach_read(rd, st) {
+ time_t t = rrddim_last_entry_s_of_tier(rd, tier);
+ if(t > last_entry_s) last_entry_s = t;
+ }
+ rrddim_foreach_done(rd);
+
+ return last_entry_s;
}
// get the timestamp of first entry in the round-robin database
-time_t rrdset_first_entry_t(RRDSET *st) {
+time_t rrdset_first_entry_s(RRDSET *st) {
RRDDIM *rd;
- time_t first_entry_t = LONG_MAX;
+ time_t first_entry_s = LONG_MAX;
rrddim_foreach_read(rd, st) {
- time_t t = rrddim_first_entry_t(rd);
- if(t < first_entry_t)
- first_entry_t = t;
+ time_t t = rrddim_first_entry_s(rd);
+ if(t < first_entry_s)
+ first_entry_s = t;
}
rrddim_foreach_done(rd);
- if (unlikely(LONG_MAX == first_entry_t)) return 0;
- return first_entry_t;
+ if (unlikely(LONG_MAX == first_entry_s)) return 0;
+ return first_entry_s;
}
-time_t rrdset_first_entry_t_of_tier(RRDSET *st, size_t tier) {
+time_t rrdset_first_entry_s_of_tier(RRDSET *st, size_t tier) {
if(unlikely(tier > storage_tiers))
return 0;
RRDDIM *rd;
- time_t first_entry_t = LONG_MAX;
+ time_t first_entry_s = LONG_MAX;
rrddim_foreach_read(rd, st) {
- time_t t = rrddim_first_entry_t_of_tier(rd, tier);
- if(t && t < first_entry_t)
- first_entry_t = t;
+ time_t t = rrddim_first_entry_s_of_tier(rd, tier);
+ if(t && t < first_entry_s)
+ first_entry_s = t;
}
rrddim_foreach_done(rd);
- if (unlikely(LONG_MAX == first_entry_t)) return 0;
- return first_entry_t;
+ if (unlikely(LONG_MAX == first_entry_s)) return 0;
+ return first_entry_s;
+}
+
+void rrdset_get_retention_of_tier_for_collected_chart(RRDSET *st, time_t *first_time_s, time_t *last_time_s, time_t now_s, size_t tier) {
+ if(!now_s)
+ now_s = now_realtime_sec();
+
+ time_t db_first_entry_s = rrdset_first_entry_s_of_tier(st, tier);
+ time_t db_last_entry_s = st->last_updated.tv_sec; // we assume this is a collected RRDSET
+
+ if(unlikely(!db_last_entry_s)) {
+ db_last_entry_s = rrdset_last_entry_s_of_tier(st, tier);
+
+ if (unlikely(!db_last_entry_s)) {
+ // we assume this is a collected RRDSET
+ db_first_entry_s = 0;
+ db_last_entry_s = 0;
+ }
+ }
+
+ if(unlikely(db_last_entry_s > now_s)) {
+ internal_error(db_last_entry_s > now_s + 1,
+ "RRDSET: 'host:%s/chart:%s' latest db time %ld is in the future, adjusting it to now %ld",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ db_last_entry_s, now_s);
+ db_last_entry_s = now_s;
+ }
+
+ if(unlikely(db_first_entry_s && db_last_entry_s && db_first_entry_s >= db_last_entry_s)) {
+ internal_error(db_first_entry_s > db_last_entry_s,
+ "RRDSET: 'host:%s/chart:%s' oldest db time %ld is bigger than latest db time %ld, adjusting it to (latest time %ld - update every %ld)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ db_first_entry_s, db_last_entry_s,
+ db_last_entry_s, (time_t)st->update_every);
+ db_first_entry_s = db_last_entry_s - st->update_every;
+ }
+
+ if(unlikely(!db_first_entry_s && db_last_entry_s))
+ // this can be the case on the first data collection of a chart
+ db_first_entry_s = db_last_entry_s - st->update_every;
+
+ *first_time_s = db_first_entry_s;
+ *last_time_s = db_last_entry_s;
}
inline void rrdset_is_obsolete(RRDSET *st) {
@@ -578,7 +643,7 @@ inline void rrdset_is_obsolete(RRDSET *st) {
rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE);
rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS);
- st->last_accessed_time = now_realtime_sec();
+ st->last_accessed_time_s = now_realtime_sec();
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
@@ -592,7 +657,7 @@ inline void rrdset_is_obsolete(RRDSET *st) {
inline void rrdset_isnot_obsolete(RRDSET *st) {
if(unlikely((rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))) {
rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE);
- st->last_accessed_time = now_realtime_sec();
+ st->last_accessed_time_s = now_realtime_sec();
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
@@ -673,8 +738,8 @@ void rrdset_reset(RRDSET *st) {
if(!rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
for(size_t tier = 0; tier < storage_tiers ;tier++) {
- if(rd->tiers[tier])
- rd->tiers[tier]->collect_ops->flush(rd->tiers[tier]->db_collection_handle);
+ if(rd->tiers[tier].db_collection_handle)
+ rd->tiers[tier].collect_ops->flush(rd->tiers[tier].db_collection_handle);
}
}
}
@@ -768,7 +833,8 @@ void rrdset_delete_files(RRDSET *st) {
}
rrddim_foreach_done(rd);
- recursively_delete_dir(st->cache_dir, "left-over chart");
+ if(st->cache_dir)
+ recursively_delete_dir(st->cache_dir, "left-over chart");
}
void rrdset_delete_obsolete_dimensions(RRDSET *st) {
@@ -809,7 +875,7 @@ RRDSET *rrdset_create_custom(
, long history_entries
) {
if (host != localhost)
- host->senders_last_chart_command = now_realtime_sec();
+ host->child_last_chart_command = now_realtime_sec();
if(!type || !type[0])
fatal("Cannot create rrd stats without a type: id '%s', name '%s', family '%s', context '%s', title '%s', units '%s', plugin '%s', module '%s'."
@@ -920,15 +986,8 @@ void rrdset_timed_next(RRDSET *st, struct timeval now, usec_t duration_since_las
);
#endif
- st->last_collected_time.tv_sec = now.tv_sec - st->update_every;
- st->last_collected_time.tv_usec = now.tv_usec;
- last_collected_time_align(st);
+ duration_since_last_update = 0;
- st->last_updated.tv_sec = now.tv_sec - st->update_every;
- st->last_updated.tv_usec = now.tv_usec;
- last_updated_time_align(st);
-
- duration_since_last_update = st->update_every * USEC_PER_SEC;
#ifdef NETDATA_INTERNAL_CHECKS
if(!discard_reason) discard_reason = "COLLECTION TIME IN FUTURE";
#endif
@@ -941,6 +1000,7 @@ void rrdset_timed_next(RRDSET *st, struct timeval now, usec_t duration_since_las
#endif
duration_since_last_update = (usec_t)since_last_usec;
+
#ifdef NETDATA_INTERNAL_CHECKS
if(!discard_reason) discard_reason = "COLLECTION TIME TOO FAR IN THE PAST";
#endif
@@ -949,16 +1009,16 @@ void rrdset_timed_next(RRDSET *st, struct timeval now, usec_t duration_since_las
#ifdef NETDATA_INTERNAL_CHECKS
if(since_last_usec > 0 && (susec_t) duration_since_last_update < since_last_usec) {
static __thread susec_t min_delta = USEC_PER_SEC * 3600, permanent_min_delta = 0;
- static __thread time_t last_t = 0;
+ static __thread time_t last_time_s = 0;
// the first time initialize it so that it will make the check later
- if(last_t == 0) last_t = now.tv_sec + 60;
+ if(last_time_s == 0) last_time_s = now.tv_sec + 60;
susec_t delta = since_last_usec - (susec_t) duration_since_last_update;
if(delta < min_delta) min_delta = delta;
- if(now.tv_sec >= last_t + 60) {
- last_t = now.tv_sec;
+ if(now.tv_sec >= last_time_s + 60) {
+ last_time_s = now.tv_sec;
if(min_delta > permanent_min_delta) {
info("MINIMUM MICROSECONDS DELTA of thread %d increased from %lld to %lld (+%lld)", gettid(), permanent_min_delta, min_delta, min_delta - permanent_min_delta);
@@ -1029,7 +1089,7 @@ static inline usec_t rrdset_update_last_collected_time(RRDSET *st) {
return last_collect_ut;
}
-static inline usec_t rrdset_init_last_updated_time(RRDSET *st) {
+static inline void rrdset_init_last_updated_time(RRDSET *st) {
// copy the last collected time to last updated time
st->last_updated.tv_sec = st->last_collected_time.tv_sec;
st->last_updated.tv_usec = st->last_collected_time.tv_usec;
@@ -1038,31 +1098,27 @@ static inline usec_t rrdset_init_last_updated_time(RRDSET *st) {
st->last_updated.tv_sec -= st->update_every;
last_updated_time_align(st);
-
- usec_t last_updated_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec;
-
- rrdset_debug(st, "initialized last updated time to %0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_updated_ut / USEC_PER_SEC);
-
- return last_updated_ut;
}
static __thread size_t rrdset_done_statistics_points_stored_per_tier[RRD_STORAGE_TIERS];
-static inline time_t tier_next_point_time(RRDDIM *rd, struct rrddim_tier *t, time_t now) {
+static inline time_t tier_next_point_time_s(RRDDIM *rd, struct rrddim_tier *t, time_t now_s) {
time_t loop = (time_t)rd->update_every * (time_t)t->tier_grouping;
- return now + loop - ((now + loop) % loop);
+ return now_s + loop - ((now_s + loop) % loop);
}
void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut __maybe_unused) {
- if (unlikely(!t->next_point_time))
- t->next_point_time = tier_next_point_time(rd, t, sp.end_time);
+ if (unlikely(!t->next_point_end_time_s))
+ t->next_point_end_time_s = tier_next_point_time_s(rd, t, sp.end_time_s);
+
+ if(unlikely(sp.start_time_s >= t->next_point_end_time_s)) {
+ // flush the virtual point, it is done
- if(unlikely(sp.start_time > t->next_point_time)) {
if (likely(!storage_point_is_unset(t->virtual_point))) {
t->collect_ops->store_metric(
t->db_collection_handle,
- t->next_point_time * USEC_PER_SEC,
+ t->next_point_end_time_s * USEC_PER_SEC,
t->virtual_point.sum,
t->virtual_point.min,
t->virtual_point.max,
@@ -1073,7 +1129,7 @@ void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAG
else {
t->collect_ops->store_metric(
t->db_collection_handle,
- t->next_point_time * USEC_PER_SEC,
+ t->next_point_end_time_s * USEC_PER_SEC,
NAN,
NAN,
NAN,
@@ -1083,18 +1139,18 @@ void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAG
rrdset_done_statistics_points_stored_per_tier[tier]++;
t->virtual_point.count = 0; // make the point unset
- t->next_point_time = tier_next_point_time(rd, t, sp.end_time);
+ t->next_point_end_time_s = tier_next_point_time_s(rd, t, sp.end_time_s);
}
// merge the dates into our virtual point
- if (unlikely(sp.start_time < t->virtual_point.start_time))
- t->virtual_point.start_time = sp.start_time;
+ if (unlikely(sp.start_time_s < t->virtual_point.start_time_s))
+ t->virtual_point.start_time_s = sp.start_time_s;
- if (likely(sp.end_time > t->virtual_point.end_time))
- t->virtual_point.end_time = sp.end_time;
+ if (likely(sp.end_time_s > t->virtual_point.end_time_s))
+ t->virtual_point.end_time_s = sp.end_time_s;
// merge the values into our virtual point
- if (likely(!storage_point_is_empty(sp))) {
+ if (likely(!storage_point_is_gap(sp))) {
// we aggregate only non NULLs into higher tiers
if (likely(!storage_point_is_unset(t->virtual_point))) {
@@ -1143,14 +1199,14 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n,
#endif // NETDATA_LOG_COLLECTION_ERRORS
// store the metric on tier 0
- rd->tiers[0]->collect_ops->store_metric(rd->tiers[0]->db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags);
+ rd->tiers[0].collect_ops->store_metric(rd->tiers[0].db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags);
rrdset_done_statistics_points_stored_per_tier[0]++;
- time_t now = (time_t)(point_end_time_ut / USEC_PER_SEC);
+ time_t now_s = (time_t)(point_end_time_ut / USEC_PER_SEC);
STORAGE_POINT sp = {
- .start_time = now - rd->update_every,
- .end_time = now,
+ .start_time_s = now_s - rd->update_every,
+ .end_time_s = now_s,
.min = n,
.max = n,
.sum = n,
@@ -1160,14 +1216,14 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n,
};
for(size_t tier = 1; tier < storage_tiers ;tier++) {
- if(unlikely(!rd->tiers[tier])) continue;
+ if(unlikely(!rd->tiers[tier].db_metric_handle)) continue;
- struct rrddim_tier *t = rd->tiers[tier];
+ struct rrddim_tier *t = &rd->tiers[tier];
if(!rrddim_option_check(rd, RRDDIM_OPTION_BACKFILLED_HIGH_TIERS)) {
// we have not collected this tier before
// let's fill any gap that may exist
- rrdr_fill_tier_gap_from_smaller_tiers(rd, tier, now);
+ rrdr_fill_tier_gap_from_smaller_tiers(rd, tier, now_s);
rrddim_option_set(rd, RRDDIM_OPTION_BACKFILLED_HIGH_TIERS);
}
@@ -1188,12 +1244,16 @@ struct rda_item {
static __thread struct rda_item *thread_rda = NULL;
static __thread size_t thread_rda_entries = 0;
-struct rda_item *rrdset_thread_rda(size_t *dimensions) {
+struct rda_item *rrdset_thread_rda_get(size_t *dimensions) {
if(unlikely(!thread_rda || (*dimensions) > thread_rda_entries)) {
+ size_t old_mem = thread_rda_entries * sizeof(struct rda_item);
freez(thread_rda);
- thread_rda = mallocz((*dimensions) * sizeof(struct rda_item));
thread_rda_entries = *dimensions;
+ size_t new_mem = thread_rda_entries * sizeof(struct rda_item);
+ thread_rda = mallocz(new_mem);
+
+ __atomic_add_fetch(&netdata_buffers_statistics.rrdset_done_rda_size, new_mem - old_mem, __ATOMIC_RELAXED);
}
*dimensions = thread_rda_entries;
@@ -1201,6 +1261,8 @@ struct rda_item *rrdset_thread_rda(size_t *dimensions) {
}
void rrdset_thread_rda_free(void) {
+ __atomic_sub_fetch(&netdata_buffers_statistics.rrdset_done_rda_size, thread_rda_entries * sizeof(struct rda_item), __ATOMIC_RELAXED);
+
freez(thread_rda);
thread_rda = NULL;
thread_rda_entries = 0;
@@ -1253,6 +1315,8 @@ static inline size_t rrdset_done_interpolate(
last_ut = next_store_ut;
+ ml_chart_update_begin(st);
+
struct rda_item *rda;
size_t dim_id;
for(dim_id = 0, rda = rda_base ; dim_id < rda_slots ; ++dim_id, ++rda) {
@@ -1332,17 +1396,20 @@ static inline size_t rrdset_done_interpolate(
break;
}
+ time_t current_time_s = (time_t) (next_store_ut / USEC_PER_SEC);
+
if(unlikely(!store_this_entry)) {
- (void) ml_is_anomalous(rd, 0, false);
+ (void) ml_is_anomalous(rd, current_time_s, 0, false);
+
rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
rrdcontext_collected_rrddim(rd);
continue;
}
- if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
+ if(likely(rd->updated && rd->collections_counter > 1 && iterations < gap_when_lost_iterations_above)) {
uint32_t dim_storage_flags = storage_flags;
- if (ml_is_anomalous(rd, new_value, true)) {
+ if (ml_is_anomalous(rd, current_time_s, new_value, true)) {
// clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
dim_storage_flags &= ~((storage_number)SN_FLAG_NOT_ANOMALOUS);
}
@@ -1352,7 +1419,7 @@ static inline size_t rrdset_done_interpolate(
rd->last_stored_value = new_value;
}
else {
- (void) ml_is_anomalous(rd, 0, false);
+ (void) ml_is_anomalous(rd, current_time_s, 0, false);
rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rrddim_name(rd), current_entry);
@@ -1364,6 +1431,8 @@ static inline size_t rrdset_done_interpolate(
stored_entries++;
}
+ ml_chart_update_end(st);
+
// reset the storage flags for the next point, if any;
storage_flags = SN_DEFAULT_FLAGS;
@@ -1389,36 +1458,6 @@ static inline size_t rrdset_done_interpolate(
return stored_entries;
}
-static inline void rrdset_done_fill_the_gap(RRDSET *st) {
- usec_t update_every_ut = st->update_every * USEC_PER_SEC;
- usec_t now_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
-
- long c = 0, entries = st->entries;
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- usec_t next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
- long current_entry = st->current_entry;
-
- for(c = 0; c < entries && next_store_ut <= now_collect_ut ; next_store_ut += update_every_ut, c++) {
- rd->db[current_entry] = pack_storage_number(NAN, SN_FLAG_NONE);
- current_entry = ((current_entry + 1) >= entries) ? 0 : current_entry + 1;
-
- rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING (FILLED THE GAP)", rrddim_name(rd), current_entry);
- }
- }
- rrddim_foreach_done(rd);
-
- if(c > 0) {
- c--;
- st->last_updated.tv_sec += c * st->update_every;
-
- st->current_entry += c;
- st->counter += c;
- if(st->current_entry >= st->entries)
- st->current_entry -= st->entries;
- }
-}
-
void rrdset_done(RRDSET *st) {
struct timeval now;
@@ -1427,10 +1466,12 @@ void rrdset_done(RRDSET *st) {
}
void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) {
- if(unlikely(netdata_exit)) return;
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) return;
+
+ netdata_spinlock_lock(&st->data_collection_lock);
if (pending_rrdset_next)
- rrdset_next(st);
+ rrdset_timed_next(st, now, 0ULL);
debug(D_RRD_CALLS, "rrdset_done() for chart '%s'", rrdset_name(st));
@@ -1447,9 +1488,13 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
next_store_ut = 0, // the timestamp in microseconds, of the next entry to store in the db
update_every_ut = st->update_every * USEC_PER_SEC; // st->update_every in microseconds
+ RRDSET_FLAGS rrdset_flags = rrdset_flag_check(st, ~0);
+ if(unlikely(rrdset_flags & RRDSET_FLAG_COLLECTION_FINISHED))
+ return;
+
netdata_thread_disable_cancelability();
- if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) {
+ if (unlikely(rrdset_flags & RRDSET_FLAG_OBSOLETE)) {
error("Chart '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st));
rrdset_isnot_obsolete(st);
}
@@ -1519,29 +1564,6 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
first_entry = 1;
}
-#ifdef ENABLE_DBENGINE
- // check if we will re-write the entire page
- if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE &&
- dt_usec(&st->last_collected_time, &st->last_updated) > (RRDENG_BLOCK_SIZE / sizeof(storage_number)) * update_every_ut)) {
- info(
- "'%s': too old data (last updated at %" PRId64 ".%" PRId64 ", last collected at %" PRId64 ".%" PRId64 "). "
- "Resetting it. Will not store the next entry.",
- rrdset_id(st),
- (int64_t)st->last_updated.tv_sec,
- (int64_t)st->last_updated.tv_usec,
- (int64_t)st->last_collected_time.tv_sec,
- (int64_t)st->last_collected_time.tv_usec);
- rrdset_reset(st);
- rrdset_init_last_updated_time(st);
-
- st->usec_since_last_update = update_every_ut;
-
- // the first entry should not be stored
- store_this_entry = 0;
- first_entry = 1;
- }
-#endif
-
// these are the 3 variables that will help us in interpolation
// last_stored_ut = the last time we added a value to the storage
// now_collect_ut = the time the current value has been collected
@@ -1551,23 +1573,13 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
if(unlikely(!st->counter_done)) {
- // if we have not collected metrics this session (st->counter_done == 0)
- // and we have collected metrics for this chart in the past (st->counter != 0)
- // fill the gap (the chart has been just loaded from disk)
- if(unlikely(st->counter) && st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) {
- // TODO this should be inside the storage engine
- rrdset_done_fill_the_gap(st);
- last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec;
- next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
- }
- if (st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
- // set a fake last_updated to jump to current time
- rrdset_init_last_updated_time(st);
- last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec;
- next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
- }
+ // set a fake last_updated to jump to current time
+ rrdset_init_last_updated_time(st);
+
+ last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec;
+ next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
- if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST))) {
+ if(unlikely(rrdset_flags & RRDSET_FLAG_STORE_FIRST)) {
store_this_entry = 1;
last_collect_ut = next_store_ut - update_every_ut;
@@ -1589,7 +1601,7 @@ after_first_database_work:
uint32_t has_reset_value = 0;
size_t rda_slots = dictionary_entries(st->rrddim_root_index);
- struct rda_item *rda_base = rrdset_thread_rda(&rda_slots);
+ struct rda_item *rda_base = rrdset_thread_rda_get(&rda_slots);
size_t dim_id;
size_t dimensions = 0;
@@ -1915,6 +1927,8 @@ after_second_database_work:
);
}
+ netdata_spinlock_unlock(&st->data_collection_lock);
+
// ALL DONE ABOUT THE DATA UPDATE
// --------------------------------------------------------------------
@@ -1946,29 +1960,29 @@ after_second_database_work:
store_metric_collection_completed();
}
-time_t rrdset_set_update_every(RRDSET *st, time_t update_every) {
+time_t rrdset_set_update_every_s(RRDSET *st, time_t update_every_s) {
internal_error(true, "RRDSET '%s' switching update every from %d to %d",
- rrdset_id(st), (int)st->update_every, (int)update_every);
+ rrdset_id(st), (int)st->update_every, (int)update_every_s);
- time_t prev_update_every = st->update_every;
- st->update_every = update_every;
+ time_t prev_update_every_s = st->update_every;
+ st->update_every = update_every_s;
// switch update every to the storage engine
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
for (size_t tier = 0; tier < storage_tiers; tier++) {
- if (rd->tiers[tier] && rd->tiers[tier]->db_collection_handle)
- rd->tiers[tier]->collect_ops->change_collection_frequency(rd->tiers[tier]->db_collection_handle, (int)(st->rrdhost->db[tier].tier_grouping * st->update_every));
+ if (rd->tiers[tier].db_collection_handle)
+ rd->tiers[tier].collect_ops->change_collection_frequency(rd->tiers[tier].db_collection_handle, (int)(st->rrdhost->db[tier].tier_grouping * st->update_every));
}
- assert(rd->update_every == prev_update_every &&
+ assert(rd->update_every == prev_update_every_s &&
"chart's update every differs from the update every of its dimensions");
rd->update_every = st->update_every;
}
rrddim_foreach_done(rd);
- return prev_update_every;
+ return prev_update_every_s;
}
// ----------------------------------------------------------------------------
@@ -2016,8 +2030,8 @@ struct rrdset_map_save_v019 {
size_t counter; // NEEDS TO BE UPDATED - maintained on load
size_t counter_done; // ignored
union { //
- time_t last_accessed_time; // ignored
- time_t last_entry_t; // ignored
+ time_t last_accessed_time_s; // ignored
+ time_t last_entry_s; // ignored
}; //
time_t upstream_resync_time; // ignored
void *plugin_name; // ignored
@@ -2064,6 +2078,13 @@ const char *rrdset_cache_filename(RRDSET *st) {
return st_on_file->cache_filename;
}
+const char *rrdset_cache_dir(RRDSET *st) {
+ if(!st->cache_dir)
+ st->cache_dir = rrdhost_cache_dir_for_rrdset_alloc(st->rrdhost, rrdset_id(st));
+
+ return st->cache_dir;
+}
+
void rrdset_memory_file_free(RRDSET *st) {
if(!st->st_on_file) return;
@@ -2071,6 +2092,7 @@ void rrdset_memory_file_free(RRDSET *st) {
rrdset_memory_file_update(st);
struct rrdset_map_save_v019 *st_on_file = st->st_on_file;
+ __atomic_sub_fetch(&rrddim_db_memory_size, st_on_file->memsize, __ATOMIC_RELAXED);
netdata_munmap(st_on_file, st_on_file->memsize);
// remove the pointers from the RRDDIM
@@ -2093,17 +2115,15 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st, RRD_MEMORY_MODE memory_mo
return false;
char fullfilename[FILENAME_MAX + 1];
- snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", st->cache_dir);
+ snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", rrdset_cache_dir(st));
unsigned long size = sizeof(struct rrdset_map_save_v019);
struct rrdset_map_save_v019 *st_on_file = (struct rrdset_map_save_v019 *)netdata_mmap(
- fullfilename, size,
- ((memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE),
- 0);
+ fullfilename, size, ((memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), 0, false, NULL);
if(!st_on_file) return false;
- time_t now = now_realtime_sec();
+ time_t now_s = now_realtime_sec();
st_on_file->magic[sizeof(RRDSET_MAGIC_V019)] = '\0';
if(strcmp(st_on_file->magic, RRDSET_MAGIC_V019) != 0) {
@@ -2122,13 +2142,13 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st, RRD_MEMORY_MODE memory_mo
error("File '%s' does not have the desired granularity. Clearing it.", fullfilename);
memset(st_on_file, 0, size);
}
- else if((now - st_on_file->last_updated.tv_sec) > st->update_every * st->entries) {
+ else if((now_s - st_on_file->last_updated.tv_sec) > st->update_every * st->entries) {
info("File '%s' is too old. Clearing it.", fullfilename);
memset(st_on_file, 0, size);
}
- else if(st_on_file->last_updated.tv_sec > now + st->update_every) {
- error("File '%s' refers to the future by %zd secs. Resetting it to now.", fullfilename, (ssize_t)(st_on_file->last_updated.tv_sec - now));
- st_on_file->last_updated.tv_sec = now;
+ else if(st_on_file->last_updated.tv_sec > now_s + st->update_every) {
+ error("File '%s' refers to the future by %zd secs. Resetting it to now.", fullfilename, (ssize_t)(st_on_file->last_updated.tv_sec - now_s));
+ st_on_file->last_updated.tv_sec = now_s;
}
if(st_on_file->current_entry >= st_on_file->entries)
@@ -2169,5 +2189,6 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st, RRD_MEMORY_MODE memory_mo
// copy the useful values back to st_on_file
rrdset_memory_file_update(st);
+ __atomic_add_fetch(&rrddim_db_memory_size, st_on_file->memsize, __ATOMIC_RELAXED);
return true;
}