summaryrefslogtreecommitdiffstats
path: root/database/rrdset.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/rrdset.c (renamed from src/rrdset.c)111
1 files changed, 94 insertions, 17 deletions
diff --git a/src/rrdset.c b/database/rrdset.c
index bbd0ae728..3f5ba73b6 100644
--- a/src/rrdset.c
+++ b/database/rrdset.c
@@ -1,5 +1,7 @@
-#define NETDATA_RRD_INTERNALS 1
-#include "common.h"
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#define NETDATA_RRD_INTERNALS
+#include "rrd.h"
void __rrdset_check_rdlock(RRDSET *st, const char *file, const char *function, const unsigned long line) {
debug(D_RRD_CALLS, "Checking read lock on chart '%s'", st->id);
@@ -172,27 +174,30 @@ int rrdset_set_name(RRDSET *st, const char *name) {
if(unlikely(rrdset_index_add_name(host, st) != st))
error("RRDSET: INTERNAL ERROR: attempted to index duplicate chart name '%s'", st->name);
+ rrdset_flag_clear(st, RRDSET_FLAG_BACKEND_SEND);
+ rrdset_flag_clear(st, RRDSET_FLAG_BACKEND_IGNORE);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+
return 1;
}
inline void rrdset_is_obsolete(RRDSET *st) {
- RRDHOST *host = st->rrdhost;
-
if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))) {
rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE);
- rrdset_flag_clear(st, RRDSET_FLAG_EXPOSED_UPSTREAM);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
// the chart will not get more updates (data collection)
// so, we have to push its definition now
- if(unlikely(host->rrdpush_send_enabled))
- rrdset_push_chart_definition(st);
+ rrdset_push_chart_definition_now(st);
}
}
inline void rrdset_isnot_obsolete(RRDSET *st) {
if(unlikely((rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))) {
rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE);
- rrdset_flag_clear(st, RRDSET_FLAG_EXPOSED_UPSTREAM);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
// the chart will be pushed upstream automatically
// due to data collection
@@ -201,6 +206,8 @@ inline void rrdset_isnot_obsolete(RRDSET *st) {
inline void rrdset_update_heterogeneous_flag(RRDSET *st) {
RRDHOST *host = st->rrdhost;
+ (void)host;
+
RRDDIM *rd;
rrdset_flag_clear(st, RRDSET_FLAG_HOMEGENEOUS_CHECK);
@@ -476,13 +483,19 @@ RRDSET *rrdset_create_custom(
snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
RRDSET *st = rrdset_find_on_create(host, fullid);
- if(st) return st;
+ if(st) {
+ rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ return st;
+ }
rrdhost_wrlock(host);
st = rrdset_find_on_create(host, fullid);
if(st) {
rrdhost_unlock(host);
+ rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
return st;
}
@@ -581,8 +594,8 @@ RRDSET *rrdset_create_custom(
memset(st, 0, size);
}
else if(st->last_updated.tv_sec > now + update_every) {
- error("File %s refers to the future. Clearing it.", fullfilename);
- memset(st, 0, size);
+ error("File %s refers to the future by %zd secs. Resetting it to now.", fullfilename, (ssize_t)(st->last_updated.tv_sec - now));
+ st->last_updated.tv_sec = now;
}
// make sure the database is aligned
@@ -603,10 +616,10 @@ RRDSET *rrdset_create_custom(
st->rrd_memory_mode = (memory_mode == RRD_MEMORY_MODE_NONE) ? RRD_MEMORY_MODE_NONE : RRD_MEMORY_MODE_ALLOC;
}
- st->plugin_name = plugin?strdup(plugin):NULL;
- st->module_name = module?strdup(module):NULL;
+ st->plugin_name = plugin?strdupz(plugin):NULL;
+ st->module_name = module?strdupz(module):NULL;
- st->config_section = strdup(config_section);
+ st->config_section = strdupz(config_section);
st->rrdhost = host;
st->memsize = size;
st->entries = entries;
@@ -644,7 +657,12 @@ RRDSET *rrdset_create_custom(
rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
rrdset_flag_clear(st, RRDSET_FLAG_DEBUG);
rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE);
- rrdset_flag_clear(st, RRDSET_FLAG_EXPOSED_UPSTREAM);
+ rrdset_flag_clear(st, RRDSET_FLAG_BACKEND_SEND);
+ rrdset_flag_clear(st, RRDSET_FLAG_BACKEND_IGNORE);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
// if(!strcmp(st->id, "disk_util.dm-0")) {
// st->debug = 1;
@@ -710,7 +728,7 @@ RRDSET *rrdset_create_custom(
// RRDSET - data collection iteration control
inline void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) {
- if(unlikely(!st->last_collected_time.tv_sec || !microseconds)) {
+ if(unlikely(!st->last_collected_time.tv_sec || !microseconds || (rrdset_flag_check_noatomic(st, RRDSET_FLAG_SYNC_CLOCK)))) {
// call the full next_usec() function
rrdset_next_usec(st, microseconds);
return;
@@ -723,13 +741,36 @@ inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) {
struct timeval now;
now_realtime_timeval(&now);
+ #ifdef NETDATA_INTERNAL_CHECKS
+ char *discard_reason = NULL;
+ usec_t discarded = microseconds;
+ #endif
+
+ if(unlikely(rrdset_flag_check_noatomic(st, RRDSET_FLAG_SYNC_CLOCK))) {
+ // the chart needs to be re-synced to current time
+ rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
+
+ // discard the microseconds supplied
+ microseconds = 0;
+
+ #ifdef NETDATA_INTERNAL_CHECKS
+ if(!discard_reason) discard_reason = "SYNC CLOCK FLAG";
+ #endif
+ }
+
if(unlikely(!st->last_collected_time.tv_sec)) {
// the first entry
microseconds = st->update_every * USEC_PER_SEC;
+ #ifdef NETDATA_INTERNAL_CHECKS
+ if(!discard_reason) discard_reason = "FIRST DATA COLLECTION";
+ #endif
}
else if(unlikely(!microseconds)) {
// no dt given by the plugin
microseconds = dt_usec(&now, &st->last_collected_time);
+ #ifdef NETDATA_INTERNAL_CHECKS
+ if(!discard_reason) discard_reason = "NO USEC GIVEN BY COLLECTOR";
+ #endif
}
else {
// microseconds has the time since the last collection
@@ -748,18 +789,52 @@ inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) {
last_updated_time_align(st);
microseconds = st->update_every * USEC_PER_SEC;
+ #ifdef NETDATA_INTERNAL_CHECKS
+ if(!discard_reason) discard_reason = "COLLECTION TIME IN FUTURE";
+ #endif
}
- else if(unlikely((usec_t)since_last_usec > (usec_t)(st->update_every * 10 * USEC_PER_SEC))) {
+ else if(unlikely((usec_t)since_last_usec > (usec_t)(st->update_every * 5 * USEC_PER_SEC))) {
// oops! the database is too far behind
info("RRD database for chart '%s' on host '%s' is %0.5" LONG_DOUBLE_MODIFIER " secs in the past (counter #%zu, update #%zu). Adjusting it to current time.", st->id, st->rrdhost->hostname, (LONG_DOUBLE)since_last_usec / USEC_PER_SEC, st->counter, st->counter_done);
microseconds = (usec_t)since_last_usec;
+ #ifdef NETDATA_INTERNAL_CHECKS
+ if(!discard_reason) discard_reason = "COLLECTION TIME TOO FAR IN THE PAST";
+ #endif
+ }
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(since_last_usec > 0 && (susec_t)microseconds < since_last_usec) {
+ static __thread susec_t min_delta = USEC_PER_SEC * 3600, permanent_min_delta = 0;
+ static __thread time_t last_t = 0;
+
+ // the first time initialize it so that it will make the check later
+ if(last_t == 0) last_t = now.tv_sec + 60;
+
+ susec_t delta = since_last_usec - (susec_t)microseconds;
+ if(delta < min_delta) min_delta = delta;
+
+ if(now.tv_sec >= last_t + 60) {
+ last_t = now.tv_sec;
+
+ if(min_delta > permanent_min_delta) {
+ info("MINIMUM MICROSECONDS DELTA of thread %d increased from %lld to %lld (+%lld)", gettid(), permanent_min_delta, min_delta, min_delta - permanent_min_delta);
+ permanent_min_delta = min_delta;
+ }
+
+ min_delta = USEC_PER_SEC * 3600;
+ }
}
+#endif
}
#ifdef NETDATA_INTERNAL_CHECKS
debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
rrdset_debug(st, "NEXT: %llu microseconds", microseconds);
+
+ if(discarded && discarded != microseconds)
+ info("host '%s', chart '%s': discarded data collection time of %llu usec, replaced with %llu usec, reason: '%s'", st->rrdhost->hostname, st->id, discarded, microseconds, discard_reason?discard_reason:"UNDEFINED");
+
#endif
st->usec_since_last_update = microseconds;
@@ -857,6 +932,8 @@ static inline size_t rrdset_done_interpolate(
size_t stored_entries = 0; // the number of entries we have stored in the db, during this call to rrdset_done()
usec_t first_ut = last_stored_ut, last_ut = 0;
+ (void)first_ut;
+
ssize_t iterations = (ssize_t)((now_collect_ut - last_stored_ut) / (update_every_ut));
if((now_collect_ut % (update_every_ut)) == 0) iterations++;