summaryrefslogtreecommitdiffstats
path: root/database/rrdset.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/rrdset.c')
-rw-r--r--database/rrdset.c933
1 files changed, 493 insertions, 440 deletions
diff --git a/database/rrdset.c b/database/rrdset.c
index e7cb89df..9693ee21 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -80,17 +80,9 @@ static inline RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name, ui
tmp.name = name;
tmp.hash_name = (hash)?hash:simple_hash(tmp.name);
- // fprintf(stderr, "SEARCHING: %s\n", name);
result = avl_search_lock(&host->rrdset_root_index_name, (avl_t *) (&(tmp.avlname)));
- if(result) {
- RRDSET *st = rrdset_from_avlname(result);
- if(strcmp(st->magic, RRDSET_MAGIC) != 0)
- error("Search for RRDSET %s returned an invalid RRDSET %s (name %s)", name, st->id, st->name);
+ if(result) return rrdset_from_avlname(result);
- // fprintf(stderr, "FOUND: %s\n", name);
- return rrdset_from_avlname(result);
- }
- // fprintf(stderr, "NOT FOUND: %s\n", name);
return NULL;
}
@@ -194,6 +186,7 @@ int rrdset_set_name(RRDSET *st, const char *name) {
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdcontext_updated_rrdset_name(st);
return 2;
}
@@ -212,6 +205,7 @@ inline void rrdset_is_obsolete(RRDSET *st) {
// the chart will not get more updates (data collection)
// so, we have to push its definition now
rrdset_push_chart_definition_now(st);
+ rrdcontext_updated_rrdset_flags(st);
}
}
@@ -224,6 +218,7 @@ inline void rrdset_isnot_obsolete(RRDSET *st) {
// the chart will be pushed upstream automatically
// due to data collection
+ rrdcontext_updated_rrdset_flags(st);
}
}
@@ -259,6 +254,7 @@ inline void rrdset_update_heterogeneous_flag(RRDSET *st) {
}
rrdset_flag_clear(st, RRDSET_FLAG_HETEROGENEOUS);
+ rrdcontext_updated_rrdset_flags(st);
}
// ----------------------------------------------------------------------------
@@ -281,12 +277,13 @@ void rrdset_reset(RRDSET *st) {
rd->last_collected_time.tv_sec = 0;
rd->last_collected_time.tv_usec = 0;
rd->collections_counter = 0;
- // memset(rd->values, 0, rd->entries * sizeof(storage_number));
-#ifdef ENABLE_DBENGINE
- if (RRD_MEMORY_MODE_DBENGINE == st->rrd_memory_mode && !rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
- rrdeng_store_metric_flush_current_page(rd);
+
+ if(!rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
+ for(int tier = 0; tier < storage_tiers ;tier++) {
+ if(rd->tiers[tier])
+ rd->tiers[tier]->collect_ops.flush(rd->tiers[tier]->db_collection_handle);
+ }
}
-#endif
}
}
@@ -294,20 +291,27 @@ void rrdset_reset(RRDSET *st) {
// RRDSET - helpers for rrdset_create()
inline long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries) {
- if(unlikely(entries < 5)) entries = 5;
- if(unlikely(entries > RRD_HISTORY_ENTRIES_MAX)) entries = RRD_HISTORY_ENTRIES_MAX;
+ if(mode == RRD_MEMORY_MODE_DBENGINE) return 0;
+ if(mode == RRD_MEMORY_MODE_NONE) return 5;
- if(unlikely(mode == RRD_MEMORY_MODE_NONE || mode == RRD_MEMORY_MODE_ALLOC))
- return entries;
+ if(entries < 5) entries = 5;
+ if(entries > RRD_HISTORY_ENTRIES_MAX) entries = RRD_HISTORY_ENTRIES_MAX;
- long page = (size_t)sysconf(_SC_PAGESIZE);
- long size = sizeof(RRDDIM) + entries * sizeof(storage_number);
- if(unlikely(size % page)) {
- size -= (size % page);
- size += page;
+ if(mode == RRD_MEMORY_MODE_MAP || mode == RRD_MEMORY_MODE_SAVE || mode == RRD_MEMORY_MODE_RAM) {
+ long header_size = 0;
- long n = (size - sizeof(RRDDIM)) / sizeof(storage_number);
- return n;
+ if(mode == RRD_MEMORY_MODE_MAP || mode == RRD_MEMORY_MODE_SAVE)
+ header_size = (long)rrddim_memory_file_header_size();
+
+ long page = (long)sysconf(_SC_PAGESIZE);
+ long size = (long)(header_size + entries * sizeof(storage_number));
+ if (unlikely(size % page)) {
+ size -= (size % page);
+ size += page;
+
+ long n = (long)((size - header_size) / sizeof(storage_number));
+ return n;
+ }
}
return entries;
@@ -383,11 +387,13 @@ void rrdset_free(RRDSET *st) {
rrdset_unlock(st);
+ // this has to be after the dimensions are freed
+ rrdcontext_removed_rrdset(st);
+
// ------------------------------------------------------------------------
// free it
netdata_rwlock_destroy(&st->rrdset_rwlock);
- netdata_rwlock_destroy(&st->state->labels.labels_rwlock);
// free directly allocated members
freez((void *)st->name);
@@ -402,77 +408,51 @@ void rrdset_free(RRDSET *st) {
freez(st->state->old_title);
freez(st->state->old_units);
freez(st->state->old_context);
- free_label_list(st->state->labels.head);
+ rrdlabels_destroy(st->state->chart_labels);
freez(st->state);
freez(st->chart_uuid);
- switch(st->rrd_memory_mode) {
- case RRD_MEMORY_MODE_SAVE:
- case RRD_MEMORY_MODE_MAP:
- case RRD_MEMORY_MODE_RAM:
- debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
- munmap(st, st->memsize);
- break;
-
- case RRD_MEMORY_MODE_ALLOC:
- case RRD_MEMORY_MODE_NONE:
- case RRD_MEMORY_MODE_DBENGINE:
- freez(st);
- break;
- }
-
+ rrdset_memory_file_free(st);
+ freez(st);
}
void rrdset_save(RRDSET *st) {
rrdset_check_rdlock(st);
- // info("Saving chart '%s' ('%s')", st->id, st->name);
-
- if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
- debug(D_RRD_STATS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
- memory_file_save(st->cache_filename, st, st->memsize);
- }
+ rrdset_memory_file_save(st);
RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE)) {
- debug(D_RRD_STATS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
- memory_file_save(rd->cache_filename, rd, rd->memsize);
- }
- }
+ rrddim_foreach_read(rd, st)
+ rrddim_memory_file_save(rd);
}
-void rrdset_delete_custom(RRDSET *st, int db_rotated) {
+void rrdset_delete_files(RRDSET *st) {
RRDDIM *rd;
-#ifndef ENABLE_ACLK
- UNUSED(db_rotated);
-#endif
rrdset_check_rdlock(st);
info("Deleting chart '%s' ('%s') from disk...", st->id, st->name);
if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || st->rrd_memory_mode == RRD_MEMORY_MODE_MAP) {
- info("Deleting chart header file '%s'.", st->cache_filename);
- if(unlikely(unlink(st->cache_filename) == -1))
- error("Cannot delete chart header file '%s'", st->cache_filename);
+ const char *cache_filename = rrdset_cache_filename(st);
+ if(cache_filename) {
+ info("Deleting chart header file '%s'.", cache_filename);
+ if (unlikely(unlink(cache_filename) == -1))
+ error("Cannot delete chart header file '%s'", cache_filename);
+ }
+ else
+ error("Cannot find the cache filename of chart '%s'", st->id);
}
rrddim_foreach_read(rd, st) {
- if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || rd->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) {
- info("Deleting dimension file '%s'.", rd->cache_filename);
- if(unlikely(unlink(rd->cache_filename) == -1))
- error("Cannot delete dimension file '%s'", rd->cache_filename);
- }
- }
+ const char *cache_filename = rrddim_cache_filename(rd);
+ if(!cache_filename) continue;
- recursively_delete_dir(st->cache_dir, "left-over chart");
-#ifdef ENABLE_ACLK
- if ((netdata_cloud_setting) && (db_rotated || RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)) {
- aclk_del_collector(st->rrdhost, st->plugin_name, st->module_name);
- st->rrdhost->deleted_charts_count++;
+ info("Deleting dimension file '%s'.", cache_filename);
+ if(unlikely(unlink(cache_filename) == -1))
+ error("Cannot delete dimension file '%s'", cache_filename);
}
-#endif
+ recursively_delete_dir(st->cache_dir, "left-over chart");
}
void rrdset_delete_obsolete_dimensions(RRDSET *st) {
@@ -484,11 +464,11 @@ void rrdset_delete_obsolete_dimensions(RRDSET *st) {
rrddim_foreach_read(rd, st) {
if(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) {
- if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || rd->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) {
- info("Deleting dimension file '%s'.", rd->cache_filename);
- if(unlikely(unlink(rd->cache_filename) == -1))
- error("Cannot delete dimension file '%s'", rd->cache_filename);
- }
+ const char *cache_filename = rrddim_cache_filename(rd);
+ if(!cache_filename) continue;
+ info("Deleting dimension file '%s'.", cache_filename);
+ if(unlikely(unlink(cache_filename) == -1))
+ error("Cannot delete dimension file '%s'", cache_filename);
}
}
}
@@ -507,6 +487,14 @@ static inline RRDSET *rrdset_find_on_create(RRDHOST *host, const char *fullid) {
return NULL;
}
+static inline void rrdset_update_permanent_labels(RRDSET *st) {
+ if(!st->state || !st->state->chart_labels) return;
+
+ rrdlabels_add(st->state->chart_labels, "_collect_plugin", st->plugin_name, RRDLABEL_SRC_AUTO| RRDLABEL_FLAG_PERMANENT);
+ rrdlabels_add(st->state->chart_labels, "_collect_module", st->module_name, RRDLABEL_SRC_AUTO| RRDLABEL_FLAG_PERMANENT);
+ rrdlabels_add(st->state->chart_labels, "_instance_family", st->family, RRDLABEL_SRC_AUTO| RRDLABEL_FLAG_PERMANENT);
+}
+
RRDSET *rrdset_create_custom(
RRDHOST *host
, const char *type
@@ -660,22 +648,7 @@ RRDSET *rrdset_create_custom(
}
if (mark_rebuild) {
-#ifdef ENABLE_ACLK
- if (netdata_cloud_setting) {
- if (mark_rebuild & META_CHART_ACTIVATED) {
- aclk_add_collector(host, st->plugin_name, st->module_name);
- }
- else {
- if (mark_rebuild & (META_PLUGIN_UPDATED | META_MODULE_UPDATED)) {
- aclk_del_collector(
- host, mark_rebuild & META_PLUGIN_UPDATED ? old_plugin : st->plugin_name,
- mark_rebuild & META_MODULE_UPDATED ? old_module : st->module_name);
- aclk_add_collector(host, st->plugin_name, st->module_name);
- }
- }
- rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
- }
-#endif
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
freez(old_plugin);
freez(old_module);
freez(old_title);
@@ -701,8 +674,11 @@ RRDSET *rrdset_create_custom(
}
}
/* Fall-through during switch from archived to active so that the host lock is taken and health is linked */
- if (!changed_from_archived_to_active)
+ if (!changed_from_archived_to_active) {
+ rrdset_update_permanent_labels(st);
+ rrdcontext_updated_rrdset(st);
return st;
+ }
}
rrdhost_wrlock(host);
@@ -722,11 +698,10 @@ RRDSET *rrdset_create_custom(
rrdhost_unlock(host);
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdcontext_updated_rrdset(st);
return st;
}
- char fullfilename[FILENAME_MAX + 1];
-
// ------------------------------------------------------------------------
// get the options from the config, we need to create it
@@ -734,126 +709,37 @@ RRDSET *rrdset_create_custom(
if (memory_mode != RRD_MEMORY_MODE_DBENGINE)
entries = align_entries_to_pagesize(memory_mode, history_entries);
- unsigned long size = sizeof(RRDSET);
char *cache_dir = rrdset_cache_dir(host, fullid);
- time_t now = now_realtime_sec();
-
// ------------------------------------------------------------------------
// load it or allocate it
debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
- snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
- if(memory_mode == RRD_MEMORY_MODE_SAVE || memory_mode == RRD_MEMORY_MODE_MAP ||
- memory_mode == RRD_MEMORY_MODE_RAM) {
- st = (RRDSET *)netdata_mmap(
- (memory_mode == RRD_MEMORY_MODE_RAM) ? NULL : fullfilename,
- size,
- ((memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE),
- 0);
-
- if(st) {
- memset(&st->avl, 0, sizeof(avl_t));
- memset(&st->avlname, 0, sizeof(avl_t));
- memset(&st->rrdvar_root_index, 0, sizeof(avl_tree_lock));
- memset(&st->dimensions_index, 0, sizeof(avl_tree_lock));
- memset(&st->rrdset_rwlock, 0, sizeof(netdata_rwlock_t));
-
- st->name = NULL;
- st->type = NULL;
- st->family = NULL;
- st->title = NULL;
- st->units = NULL;
- st->context = NULL;
- st->cache_dir = NULL;
- st->plugin_name = NULL;
- st->module_name = NULL;
- st->dimensions = NULL;
- st->rrdfamily = NULL;
- st->rrdhost = NULL;
- st->next = NULL;
- st->variables = NULL;
- st->alarms = NULL;
- st->flags = 0x00000000;
- st->exporting_flags = NULL;
-
- if(memory_mode == RRD_MEMORY_MODE_RAM) {
- memset(st, 0, size);
- }
- else {
- if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
- info("Initializing file %s.", fullfilename);
- memset(st, 0, size);
- }
- else if(strcmp(st->id, fullid) != 0) {
- error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
- // munmap(st, size);
- // st = NULL;
- memset(st, 0, size);
- }
- else if(st->memsize != size || st->entries != entries) {
- error("File %s does not have the desired size. Clearing it.", fullfilename);
- memset(st, 0, size);
- }
- else if(st->update_every != update_every) {
- error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
- memset(st, 0, size);
- }
- else if((now - st->last_updated.tv_sec) > update_every * entries) {
- info("File %s is too old. Clearing it.", fullfilename);
- memset(st, 0, size);
- }
- else if(st->last_updated.tv_sec > now + update_every) {
- error("File %s refers to the future by %zd secs. Resetting it to now.", fullfilename, (ssize_t)(st->last_updated.tv_sec - now));
- st->last_updated.tv_sec = now;
- }
-
- // make sure the database is aligned
- if(st->last_updated.tv_sec) {
- st->update_every = update_every;
- last_updated_time_align(st);
- }
- }
-
- // make sure we have the right memory mode
- // even if we cleared the memory
- st->rrd_memory_mode = memory_mode;
- }
- }
-
- if(unlikely(!st)) {
- st = callocz(1, size);
- if (memory_mode == RRD_MEMORY_MODE_DBENGINE)
- st->rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
- else
- st->rrd_memory_mode = (memory_mode == RRD_MEMORY_MODE_NONE) ? RRD_MEMORY_MODE_NONE : RRD_MEMORY_MODE_ALLOC;
- }
-
- st->plugin_name = plugin?strdupz(plugin):NULL;
- st->module_name = module?strdupz(module):NULL;
-
- st->rrdhost = host;
- st->memsize = size;
- st->entries = entries;
- st->update_every = update_every;
-
- if(st->current_entry >= st->entries) st->current_entry = 0;
-
- strcpy(st->cache_filename, fullfilename);
- strcpy(st->magic, RRDSET_MAGIC);
+ st = callocz(1, sizeof(RRDSET));
+ st->state = callocz(1, sizeof(*st->state));
strcpy(st->id, fullid);
st->hash = simple_hash(st->id);
+ st->rrdhost = host;
st->cache_dir = cache_dir;
+ st->entries = entries;
+ st->update_every = update_every;
- st->chart_type = chart_type;
- st->type = strdupz(type);
-
- st->state = callocz(1, sizeof(*st->state));
+ if(memory_mode == RRD_MEMORY_MODE_SAVE || memory_mode == RRD_MEMORY_MODE_MAP) {
+ if(!rrdset_memory_load_or_create_map_save(st, memory_mode)) {
+ info("Failed to use memory mode %s for chart '%s', falling back to ram", (memory_mode == RRD_MEMORY_MODE_MAP)?"map":"save", st->name);
+ memory_mode = RRD_MEMORY_MODE_RAM;
+ }
+ }
+ st->rrd_memory_mode = memory_mode;
- st->family = family ? strdupz(family) : strdupz(st->type);
+ st->plugin_name = plugin?strdupz(plugin):NULL;
+ st->module_name = module?strdupz(module):NULL;
+ st->chart_type = chart_type;
+ st->type = strdupz(type);
+ st->family = family ? strdupz(family) : strdupz(st->type);
json_fix_string(st->family);
st->state->is_ar_chart = strcmp(st->id, ML_ANOMALY_RATES_CHART_ID) == 0;
@@ -874,21 +760,14 @@ RRDSET *rrdset_create_custom(
st->green = NAN;
st->red = NAN;
- st->last_collected_time.tv_sec = 0;
- st->last_collected_time.tv_usec = 0;
- st->counter_done = 0;
- st->rrddim_page_alignment = 0;
-
st->gap_when_lost_iterations_above = (int) (gap_when_lost_iterations_above + 2);
- st->last_accessed_time = 0;
- st->upstream_resync_time = 0;
-
avl_init_lock(&st->dimensions_index, rrddim_compare);
avl_init_lock(&st->rrdvar_root_index, rrdvar_compare);
netdata_rwlock_init(&st->rrdset_rwlock);
- netdata_rwlock_init(&st->state->labels.labels_rwlock);
+ st->state->chart_labels = rrdlabels_create();
+ rrdset_update_permanent_labels(st);
if(name && *name && rrdset_set_name(st, name))
// we did set the name
@@ -930,11 +809,7 @@ RRDSET *rrdset_create_custom(
compute_chart_hash(st);
rrdhost_unlock(host);
-#ifdef ENABLE_ACLK
- if (netdata_cloud_setting)
- aclk_add_collector(host, plugin, module);
- rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
-#endif
+ rrdcontext_updated_rrdset(st);
return(st);
}
@@ -994,7 +869,8 @@ inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) {
if(unlikely(since_last_usec < 0)) {
// oops! the database is in the future
#ifdef NETDATA_INTERNAL_CHECKS
- info("RRD database for chart '%s' on host '%s' is %0.5" LONG_DOUBLE_MODIFIER " secs in the future (counter #%zu, update #%zu). Adjusting it to current time.", st->id, st->rrdhost->hostname, (LONG_DOUBLE)-since_last_usec / USEC_PER_SEC, st->counter, st->counter_done);
+ info("RRD database for chart '%s' on host '%s' is %0.5" NETDATA_DOUBLE_MODIFIER
+ " secs in the future (counter #%zu, update #%zu). Adjusting it to current time.", st->id, st->rrdhost->hostname, (NETDATA_DOUBLE)-since_last_usec / USEC_PER_SEC, st->counter, st->counter_done);
#endif
st->last_collected_time.tv_sec = now.tv_sec - st->update_every;
@@ -1013,7 +889,8 @@ inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) {
else if(unlikely((usec_t)since_last_usec > (usec_t)(st->update_every * 5 * USEC_PER_SEC))) {
// oops! the database is too far behind
#ifdef NETDATA_INTERNAL_CHECKS
- info("RRD database for chart '%s' on host '%s' is %0.5" LONG_DOUBLE_MODIFIER " secs in the past (counter #%zu, update #%zu). Adjusting it to current time.", st->id, st->rrdhost->hostname, (LONG_DOUBLE)since_last_usec / USEC_PER_SEC, st->counter, st->counter_done);
+ info("RRD database for chart '%s' on host '%s' is %0.5" NETDATA_DOUBLE_MODIFIER
+ " secs in the past (counter #%zu, update #%zu). Adjusting it to current time.", st->id, st->rrdhost->hostname, (NETDATA_DOUBLE)since_last_usec / USEC_PER_SEC, st->counter, st->counter_done);
#endif
microseconds = (usec_t)since_last_usec;
@@ -1070,7 +947,7 @@ static inline usec_t rrdset_init_last_collected_time(RRDSET *st) {
usec_t last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "initialized last collected time to %0.3" LONG_DOUBLE_MODIFIER, (LONG_DOUBLE)last_collect_ut / USEC_PER_SEC);
+ rrdset_debug(st, "initialized last collected time to %0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_collect_ut / USEC_PER_SEC);
#endif
return last_collect_ut;
@@ -1083,7 +960,7 @@ static inline usec_t rrdset_update_last_collected_time(RRDSET *st) {
st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "updated last collected time to %0.3" LONG_DOUBLE_MODIFIER, (LONG_DOUBLE)last_collect_ut / USEC_PER_SEC);
+ rrdset_debug(st, "updated last collected time to %0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_collect_ut / USEC_PER_SEC);
#endif
return last_collect_ut;
@@ -1102,12 +979,110 @@ static inline usec_t rrdset_init_last_updated_time(RRDSET *st) {
usec_t last_updated_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec;
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "initialized last updated time to %0.3" LONG_DOUBLE_MODIFIER, (LONG_DOUBLE)last_updated_ut / USEC_PER_SEC);
+ rrdset_debug(st, "initialized last updated time to %0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_updated_ut / USEC_PER_SEC);
#endif
return last_updated_ut;
}
+static inline time_t tier_next_point_time(RRDDIM *rd, struct rrddim_tier *t, time_t now) {
+ time_t loop = (time_t)rd->update_every * (time_t)t->tier_grouping;
+ return now + loop - ((now + loop) % loop);
+}
+
+void store_metric_at_tier(RRDDIM *rd, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut) {
+ if (unlikely(!t->next_point_time))
+ t->next_point_time = tier_next_point_time(rd, t, sp.end_time);
+
+ // merge the dates into our virtual point
+ if (unlikely(sp.start_time < t->virtual_point.start_time))
+ t->virtual_point.start_time = sp.start_time;
+
+ if (likely(sp.end_time > t->virtual_point.end_time))
+ t->virtual_point.end_time = sp.end_time;
+
+ // merge the values into our virtual point
+ if (likely(!storage_point_is_empty(sp))) {
+ // we aggregate only non NULLs into higher tiers
+
+ if (likely(!storage_point_is_unset(t->virtual_point))) {
+ // merge the collected point to our virtual one
+ t->virtual_point.sum += sp.sum;
+ t->virtual_point.min = MIN(t->virtual_point.min, sp.min);
+ t->virtual_point.max = MAX(t->virtual_point.max, sp.max);
+ t->virtual_point.count += sp.count;
+ t->virtual_point.anomaly_count += sp.anomaly_count;
+ t->virtual_point.flags |= sp.flags;
+ }
+ else {
+ // reset our virtual point to this one
+ t->virtual_point = sp;
+ }
+ }
+
+ if(unlikely(sp.end_time >= t->next_point_time)) {
+ if (likely(!storage_point_is_unset(t->virtual_point))) {
+
+ t->collect_ops.store_metric(
+ t->db_collection_handle,
+ now_ut,
+ t->virtual_point.sum,
+ t->virtual_point.min,
+ t->virtual_point.max,
+ t->virtual_point.count,
+ t->virtual_point.anomaly_count,
+ t->virtual_point.flags);
+ }
+ else {
+ t->collect_ops.store_metric(
+ t->db_collection_handle,
+ now_ut,
+ NAN,
+ NAN,
+ NAN,
+ 0,
+ 0, SN_FLAG_NONE);
+ }
+
+ t->virtual_point.count = 0;
+ t->next_point_time = tier_next_point_time(rd, t, sp.end_time);
+ }
+}
+
+static void store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
+
+ // store the metric on tier 0
+ rd->tiers[0]->collect_ops.store_metric(rd->tiers[0]->db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags);
+
+ for(int tier = 1; tier < storage_tiers ;tier++) {
+ if(unlikely(!rd->tiers[tier])) continue;
+
+ struct rrddim_tier *t = rd->tiers[tier];
+
+ time_t now = (time_t)(point_end_time_ut / USEC_PER_SEC);
+
+ if(!t->last_collected_ut) {
+ // we have not collected this tier before
+ // let's fill any gap that may exist
+ rrdr_fill_tier_gap_from_smaller_tiers(rd, tier, now);
+ }
+
+ STORAGE_POINT sp = {
+ .start_time = now - rd->update_every,
+ .end_time = now,
+ .min = n,
+ .max = n,
+ .sum = n,
+ .count = 1,
+ .anomaly_count = (flags & SN_FLAG_NOT_ANOMALOUS) ? 0 : 1,
+ .flags = flags
+ };
+
+ t->last_collected_ut = point_end_time_ut;
+ store_metric_at_tier(rd, t, sp, point_end_time_ut);
+ }
+}
+
static inline size_t rrdset_done_interpolate(
RRDSET *st
, usec_t update_every_ut
@@ -1131,17 +1106,17 @@ static inline size_t rrdset_done_interpolate(
size_t counter = st->counter;
long current_entry = st->current_entry;
- uint32_t storage_flags = SN_DEFAULT_FLAGS;
+ SN_FLAGS storage_flags = SN_DEFAULT_FLAGS;
if (has_reset_value)
- storage_flags |= SN_EXISTS_RESET;
+ storage_flags |= SN_FLAG_RESET;
for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) {
#ifdef NETDATA_INTERNAL_CHECKS
if(iterations < 0) { error("INTERNAL CHECK: %s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", st->name, first_ut, last_stored_ut, next_store_ut, now_collect_ut); }
- rrdset_debug(st, "last_stored_ut = %0.3" LONG_DOUBLE_MODIFIER " (last updated time)", (LONG_DOUBLE)last_stored_ut/USEC_PER_SEC);
- rrdset_debug(st, "next_store_ut = %0.3" LONG_DOUBLE_MODIFIER " (next interpolation point)", (LONG_DOUBLE)next_store_ut/USEC_PER_SEC);
+ rrdset_debug(st, "last_stored_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last updated time)", (NETDATA_DOUBLE)last_stored_ut/USEC_PER_SEC);
+ rrdset_debug(st, "next_store_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (next interpolation point)", (NETDATA_DOUBLE)next_store_ut/USEC_PER_SEC);
#endif
last_ut = next_store_ut;
@@ -1150,20 +1125,19 @@ static inline size_t rrdset_done_interpolate(
if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED))
continue;
- calculated_number new_value;
+ NETDATA_DOUBLE new_value;
switch(rd->algorithm) {
case RRD_ALGORITHM_INCREMENTAL:
- new_value = (calculated_number)
+ new_value = (NETDATA_DOUBLE)
( rd->calculated_value
- * (calculated_number)(next_store_ut - last_collect_ut)
- / (calculated_number)(now_collect_ut - last_collect_ut)
+ * (NETDATA_DOUBLE)(next_store_ut - last_collect_ut)
+ / (NETDATA_DOUBLE)(now_collect_ut - last_collect_ut)
);
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: CALC2 INC "
- CALCULATED_NUMBER_FORMAT " = "
- CALCULATED_NUMBER_FORMAT
+ rrdset_debug(st, "%s: CALC2 INC " NETDATA_DOUBLE_FORMAT " = "
+ NETDATA_DOUBLE_FORMAT
" * (%llu - %llu)"
" / (%llu - %llu)"
, rd->name
@@ -1177,18 +1151,18 @@ static inline size_t rrdset_done_interpolate(
rd->calculated_value -= new_value;
new_value += rd->last_calculated_value;
rd->last_calculated_value = 0;
- new_value /= (calculated_number)st->update_every;
+ new_value /= (NETDATA_DOUBLE)st->update_every;
if(unlikely(next_store_ut - last_stored_ut < update_every_ut)) {
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: COLLECTION POINT IS SHORT " CALCULATED_NUMBER_FORMAT " - EXTRAPOLATING",
+ rrdset_debug(st, "%s: COLLECTION POINT IS SHORT " NETDATA_DOUBLE_FORMAT " - EXTRAPOLATING",
rd->name
- , (calculated_number)(next_store_ut - last_stored_ut)
+ , (NETDATA_DOUBLE)(next_store_ut - last_stored_ut)
);
#endif
- new_value = new_value * (calculated_number)(st->update_every * USEC_PER_SEC) / (calculated_number)(next_store_ut - last_stored_ut);
+ new_value = new_value * (NETDATA_DOUBLE)(st->update_every * USEC_PER_SEC) / (NETDATA_DOUBLE)(next_store_ut - last_stored_ut);
}
break;
@@ -1207,21 +1181,19 @@ static inline size_t rrdset_done_interpolate(
// we have missed an update
// interpolate in the middle values
- new_value = (calculated_number)
+ new_value = (NETDATA_DOUBLE)
( ( (rd->calculated_value - rd->last_calculated_value)
- * (calculated_number)(next_store_ut - last_collect_ut)
- / (calculated_number)(now_collect_ut - last_collect_ut)
+ * (NETDATA_DOUBLE)(next_store_ut - last_collect_ut)
+ / (NETDATA_DOUBLE)(now_collect_ut - last_collect_ut)
)
+ rd->last_calculated_value
);
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: CALC2 DEF "
- CALCULATED_NUMBER_FORMAT " = ((("
- "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")"
+ rrdset_debug(st, "%s: CALC2 DEF " NETDATA_DOUBLE_FORMAT " = ((("
+ "(" NETDATA_DOUBLE_FORMAT " - " NETDATA_DOUBLE_FORMAT ")"
" * %llu"
- " / %llu) + " CALCULATED_NUMBER_FORMAT
- , rd->name
+ " / %llu) + " NETDATA_DOUBLE_FORMAT, rd->name
, new_value
, rd->calculated_value, rd->last_calculated_value
, (next_store_ut - first_ut)
@@ -1234,9 +1206,7 @@ static inline size_t rrdset_done_interpolate(
if(unlikely(!store_this_entry)) {
(void) ml_is_anomalous(rd, 0, false);
-
- rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT);
-// rd->values[current_entry] = SN_EMPTY_SLOT;
+ store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
continue;
}
@@ -1245,69 +1215,26 @@ static inline size_t rrdset_done_interpolate(
if (ml_is_anomalous(rd, new_value, true)) {
// clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
- dim_storage_flags &= ~ ((uint32_t) SN_ANOMALY_BIT);
+ dim_storage_flags &= ~((storage_number)SN_FLAG_NOT_ANOMALOUS);
}
- rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, dim_storage_flags));
-// rd->values[current_entry] = pack_storage_number(new_value, storage_flags );
+ store_metric(rd, next_store_ut, new_value, dim_storage_flags);
rd->last_stored_value = new_value;
-
- #ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: STORE[%ld] "
- CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT
- , rd->name
- , current_entry
- , unpack_storage_number(rd->values[current_entry]), new_value
- );
- #endif
}
else {
(void) ml_is_anomalous(rd, 0, false);
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING "
- , rd->name
- , current_entry
- );
+ rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rd->name, current_entry);
#endif
-// rd->values[current_entry] = SN_EMPTY_SLOT;
- rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT);
+ store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
rd->last_stored_value = NAN;
}
stored_entries++;
-
- #ifdef NETDATA_INTERNAL_CHECKS
- if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) {
- calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor;
- calculated_number t2 = unpack_storage_number(rd->values[current_entry]);
-
- calculated_number accuracy = accuracy_loss(t1, t2);
- debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
- , st->id, rd->name
- , current_entry
- , t2
- , t1
- , accuracy
- , (accuracy > ACCURACY_LOSS_ACCEPTED_PERCENT) ? " **TOO BIG** " : ""
- );
-
- rd->collected_volume += t1;
- rd->stored_volume += t2;
-
- accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume);
- debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s"
- , st->id, rd->name
- , current_entry
- , rd->stored_volume
- , rd->collected_volume
- , accuracy
- , (accuracy > ACCURACY_LOSS_ACCEPTED_PERCENT) ? " **TOO BIG** " : ""
- );
- }
- #endif
}
+
// reset the storage flags for the next point, if any;
storage_flags = SN_DEFAULT_FLAGS;
@@ -1344,7 +1271,7 @@ static inline void rrdset_done_fill_the_gap(RRDSET *st) {
long current_entry = st->current_entry;
for(c = 0; c < entries && next_store_ut <= now_collect_ut ; next_store_ut += update_every_ut, c++) {
- rd->values[current_entry] = SN_EMPTY_SLOT;
+ rd->db[current_entry] = pack_storage_number(NAN, SN_FLAG_NONE);
current_entry = ((current_entry + 1) >= entries) ? 0 : current_entry + 1;
#ifdef NETDATA_INTERNAL_CHECKS
@@ -1368,6 +1295,7 @@ void rrdset_done(RRDSET *st) {
if(unlikely(netdata_exit)) return;
debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name);
+ rrdcontext_collected_rrdset(st);
RRDDIM *rd;
@@ -1405,7 +1333,8 @@ void rrdset_done(RRDSET *st) {
// check if the chart has a long time to be updated
if(unlikely(st->usec_since_last_update > st->entries * update_every_ut &&
st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE && st->rrd_memory_mode != RRD_MEMORY_MODE_NONE)) {
- info("host '%s', chart %s: took too long to be updated (counter #%zu, update #%zu, %0.3" LONG_DOUBLE_MODIFIER " secs). Resetting it.", st->rrdhost->hostname, st->name, st->counter, st->counter_done, (LONG_DOUBLE)st->usec_since_last_update / USEC_PER_SEC);
+ info("host '%s', chart %s: took too long to be updated (counter #%zu, update #%zu, %0.3" NETDATA_DOUBLE_MODIFIER
+ " secs). Resetting it.", st->rrdhost->hostname, st->name, st->counter, st->counter_done, (NETDATA_DOUBLE)st->usec_since_last_update / USEC_PER_SEC);
rrdset_reset(st);
st->usec_since_last_update = update_every_ut;
store_this_entry = 0;
@@ -1504,6 +1433,7 @@ void rrdset_done(RRDSET *st) {
// and we have collected metrics for this chart in the past (st->counter != 0)
// fill the gap (the chart has been just loaded from disk)
if(unlikely(st->counter) && st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) {
+ // TODO this should be inside the storage engine
rrdset_done_fill_the_gap(st);
last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec;
next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
@@ -1531,6 +1461,7 @@ void rrdset_done(RRDSET *st) {
#endif
}
}
+
after_first_database_work:
st->counter_done++;
@@ -1541,10 +1472,10 @@ after_first_database_work:
}
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "last_collect_ut = %0.3" LONG_DOUBLE_MODIFIER " (last collection time)", (LONG_DOUBLE)last_collect_ut/USEC_PER_SEC);
- rrdset_debug(st, "now_collect_ut = %0.3" LONG_DOUBLE_MODIFIER " (current collection time)", (LONG_DOUBLE)now_collect_ut/USEC_PER_SEC);
- rrdset_debug(st, "last_stored_ut = %0.3" LONG_DOUBLE_MODIFIER " (last updated time)", (LONG_DOUBLE)last_stored_ut/USEC_PER_SEC);
- rrdset_debug(st, "next_store_ut = %0.3" LONG_DOUBLE_MODIFIER " (next interpolation point)", (LONG_DOUBLE)next_store_ut/USEC_PER_SEC);
+ rrdset_debug(st, "last_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last collection time)", (NETDATA_DOUBLE)last_collect_ut/USEC_PER_SEC);
+ rrdset_debug(st, "now_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (current collection time)", (NETDATA_DOUBLE)now_collect_ut/USEC_PER_SEC);
+ rrdset_debug(st, "last_stored_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last updated time)", (NETDATA_DOUBLE)last_stored_ut/USEC_PER_SEC);
+ rrdset_debug(st, "next_store_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (next interpolation point)", (NETDATA_DOUBLE)next_store_ut/USEC_PER_SEC);
#endif
// calculate totals and count the dimensions
@@ -1553,7 +1484,9 @@ after_first_database_work:
rrddim_foreach_read(rd, st) {
if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED))
continue;
+
dimensions++;
+
if(likely(rd->updated))
st->collected_total += rd->collected_value;
}
@@ -1581,9 +1514,8 @@ after_first_database_work:
rrdset_debug(st, "%s: START "
" last_collected_value = " COLLECTED_NUMBER_FORMAT
" collected_value = " COLLECTED_NUMBER_FORMAT
- " last_calculated_value = " CALCULATED_NUMBER_FORMAT
- " calculated_value = " CALCULATED_NUMBER_FORMAT
- , rd->name
+ " last_calculated_value = " NETDATA_DOUBLE_FORMAT
+ " calculated_value = " NETDATA_DOUBLE_FORMAT, rd->name
, rd->last_collected_value
, rd->collected_value
, rd->last_calculated_value
@@ -1593,21 +1525,19 @@ after_first_database_work:
switch(rd->algorithm) {
case RRD_ALGORITHM_ABSOLUTE:
- rd->calculated_value = (calculated_number)rd->collected_value
- * (calculated_number)rd->multiplier
- / (calculated_number)rd->divisor;
+ rd->calculated_value = (NETDATA_DOUBLE)rd->collected_value
+ * (NETDATA_DOUBLE)rd->multiplier
+ / (NETDATA_DOUBLE)rd->divisor;
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: CALC ABS/ABS-NO-IN "
- CALCULATED_NUMBER_FORMAT " = "
+ rrdset_debug(st, "%s: CALC ABS/ABS-NO-IN " NETDATA_DOUBLE_FORMAT " = "
COLLECTED_NUMBER_FORMAT
- " * " CALCULATED_NUMBER_FORMAT
- " / " CALCULATED_NUMBER_FORMAT
- , rd->name
+ " * " NETDATA_DOUBLE_FORMAT
+ " / " NETDATA_DOUBLE_FORMAT, rd->name
, rd->calculated_value
, rd->collected_value
- , (calculated_number)rd->multiplier
- , (calculated_number)rd->divisor
+ , (NETDATA_DOUBLE)rd->multiplier
+ , (NETDATA_DOUBLE)rd->divisor
);
#endif
@@ -1620,13 +1550,12 @@ after_first_database_work:
// the percentage of the current value
// over the total of all dimensions
rd->calculated_value =
- (calculated_number)100
- * (calculated_number)rd->collected_value
- / (calculated_number)st->collected_total;
+ (NETDATA_DOUBLE)100
+ * (NETDATA_DOUBLE)rd->collected_value
+ / (NETDATA_DOUBLE)st->collected_total;
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: CALC PCENT-ROW "
- CALCULATED_NUMBER_FORMAT " = 100"
+ rrdset_debug(st, "%s: CALC PCENT-ROW " NETDATA_DOUBLE_FORMAT " = 100"
" * " COLLECTED_NUMBER_FORMAT
" / " COLLECTED_NUMBER_FORMAT
, rd->name
@@ -1677,34 +1606,32 @@ after_first_database_work:
// TODO: remember recent history of rates and compare with current rate to reduce this chance.
if (delta < max_acceptable_rate) {
rd->calculated_value +=
- (calculated_number) delta
- * (calculated_number) rd->multiplier
- / (calculated_number) rd->divisor;
+ (NETDATA_DOUBLE) delta
+ * (NETDATA_DOUBLE) rd->multiplier
+ / (NETDATA_DOUBLE) rd->divisor;
} else {
// This is a reset. Any overflow with a rate greater than MAX_INCREMENTAL_PERCENT_RATE will also
// be detected as a reset instead.
- rd->calculated_value += (calculated_number)0;
+ rd->calculated_value += (NETDATA_DOUBLE)0;
}
}
else {
rd->calculated_value +=
- (calculated_number) (rd->collected_value - rd->last_collected_value)
- * (calculated_number) rd->multiplier
- / (calculated_number) rd->divisor;
+ (NETDATA_DOUBLE) (rd->collected_value - rd->last_collected_value)
+ * (NETDATA_DOUBLE) rd->multiplier
+ / (NETDATA_DOUBLE) rd->divisor;
}
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: CALC INC PRE "
- CALCULATED_NUMBER_FORMAT " = ("
+ rrdset_debug(st, "%s: CALC INC PRE " NETDATA_DOUBLE_FORMAT " = ("
COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT
")"
- " * " CALCULATED_NUMBER_FORMAT
- " / " CALCULATED_NUMBER_FORMAT
- , rd->name
+ " * " NETDATA_DOUBLE_FORMAT
+ " / " NETDATA_DOUBLE_FORMAT, rd->name
, rd->calculated_value
, rd->collected_value, rd->last_collected_value
- , (calculated_number)rd->multiplier
- , (calculated_number)rd->divisor
+ , (NETDATA_DOUBLE)rd->multiplier
+ , (NETDATA_DOUBLE)rd->divisor
);
#endif
@@ -1737,13 +1664,12 @@ after_first_database_work:
rd->calculated_value = 0;
else
rd->calculated_value =
- (calculated_number)100
- * (calculated_number)(rd->collected_value - rd->last_collected_value)
- / (calculated_number)(st->collected_total - st->last_collected_total);
+ (NETDATA_DOUBLE)100
+ * (NETDATA_DOUBLE)(rd->collected_value - rd->last_collected_value)
+ / (NETDATA_DOUBLE)(st->collected_total - st->last_collected_total);
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: CALC PCENT-DIFF "
- CALCULATED_NUMBER_FORMAT " = 100"
+ rrdset_debug(st, "%s: CALC PCENT-DIFF " NETDATA_DOUBLE_FORMAT " = 100"
" * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
" / (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
, rd->name
@@ -1761,8 +1687,7 @@ after_first_database_work:
rd->calculated_value = 0;
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: CALC "
- CALCULATED_NUMBER_FORMAT " = 0"
+ rrdset_debug(st, "%s: CALC " NETDATA_DOUBLE_FORMAT " = 0"
, rd->name
, rd->calculated_value
);
@@ -1775,9 +1700,8 @@ after_first_database_work:
rrdset_debug(st, "%s: PHASE2 "
" last_collected_value = " COLLECTED_NUMBER_FORMAT
" collected_value = " COLLECTED_NUMBER_FORMAT
- " last_calculated_value = " CALCULATED_NUMBER_FORMAT
- " calculated_value = " CALCULATED_NUMBER_FORMAT
- , rd->name
+ " last_calculated_value = " NETDATA_DOUBLE_FORMAT
+ " calculated_value = " NETDATA_DOUBLE_FORMAT, rd->name
, rd->last_collected_value
, rd->collected_value
, rd->last_calculated_value
@@ -1791,10 +1715,10 @@ after_first_database_work:
// it is now time to interpolate values on a second boundary
#ifdef NETDATA_INTERNAL_CHECKS
- if(unlikely(now_collect_ut < next_store_ut)) {
+ if(unlikely(now_collect_ut < next_store_ut && st->counter_done > 1)) {
// this is collected in the same interpolation point
rrdset_debug(st, "THIS IS IN THE SAME INTERPOLATION POINT");
- info("INTERNAL CHECK: host '%s', chart '%s' is collected in the same interpolation point: short by %llu microseconds", st->rrdhost->hostname, st->name, next_store_ut - now_collect_ut);
+ info("INTERNAL CHECK: host '%s', chart '%s' collection %zu is in the same interpolation point: short by %llu microseconds", st->rrdhost->hostname, st->name, st->counter_done, next_store_ut - now_collect_ut);
}
#endif
@@ -1811,14 +1735,14 @@ after_first_database_work:
after_second_database_work:
st->last_collected_total = st->collected_total;
-#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+#ifdef ENABLE_ACLK
time_t mark = now_realtime_sec();
#endif
rrddim_foreach_read(rd, st) {
if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED))
continue;
-#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+#ifdef ENABLE_ACLK
if (likely(!st->state->is_ar_chart)) {
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN) && likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK)))
queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark));
@@ -1837,7 +1761,8 @@ after_second_database_work:
case RRD_ALGORITHM_INCREMENTAL:
if(unlikely(!first_entry)) {
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value);
+ rrdset_debug(st, "%s: setting last_calculated_value (old: " NETDATA_DOUBLE_FORMAT
+ ") to last_calculated_value (new: " NETDATA_DOUBLE_FORMAT ")", rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value);
#endif
rd->last_calculated_value += rd->calculated_value;
@@ -1853,7 +1778,8 @@ after_second_database_work:
case RRD_ALGORITHM_PCENT_OVER_ROW_TOTAL:
case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL:
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", rd->name, rd->last_calculated_value, rd->calculated_value);
+ rrdset_debug(st, "%s: setting last_calculated_value (old: " NETDATA_DOUBLE_FORMAT
+ ") to last_calculated_value (new: " NETDATA_DOUBLE_FORMAT ")", rd->name, rd->last_calculated_value, rd->calculated_value);
#endif
rd->last_calculated_value = rd->calculated_value;
@@ -1868,9 +1794,8 @@ after_second_database_work:
rrdset_debug(st, "%s: END "
" last_collected_value = " COLLECTED_NUMBER_FORMAT
" collected_value = " COLLECTED_NUMBER_FORMAT
- " last_calculated_value = " CALCULATED_NUMBER_FORMAT
- " calculated_value = " CALCULATED_NUMBER_FORMAT
- , rd->name
+ " last_calculated_value = " NETDATA_DOUBLE_FORMAT
+ " calculated_value = " NETDATA_DOUBLE_FORMAT, rd->name
, rd->last_collected_value
, rd->collected_value
, rd->last_calculated_value
@@ -1883,15 +1808,24 @@ after_second_database_work:
// ALL DONE ABOUT THE DATA UPDATE
// --------------------------------------------------------------------
- // find if there are any obsolete dimensions
- time_t now = now_realtime_sec();
+ if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) {
+ // update the memory mapped files with the latest values
+ rrdset_memory_file_update(st);
+ rrddim_foreach_read(rd, st) {
+ rrddim_memory_file_update(rd);
+ }
+ }
+
+ // find if there are any obsolete dimensions
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE_DIMENSIONS))) {
rrddim_foreach_read(rd, st)
if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)))
break;
if(unlikely(rd)) {
+ time_t now = now_realtime_sec();
+
RRDDIM *last;
// there is a dimension to free
// upgrade our read lock to a write lock
@@ -1903,10 +1837,11 @@ after_second_database_work:
&& (rd->last_collected_time.tv_sec + rrdset_free_obsolete_time < now))) {
info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id);
- if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || rd->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) {
- info("Deleting dimension file '%s'.", rd->cache_filename);
- if(unlikely(unlink(rd->cache_filename) == -1))
- error("Cannot delete dimension file '%s'", rd->cache_filename);
+ const char *cache_filename = rrddim_cache_filename(rd);
+ if(cache_filename) {
+ info("Deleting dimension file '%s'.", cache_filename);
+ if (unlikely(unlink(cache_filename) == -1))
+ error("Cannot delete dimension file '%s'", cache_filename);
}
#ifdef ENABLE_DBENGINE
@@ -1917,13 +1852,25 @@ after_second_database_work:
rrddim_flag_clear(rd, RRDDIM_FLAG_OBSOLETE);
/* only a collector can mark a chart as obsolete, so we must remove the reference */
- uint8_t can_delete_metric = rd->state->collect_ops.finalize(rd);
- if (can_delete_metric) {
+
+ size_t tiers_available = 0, tiers_said_yes = 0;
+ for(int tier = 0; tier < storage_tiers ;tier++) {
+ if(rd->tiers[tier]) {
+ tiers_available++;
+
+ if(rd->tiers[tier]->collect_ops.finalize(rd->tiers[tier]->db_collection_handle))
+ tiers_said_yes++;
+
+ rd->tiers[tier]->db_collection_handle = NULL;
+ }
+ }
+
+ if (tiers_available == tiers_said_yes && tiers_said_yes) {
/* This metric has no data and no references */
- delete_dimension_uuid(&rd->state->metric_uuid);
+ delete_dimension_uuid(&rd->metric_uuid);
} else {
/* Do not delete this dimension */
-#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+#ifdef ENABLE_ACLK
queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark));
#endif
last = rd;
@@ -1958,102 +1905,208 @@ after_second_database_work:
netdata_thread_enable_cancelability();
}
-void rrdset_add_label_to_new_list(RRDSET *st, char *key, char *value, LABEL_SOURCE source)
-{
- st->state->new_labels = add_label_to_list(st->state->new_labels, key, value, source);
-}
-void rrdset_finalize_labels(RRDSET *st)
-{
- struct label *new_labels = st->state->new_labels;
- struct label_index *labels = &st->state->labels;
+// ----------------------------------------------------------------------------
+// compatibility layer for RRDSET files v019
+
+#define RRDSET_MAGIC_V019 "NETDATA RRD SET FILE V019"
+#define RRD_ID_LENGTH_MAX_V019 200
+
+struct avl_element_v019 {
+ void *avl_link[2];
+ signed char avl_balance;
+};
+struct avl_tree_type_v019 {
+ void *root;
+ int (*compar)(void *a, void *b);
+};
+struct avl_tree_lock_v019 {
+ struct avl_tree_type_v019 avl_tree;
+ pthread_rwlock_t rwlock;
+};
+struct rrdset_map_save_v019 {
+ struct avl_element_v019 avl; // ignored
+ struct avl_element_v019 avlname; // ignored
+ char id[RRD_ID_LENGTH_MAX_V019 + 1]; // check to reset all - update on load
+ void *name; // ignored
+ void *unused_ptr; // ignored
+ void *type; // ignored
+ void *family; // ignored
+ void *title; // ignored
+ void *units; // ignored
+ void *context; // ignored
+ uint32_t hash_context; // ignored
+ uint32_t chart_type; // ignored
+ int update_every; // check to reset all - update on load
+ long entries; // check to reset all - update on load
+ long current_entry; // NEEDS TO BE UPDATED - FIXED ON LOAD
+ uint32_t flags; // ignored
+ void *exporting_flags; // ignored
+ int gap_when_lost_iterations_above; // ignored
+ long priority; // ignored
+ uint32_t rrd_memory_mode; // ignored
+ void *cache_dir; // ignored
+ char cache_filename[FILENAME_MAX+1]; // ignored - update on load
+ pthread_rwlock_t rrdset_rwlock; // ignored
+ size_t counter; // NEEDS TO BE UPDATED - maintained on load
+ size_t counter_done; // ignored
+ union { //
+ time_t last_accessed_time; // ignored
+ time_t last_entry_t; // ignored
+ }; //
+ time_t upstream_resync_time; // ignored
+ void *plugin_name; // ignored
+ void *module_name; // ignored
+ void *chart_uuid; // ignored
+ void *state; // ignored
+ size_t unused[3]; // ignored
+ size_t rrddim_page_alignment; // ignored
+ uint32_t hash; // ignored
+ uint32_t hash_name; // ignored
+ usec_t usec_since_last_update; // NEEDS TO BE UPDATED - maintained on load
+ struct timeval last_updated; // NEEDS TO BE UPDATED - check to reset all - fixed on load
+ struct timeval last_collected_time; // ignored
+ long long collected_total; // NEEDS TO BE UPDATED - maintained on load
+ long long last_collected_total; // NEEDS TO BE UPDATED - maintained on load
+ void *rrdfamily; // ignored
+ void *rrdhost; // ignored
+ void *next; // ignored
+ long double green; // ignored
+ long double red; // ignored
+ struct avl_tree_lock_v019 rrdvar_root_index; // ignored
+ void *variables; // ignored
+ void *alarms; // ignored
+ unsigned long memsize; // check to reset all - update on load
+ char magic[sizeof(RRDSET_MAGIC_V019) + 1]; // check to reset all - update on load
+ struct avl_tree_lock_v019 dimensions_index; // ignored
+ void *dimensions; // ignored
+};
+
+void rrdset_memory_file_update(RRDSET *st) {
+ if(!st->st_on_file) return;
+ struct rrdset_map_save_v019 *st_on_file = st->st_on_file;
+
+ st_on_file->current_entry = st->current_entry;
+ st_on_file->counter = st->counter;
+ st_on_file->usec_since_last_update = st->usec_since_last_update;
+ st_on_file->last_updated.tv_sec = st->last_updated.tv_sec;
+ st_on_file->last_updated.tv_usec = st->last_updated.tv_usec;
+ st_on_file->collected_total = st->collected_total;
+ st_on_file->last_collected_total = st->last_collected_total;
+}
- if (!labels->head) {
- labels->head = new_labels;
- } else {
- replace_label_list(labels, new_labels);
- }
+const char *rrdset_cache_filename(RRDSET *st) {
+ if(!st->st_on_file) return NULL;
+ struct rrdset_map_save_v019 *st_on_file = st->st_on_file;
+ return st_on_file->cache_filename;
+}
- netdata_rwlock_rdlock(&labels->labels_rwlock);
- struct label *lbl = labels->head;
- while (lbl) {
- sql_store_chart_label(st->chart_uuid, (int)lbl->label_source, lbl->key, lbl->value);
- lbl = lbl->next;
- }
- netdata_rwlock_unlock(&labels->labels_rwlock);
+void rrdset_memory_file_free(RRDSET *st) {
+ if(!st->st_on_file) return;
- st->state->new_labels = NULL;
-}
+ // needed for memory mode map, to save the latest state
+ rrdset_memory_file_update(st);
-void rrdset_update_labels(RRDSET *st, struct label *labels)
-{
- if (!labels)
- return;
+ struct rrdset_map_save_v019 *st_on_file = st->st_on_file;
+ munmap(st_on_file, st_on_file->memsize);
- update_label_list(&st->state->new_labels, labels);
- rrdset_finalize_labels(st);
+ // remove the pointers from the RRDDIM
+ st->st_on_file = NULL;
}
-int rrdset_contains_label_keylist(RRDSET *st, char *keylist)
-{
- struct label_index *labels = &st->state->labels;
- int ret;
+void rrdset_memory_file_save(RRDSET *st) {
+ if(!st->st_on_file) return;
- if (!labels->head)
- return 0;
+ rrdset_memory_file_update(st);
- netdata_rwlock_rdlock(&labels->labels_rwlock);
- ret = label_list_contains_keylist(labels->head, keylist);
- netdata_rwlock_unlock(&labels->labels_rwlock);
+ struct rrdset_map_save_v019 *st_on_file = st->st_on_file;
+ if(st_on_file->rrd_memory_mode != RRD_MEMORY_MODE_SAVE) return;
- return ret;
+ memory_file_save(st_on_file->cache_filename, st->st_on_file, st_on_file->memsize);
}
-struct label *rrdset_lookup_label_key(RRDSET *st, char *key, uint32_t key_hash)
-{
- struct label_index *labels = &st->state->labels;
- struct label *ret = NULL;
+bool rrdset_memory_load_or_create_map_save(RRDSET *st, RRD_MEMORY_MODE memory_mode) {
+ if(memory_mode != RRD_MEMORY_MODE_SAVE && memory_mode != RRD_MEMORY_MODE_MAP)
+ return false;
- if (labels->head) {
- netdata_rwlock_rdlock(&labels->labels_rwlock);
- ret = label_list_lookup_key(labels->head, key, key_hash);
- netdata_rwlock_unlock(&labels->labels_rwlock);
- }
- return ret;
-}
-
-static inline int k8s_space(char c) {
- switch(c) {
- case ':':
- case ',':
- return 1;
- default:
- return 0;
- }
-}
+ char fullfilename[FILENAME_MAX + 1];
+ snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", st->cache_dir);
-int rrdset_matches_label_keys(RRDSET *st, char *keylist, char *words[], uint32_t *hash_key_list, int *word_count, int size)
-{
- struct label_index *labels = &st->state->labels;
+ unsigned long size = sizeof(struct rrdset_map_save_v019);
+ struct rrdset_map_save_v019 *st_on_file = (struct rrdset_map_save_v019 *)netdata_mmap(
+ fullfilename, size,
+ ((memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE),
+ 0);
- if (!labels->head)
- return 0;
+ if(!st_on_file) return false;
- struct label *one_label;
+ time_t now = now_realtime_sec();
- if (!*word_count) {
- *word_count = quoted_strings_splitter(keylist, words, size, k8s_space, NULL, NULL, 0);
- for (int i = 0; i < *word_count - 1; i += 2) {
- hash_key_list[i] = simple_hash(words[i]);
- }
+ st_on_file->magic[sizeof(RRDSET_MAGIC_V019)] = '\0';
+ if(strcmp(st_on_file->magic, RRDSET_MAGIC_V019) != 0) {
+ info("Initializing file '%s'.", fullfilename);
+ memset(st_on_file, 0, size);
+ }
+ else if(strncmp(st_on_file->id, st->id, RRD_ID_LENGTH_MAX_V019) != 0) {
+ error("File '%s' contents are not for chart '%s'. Clearing it.", fullfilename, st->id);
+ memset(st_on_file, 0, size);
+ }
+ else if(st_on_file->memsize != size || st_on_file->entries != st->entries) {
+ error("File '%s' does not have the desired size. Clearing it.", fullfilename);
+ memset(st_on_file, 0, size);
+ }
+ else if(st_on_file->update_every != st->update_every) {
+ error("File '%s' does not have the desired granularity. Clearing it.", fullfilename);
+ memset(st_on_file, 0, size);
+ }
+ else if((now - st_on_file->last_updated.tv_sec) > st->update_every * st->entries) {
+ info("File '%s' is too old. Clearing it.", fullfilename);
+ memset(st_on_file, 0, size);
+ }
+ else if(st_on_file->last_updated.tv_sec > now + st->update_every) {
+ error("File '%s' refers to the future by %zd secs. Resetting it to now.", fullfilename, (ssize_t)(st_on_file->last_updated.tv_sec - now));
+ st_on_file->last_updated.tv_sec = now;
}
- int ret = 1;
- netdata_rwlock_rdlock(&labels->labels_rwlock);
- for (int i = 0; ret && i < *word_count - 1; i += 2) {
- one_label = label_list_lookup_key(labels->head, words[i], hash_key_list[i]);
- ret = (one_label && !strcmp(one_label->value, words[i + 1]));
+ if(st_on_file->current_entry >= st_on_file->entries)
+ st_on_file->current_entry = 0;
+
+ // make sure the database is aligned
+ bool align_last_updated = false;
+ if(st_on_file->last_updated.tv_sec) {
+ st_on_file->update_every = st->update_every;
+ align_last_updated = true;
}
- netdata_rwlock_unlock(&labels->labels_rwlock);
- return ret;
+
+ // copy the useful values to st
+ st->current_entry = st_on_file->current_entry;
+ st->counter = st_on_file->counter;
+ st->usec_since_last_update = st_on_file->usec_since_last_update;
+ st->last_updated.tv_sec = st_on_file->last_updated.tv_sec;
+ st->last_updated.tv_usec = st_on_file->last_updated.tv_usec;
+ st->collected_total = st_on_file->collected_total;
+ st->last_collected_total = st_on_file->last_collected_total;
+
+ // link it to st
+ st->st_on_file = st_on_file;
+
+ // clear everything
+ memset(st_on_file, 0, size);
+
+ // set the values we need
+ strncpyz(st_on_file->id, st->id, RRD_ID_LENGTH_MAX_V019 + 1);
+ strcpy(st_on_file->cache_filename, fullfilename);
+ strcpy(st_on_file->magic, RRDSET_MAGIC_V019);
+ st_on_file->memsize = size;
+ st_on_file->entries = st->entries;
+ st_on_file->update_every = st->update_every;
+ st_on_file->rrd_memory_mode = memory_mode;
+
+ if(align_last_updated)
+ last_updated_time_align(st);
+
+ // copy the useful values back to st_on_file
+ rrdset_memory_file_update(st);
+
+ return true;
}