diff options
Diffstat (limited to 'database/rrdset.c')
-rw-r--r-- | database/rrdset.c | 933 |
1 files changed, 493 insertions, 440 deletions
diff --git a/database/rrdset.c b/database/rrdset.c index e7cb89df..9693ee21 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -80,17 +80,9 @@ static inline RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name, ui tmp.name = name; tmp.hash_name = (hash)?hash:simple_hash(tmp.name); - // fprintf(stderr, "SEARCHING: %s\n", name); result = avl_search_lock(&host->rrdset_root_index_name, (avl_t *) (&(tmp.avlname))); - if(result) { - RRDSET *st = rrdset_from_avlname(result); - if(strcmp(st->magic, RRDSET_MAGIC) != 0) - error("Search for RRDSET %s returned an invalid RRDSET %s (name %s)", name, st->id, st->name); + if(result) return rrdset_from_avlname(result); - // fprintf(stderr, "FOUND: %s\n", name); - return rrdset_from_avlname(result); - } - // fprintf(stderr, "NOT FOUND: %s\n", name); return NULL; } @@ -194,6 +186,7 @@ int rrdset_set_name(RRDSET *st, const char *name) { rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE); rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrdcontext_updated_rrdset_name(st); return 2; } @@ -212,6 +205,7 @@ inline void rrdset_is_obsolete(RRDSET *st) { // the chart will not get more updates (data collection) // so, we have to push its definition now rrdset_push_chart_definition_now(st); + rrdcontext_updated_rrdset_flags(st); } } @@ -224,6 +218,7 @@ inline void rrdset_isnot_obsolete(RRDSET *st) { // the chart will be pushed upstream automatically // due to data collection + rrdcontext_updated_rrdset_flags(st); } } @@ -259,6 +254,7 @@ inline void rrdset_update_heterogeneous_flag(RRDSET *st) { } rrdset_flag_clear(st, RRDSET_FLAG_HETEROGENEOUS); + rrdcontext_updated_rrdset_flags(st); } // ---------------------------------------------------------------------------- @@ -281,12 +277,13 @@ void rrdset_reset(RRDSET *st) { rd->last_collected_time.tv_sec = 0; rd->last_collected_time.tv_usec = 0; rd->collections_counter = 0; - // memset(rd->values, 0, rd->entries * sizeof(storage_number)); -#ifdef ENABLE_DBENGINE - if (RRD_MEMORY_MODE_DBENGINE == st->rrd_memory_mode && !rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) { - rrdeng_store_metric_flush_current_page(rd); + + if(!rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) { + for(int tier = 0; tier < storage_tiers ;tier++) { + if(rd->tiers[tier]) + rd->tiers[tier]->collect_ops.flush(rd->tiers[tier]->db_collection_handle); + } } -#endif } } @@ -294,20 +291,27 @@ void rrdset_reset(RRDSET *st) { // RRDSET - helpers for rrdset_create() inline long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries) { - if(unlikely(entries < 5)) entries = 5; - if(unlikely(entries > RRD_HISTORY_ENTRIES_MAX)) entries = RRD_HISTORY_ENTRIES_MAX; + if(mode == RRD_MEMORY_MODE_DBENGINE) return 0; + if(mode == RRD_MEMORY_MODE_NONE) return 5; - if(unlikely(mode == RRD_MEMORY_MODE_NONE || mode == RRD_MEMORY_MODE_ALLOC)) - return entries; + if(entries < 5) entries = 5; + if(entries > RRD_HISTORY_ENTRIES_MAX) entries = RRD_HISTORY_ENTRIES_MAX; - long page = (size_t)sysconf(_SC_PAGESIZE); - long size = sizeof(RRDDIM) + entries * sizeof(storage_number); - if(unlikely(size % page)) { - size -= (size % page); - size += page; + if(mode == RRD_MEMORY_MODE_MAP || mode == RRD_MEMORY_MODE_SAVE || mode == RRD_MEMORY_MODE_RAM) { + long header_size = 0; - long n = (size - sizeof(RRDDIM)) / sizeof(storage_number); - return n; + if(mode == RRD_MEMORY_MODE_MAP || mode == RRD_MEMORY_MODE_SAVE) + header_size = (long)rrddim_memory_file_header_size(); + + long page = (long)sysconf(_SC_PAGESIZE); + long size = (long)(header_size + entries * sizeof(storage_number)); + if (unlikely(size % page)) { + size -= (size % page); + size += page; + + long n = (long)((size - header_size) / sizeof(storage_number)); + return n; + } } return entries; @@ -383,11 +387,13 @@ void rrdset_free(RRDSET *st) { rrdset_unlock(st); + // this has to be after the dimensions are freed + rrdcontext_removed_rrdset(st); + // ------------------------------------------------------------------------ // free it netdata_rwlock_destroy(&st->rrdset_rwlock); - netdata_rwlock_destroy(&st->state->labels.labels_rwlock); // free directly allocated members freez((void *)st->name); @@ -402,77 +408,51 @@ void rrdset_free(RRDSET *st) { freez(st->state->old_title); freez(st->state->old_units); freez(st->state->old_context); - free_label_list(st->state->labels.head); + rrdlabels_destroy(st->state->chart_labels); freez(st->state); freez(st->chart_uuid); - switch(st->rrd_memory_mode) { - case RRD_MEMORY_MODE_SAVE: - case RRD_MEMORY_MODE_MAP: - case RRD_MEMORY_MODE_RAM: - debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name); - munmap(st, st->memsize); - break; - - case RRD_MEMORY_MODE_ALLOC: - case RRD_MEMORY_MODE_NONE: - case RRD_MEMORY_MODE_DBENGINE: - freez(st); - break; - } - + rrdset_memory_file_free(st); + freez(st); } void rrdset_save(RRDSET *st) { rrdset_check_rdlock(st); - // info("Saving chart '%s' ('%s')", st->id, st->name); - - if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) { - debug(D_RRD_STATS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename); - memory_file_save(st->cache_filename, st, st->memsize); - } + rrdset_memory_file_save(st); RRDDIM *rd; - rrddim_foreach_read(rd, st) { - if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE)) { - debug(D_RRD_STATS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename); - memory_file_save(rd->cache_filename, rd, rd->memsize); - } - } + rrddim_foreach_read(rd, st) + rrddim_memory_file_save(rd); } -void rrdset_delete_custom(RRDSET *st, int db_rotated) { +void rrdset_delete_files(RRDSET *st) { RRDDIM *rd; -#ifndef ENABLE_ACLK - UNUSED(db_rotated); -#endif rrdset_check_rdlock(st); info("Deleting chart '%s' ('%s') from disk...", st->id, st->name); if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || st->rrd_memory_mode == RRD_MEMORY_MODE_MAP) { - info("Deleting chart header file '%s'.", st->cache_filename); - if(unlikely(unlink(st->cache_filename) == -1)) - error("Cannot delete chart header file '%s'", st->cache_filename); + const char *cache_filename = rrdset_cache_filename(st); + if(cache_filename) { + info("Deleting chart header file '%s'.", cache_filename); + if (unlikely(unlink(cache_filename) == -1)) + error("Cannot delete chart header file '%s'", cache_filename); + } + else + error("Cannot find the cache filename of chart '%s'", st->id); } rrddim_foreach_read(rd, st) { - if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || rd->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) { - info("Deleting dimension file '%s'.", rd->cache_filename); - if(unlikely(unlink(rd->cache_filename) == -1)) - error("Cannot delete dimension file '%s'", rd->cache_filename); - } - } + const char *cache_filename = rrddim_cache_filename(rd); + if(!cache_filename) continue; - recursively_delete_dir(st->cache_dir, "left-over chart"); -#ifdef ENABLE_ACLK - if ((netdata_cloud_setting) && (db_rotated || RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)) { - aclk_del_collector(st->rrdhost, st->plugin_name, st->module_name); - st->rrdhost->deleted_charts_count++; + info("Deleting dimension file '%s'.", cache_filename); + if(unlikely(unlink(cache_filename) == -1)) + error("Cannot delete dimension file '%s'", cache_filename); } -#endif + recursively_delete_dir(st->cache_dir, "left-over chart"); } void rrdset_delete_obsolete_dimensions(RRDSET *st) { @@ -484,11 +464,11 @@ void rrdset_delete_obsolete_dimensions(RRDSET *st) { rrddim_foreach_read(rd, st) { if(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) { - if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || rd->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) { - info("Deleting dimension file '%s'.", rd->cache_filename); - if(unlikely(unlink(rd->cache_filename) == -1)) - error("Cannot delete dimension file '%s'", rd->cache_filename); - } + const char *cache_filename = rrddim_cache_filename(rd); + if(!cache_filename) continue; + info("Deleting dimension file '%s'.", cache_filename); + if(unlikely(unlink(cache_filename) == -1)) + error("Cannot delete dimension file '%s'", cache_filename); } } } @@ -507,6 +487,14 @@ static inline RRDSET *rrdset_find_on_create(RRDHOST *host, const char *fullid) { return NULL; } +static inline void rrdset_update_permanent_labels(RRDSET *st) { + if(!st->state || !st->state->chart_labels) return; + + rrdlabels_add(st->state->chart_labels, "_collect_plugin", st->plugin_name, RRDLABEL_SRC_AUTO| RRDLABEL_FLAG_PERMANENT); + rrdlabels_add(st->state->chart_labels, "_collect_module", st->module_name, RRDLABEL_SRC_AUTO| RRDLABEL_FLAG_PERMANENT); + rrdlabels_add(st->state->chart_labels, "_instance_family", st->family, RRDLABEL_SRC_AUTO| RRDLABEL_FLAG_PERMANENT); +} + RRDSET *rrdset_create_custom( RRDHOST *host , const char *type @@ -660,22 +648,7 @@ RRDSET *rrdset_create_custom( } if (mark_rebuild) { -#ifdef ENABLE_ACLK - if (netdata_cloud_setting) { - if (mark_rebuild & META_CHART_ACTIVATED) { - aclk_add_collector(host, st->plugin_name, st->module_name); - } - else { - if (mark_rebuild & (META_PLUGIN_UPDATED | META_MODULE_UPDATED)) { - aclk_del_collector( - host, mark_rebuild & META_PLUGIN_UPDATED ? old_plugin : st->plugin_name, - mark_rebuild & META_MODULE_UPDATED ? old_module : st->module_name); - aclk_add_collector(host, st->plugin_name, st->module_name); - } - } - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); - } -#endif + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); freez(old_plugin); freez(old_module); freez(old_title); @@ -701,8 +674,11 @@ RRDSET *rrdset_create_custom( } } /* Fall-through during switch from archived to active so that the host lock is taken and health is linked */ - if (!changed_from_archived_to_active) + if (!changed_from_archived_to_active) { + rrdset_update_permanent_labels(st); + rrdcontext_updated_rrdset(st); return st; + } } rrdhost_wrlock(host); @@ -722,11 +698,10 @@ RRDSET *rrdset_create_custom( rrdhost_unlock(host); rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK); rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrdcontext_updated_rrdset(st); return st; } - char fullfilename[FILENAME_MAX + 1]; - // ------------------------------------------------------------------------ // get the options from the config, we need to create it @@ -734,126 +709,37 @@ RRDSET *rrdset_create_custom( if (memory_mode != RRD_MEMORY_MODE_DBENGINE) entries = align_entries_to_pagesize(memory_mode, history_entries); - unsigned long size = sizeof(RRDSET); char *cache_dir = rrdset_cache_dir(host, fullid); - time_t now = now_realtime_sec(); - // ------------------------------------------------------------------------ // load it or allocate it debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id); - snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir); - if(memory_mode == RRD_MEMORY_MODE_SAVE || memory_mode == RRD_MEMORY_MODE_MAP || - memory_mode == RRD_MEMORY_MODE_RAM) { - st = (RRDSET *)netdata_mmap( - (memory_mode == RRD_MEMORY_MODE_RAM) ? NULL : fullfilename, - size, - ((memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), - 0); - - if(st) { - memset(&st->avl, 0, sizeof(avl_t)); - memset(&st->avlname, 0, sizeof(avl_t)); - memset(&st->rrdvar_root_index, 0, sizeof(avl_tree_lock)); - memset(&st->dimensions_index, 0, sizeof(avl_tree_lock)); - memset(&st->rrdset_rwlock, 0, sizeof(netdata_rwlock_t)); - - st->name = NULL; - st->type = NULL; - st->family = NULL; - st->title = NULL; - st->units = NULL; - st->context = NULL; - st->cache_dir = NULL; - st->plugin_name = NULL; - st->module_name = NULL; - st->dimensions = NULL; - st->rrdfamily = NULL; - st->rrdhost = NULL; - st->next = NULL; - st->variables = NULL; - st->alarms = NULL; - st->flags = 0x00000000; - st->exporting_flags = NULL; - - if(memory_mode == RRD_MEMORY_MODE_RAM) { - memset(st, 0, size); - } - else { - if(strcmp(st->magic, RRDSET_MAGIC) != 0) { - info("Initializing file %s.", fullfilename); - memset(st, 0, size); - } - else if(strcmp(st->id, fullid) != 0) { - error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid); - // munmap(st, size); - // st = NULL; - memset(st, 0, size); - } - else if(st->memsize != size || st->entries != entries) { - error("File %s does not have the desired size. Clearing it.", fullfilename); - memset(st, 0, size); - } - else if(st->update_every != update_every) { - error("File %s does not have the desired update frequency. Clearing it.", fullfilename); - memset(st, 0, size); - } - else if((now - st->last_updated.tv_sec) > update_every * entries) { - info("File %s is too old. Clearing it.", fullfilename); - memset(st, 0, size); - } - else if(st->last_updated.tv_sec > now + update_every) { - error("File %s refers to the future by %zd secs. Resetting it to now.", fullfilename, (ssize_t)(st->last_updated.tv_sec - now)); - st->last_updated.tv_sec = now; - } - - // make sure the database is aligned - if(st->last_updated.tv_sec) { - st->update_every = update_every; - last_updated_time_align(st); - } - } - - // make sure we have the right memory mode - // even if we cleared the memory - st->rrd_memory_mode = memory_mode; - } - } - - if(unlikely(!st)) { - st = callocz(1, size); - if (memory_mode == RRD_MEMORY_MODE_DBENGINE) - st->rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE; - else - st->rrd_memory_mode = (memory_mode == RRD_MEMORY_MODE_NONE) ? RRD_MEMORY_MODE_NONE : RRD_MEMORY_MODE_ALLOC; - } - - st->plugin_name = plugin?strdupz(plugin):NULL; - st->module_name = module?strdupz(module):NULL; - - st->rrdhost = host; - st->memsize = size; - st->entries = entries; - st->update_every = update_every; - - if(st->current_entry >= st->entries) st->current_entry = 0; - - strcpy(st->cache_filename, fullfilename); - strcpy(st->magic, RRDSET_MAGIC); + st = callocz(1, sizeof(RRDSET)); + st->state = callocz(1, sizeof(*st->state)); strcpy(st->id, fullid); st->hash = simple_hash(st->id); + st->rrdhost = host; st->cache_dir = cache_dir; + st->entries = entries; + st->update_every = update_every; - st->chart_type = chart_type; - st->type = strdupz(type); - - st->state = callocz(1, sizeof(*st->state)); + if(memory_mode == RRD_MEMORY_MODE_SAVE || memory_mode == RRD_MEMORY_MODE_MAP) { + if(!rrdset_memory_load_or_create_map_save(st, memory_mode)) { + info("Failed to use memory mode %s for chart '%s', falling back to ram", (memory_mode == RRD_MEMORY_MODE_MAP)?"map":"save", st->name); + memory_mode = RRD_MEMORY_MODE_RAM; + } + } + st->rrd_memory_mode = memory_mode; - st->family = family ? strdupz(family) : strdupz(st->type); + st->plugin_name = plugin?strdupz(plugin):NULL; + st->module_name = module?strdupz(module):NULL; + st->chart_type = chart_type; + st->type = strdupz(type); + st->family = family ? strdupz(family) : strdupz(st->type); json_fix_string(st->family); st->state->is_ar_chart = strcmp(st->id, ML_ANOMALY_RATES_CHART_ID) == 0; @@ -874,21 +760,14 @@ RRDSET *rrdset_create_custom( st->green = NAN; st->red = NAN; - st->last_collected_time.tv_sec = 0; - st->last_collected_time.tv_usec = 0; - st->counter_done = 0; - st->rrddim_page_alignment = 0; - st->gap_when_lost_iterations_above = (int) (gap_when_lost_iterations_above + 2); - st->last_accessed_time = 0; - st->upstream_resync_time = 0; - avl_init_lock(&st->dimensions_index, rrddim_compare); avl_init_lock(&st->rrdvar_root_index, rrdvar_compare); netdata_rwlock_init(&st->rrdset_rwlock); - netdata_rwlock_init(&st->state->labels.labels_rwlock); + st->state->chart_labels = rrdlabels_create(); + rrdset_update_permanent_labels(st); if(name && *name && rrdset_set_name(st, name)) // we did set the name @@ -930,11 +809,7 @@ RRDSET *rrdset_create_custom( compute_chart_hash(st); rrdhost_unlock(host); -#ifdef ENABLE_ACLK - if (netdata_cloud_setting) - aclk_add_collector(host, plugin, module); - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); -#endif + rrdcontext_updated_rrdset(st); return(st); } @@ -994,7 +869,8 @@ inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) { if(unlikely(since_last_usec < 0)) { // oops! the database is in the future #ifdef NETDATA_INTERNAL_CHECKS - info("RRD database for chart '%s' on host '%s' is %0.5" LONG_DOUBLE_MODIFIER " secs in the future (counter #%zu, update #%zu). Adjusting it to current time.", st->id, st->rrdhost->hostname, (LONG_DOUBLE)-since_last_usec / USEC_PER_SEC, st->counter, st->counter_done); + info("RRD database for chart '%s' on host '%s' is %0.5" NETDATA_DOUBLE_MODIFIER + " secs in the future (counter #%zu, update #%zu). Adjusting it to current time.", st->id, st->rrdhost->hostname, (NETDATA_DOUBLE)-since_last_usec / USEC_PER_SEC, st->counter, st->counter_done); #endif st->last_collected_time.tv_sec = now.tv_sec - st->update_every; @@ -1013,7 +889,8 @@ inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) { else if(unlikely((usec_t)since_last_usec > (usec_t)(st->update_every * 5 * USEC_PER_SEC))) { // oops! the database is too far behind #ifdef NETDATA_INTERNAL_CHECKS - info("RRD database for chart '%s' on host '%s' is %0.5" LONG_DOUBLE_MODIFIER " secs in the past (counter #%zu, update #%zu). Adjusting it to current time.", st->id, st->rrdhost->hostname, (LONG_DOUBLE)since_last_usec / USEC_PER_SEC, st->counter, st->counter_done); + info("RRD database for chart '%s' on host '%s' is %0.5" NETDATA_DOUBLE_MODIFIER + " secs in the past (counter #%zu, update #%zu). Adjusting it to current time.", st->id, st->rrdhost->hostname, (NETDATA_DOUBLE)since_last_usec / USEC_PER_SEC, st->counter, st->counter_done); #endif microseconds = (usec_t)since_last_usec; @@ -1070,7 +947,7 @@ static inline usec_t rrdset_init_last_collected_time(RRDSET *st) { usec_t last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec; #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "initialized last collected time to %0.3" LONG_DOUBLE_MODIFIER, (LONG_DOUBLE)last_collect_ut / USEC_PER_SEC); + rrdset_debug(st, "initialized last collected time to %0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_collect_ut / USEC_PER_SEC); #endif return last_collect_ut; @@ -1083,7 +960,7 @@ static inline usec_t rrdset_update_last_collected_time(RRDSET *st) { st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC); #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "updated last collected time to %0.3" LONG_DOUBLE_MODIFIER, (LONG_DOUBLE)last_collect_ut / USEC_PER_SEC); + rrdset_debug(st, "updated last collected time to %0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_collect_ut / USEC_PER_SEC); #endif return last_collect_ut; @@ -1102,12 +979,110 @@ static inline usec_t rrdset_init_last_updated_time(RRDSET *st) { usec_t last_updated_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec; #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "initialized last updated time to %0.3" LONG_DOUBLE_MODIFIER, (LONG_DOUBLE)last_updated_ut / USEC_PER_SEC); + rrdset_debug(st, "initialized last updated time to %0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_updated_ut / USEC_PER_SEC); #endif return last_updated_ut; } +static inline time_t tier_next_point_time(RRDDIM *rd, struct rrddim_tier *t, time_t now) { + time_t loop = (time_t)rd->update_every * (time_t)t->tier_grouping; + return now + loop - ((now + loop) % loop); +} + +void store_metric_at_tier(RRDDIM *rd, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut) { + if (unlikely(!t->next_point_time)) + t->next_point_time = tier_next_point_time(rd, t, sp.end_time); + + // merge the dates into our virtual point + if (unlikely(sp.start_time < t->virtual_point.start_time)) + t->virtual_point.start_time = sp.start_time; + + if (likely(sp.end_time > t->virtual_point.end_time)) + t->virtual_point.end_time = sp.end_time; + + // merge the values into our virtual point + if (likely(!storage_point_is_empty(sp))) { + // we aggregate only non NULLs into higher tiers + + if (likely(!storage_point_is_unset(t->virtual_point))) { + // merge the collected point to our virtual one + t->virtual_point.sum += sp.sum; + t->virtual_point.min = MIN(t->virtual_point.min, sp.min); + t->virtual_point.max = MAX(t->virtual_point.max, sp.max); + t->virtual_point.count += sp.count; + t->virtual_point.anomaly_count += sp.anomaly_count; + t->virtual_point.flags |= sp.flags; + } + else { + // reset our virtual point to this one + t->virtual_point = sp; + } + } + + if(unlikely(sp.end_time >= t->next_point_time)) { + if (likely(!storage_point_is_unset(t->virtual_point))) { + + t->collect_ops.store_metric( + t->db_collection_handle, + now_ut, + t->virtual_point.sum, + t->virtual_point.min, + t->virtual_point.max, + t->virtual_point.count, + t->virtual_point.anomaly_count, + t->virtual_point.flags); + } + else { + t->collect_ops.store_metric( + t->db_collection_handle, + now_ut, + NAN, + NAN, + NAN, + 0, + 0, SN_FLAG_NONE); + } + + t->virtual_point.count = 0; + t->next_point_time = tier_next_point_time(rd, t, sp.end_time); + } +} + +static void store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) { + + // store the metric on tier 0 + rd->tiers[0]->collect_ops.store_metric(rd->tiers[0]->db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags); + + for(int tier = 1; tier < storage_tiers ;tier++) { + if(unlikely(!rd->tiers[tier])) continue; + + struct rrddim_tier *t = rd->tiers[tier]; + + time_t now = (time_t)(point_end_time_ut / USEC_PER_SEC); + + if(!t->last_collected_ut) { + // we have not collected this tier before + // let's fill any gap that may exist + rrdr_fill_tier_gap_from_smaller_tiers(rd, tier, now); + } + + STORAGE_POINT sp = { + .start_time = now - rd->update_every, + .end_time = now, + .min = n, + .max = n, + .sum = n, + .count = 1, + .anomaly_count = (flags & SN_FLAG_NOT_ANOMALOUS) ? 0 : 1, + .flags = flags + }; + + t->last_collected_ut = point_end_time_ut; + store_metric_at_tier(rd, t, sp, point_end_time_ut); + } +} + static inline size_t rrdset_done_interpolate( RRDSET *st , usec_t update_every_ut @@ -1131,17 +1106,17 @@ static inline size_t rrdset_done_interpolate( size_t counter = st->counter; long current_entry = st->current_entry; - uint32_t storage_flags = SN_DEFAULT_FLAGS; + SN_FLAGS storage_flags = SN_DEFAULT_FLAGS; if (has_reset_value) - storage_flags |= SN_EXISTS_RESET; + storage_flags |= SN_FLAG_RESET; for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) { #ifdef NETDATA_INTERNAL_CHECKS if(iterations < 0) { error("INTERNAL CHECK: %s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", st->name, first_ut, last_stored_ut, next_store_ut, now_collect_ut); } - rrdset_debug(st, "last_stored_ut = %0.3" LONG_DOUBLE_MODIFIER " (last updated time)", (LONG_DOUBLE)last_stored_ut/USEC_PER_SEC); - rrdset_debug(st, "next_store_ut = %0.3" LONG_DOUBLE_MODIFIER " (next interpolation point)", (LONG_DOUBLE)next_store_ut/USEC_PER_SEC); + rrdset_debug(st, "last_stored_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last updated time)", (NETDATA_DOUBLE)last_stored_ut/USEC_PER_SEC); + rrdset_debug(st, "next_store_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (next interpolation point)", (NETDATA_DOUBLE)next_store_ut/USEC_PER_SEC); #endif last_ut = next_store_ut; @@ -1150,20 +1125,19 @@ static inline size_t rrdset_done_interpolate( if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) continue; - calculated_number new_value; + NETDATA_DOUBLE new_value; switch(rd->algorithm) { case RRD_ALGORITHM_INCREMENTAL: - new_value = (calculated_number) + new_value = (NETDATA_DOUBLE) ( rd->calculated_value - * (calculated_number)(next_store_ut - last_collect_ut) - / (calculated_number)(now_collect_ut - last_collect_ut) + * (NETDATA_DOUBLE)(next_store_ut - last_collect_ut) + / (NETDATA_DOUBLE)(now_collect_ut - last_collect_ut) ); #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: CALC2 INC " - CALCULATED_NUMBER_FORMAT " = " - CALCULATED_NUMBER_FORMAT + rrdset_debug(st, "%s: CALC2 INC " NETDATA_DOUBLE_FORMAT " = " + NETDATA_DOUBLE_FORMAT " * (%llu - %llu)" " / (%llu - %llu)" , rd->name @@ -1177,18 +1151,18 @@ static inline size_t rrdset_done_interpolate( rd->calculated_value -= new_value; new_value += rd->last_calculated_value; rd->last_calculated_value = 0; - new_value /= (calculated_number)st->update_every; + new_value /= (NETDATA_DOUBLE)st->update_every; if(unlikely(next_store_ut - last_stored_ut < update_every_ut)) { #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: COLLECTION POINT IS SHORT " CALCULATED_NUMBER_FORMAT " - EXTRAPOLATING", + rrdset_debug(st, "%s: COLLECTION POINT IS SHORT " NETDATA_DOUBLE_FORMAT " - EXTRAPOLATING", rd->name - , (calculated_number)(next_store_ut - last_stored_ut) + , (NETDATA_DOUBLE)(next_store_ut - last_stored_ut) ); #endif - new_value = new_value * (calculated_number)(st->update_every * USEC_PER_SEC) / (calculated_number)(next_store_ut - last_stored_ut); + new_value = new_value * (NETDATA_DOUBLE)(st->update_every * USEC_PER_SEC) / (NETDATA_DOUBLE)(next_store_ut - last_stored_ut); } break; @@ -1207,21 +1181,19 @@ static inline size_t rrdset_done_interpolate( // we have missed an update // interpolate in the middle values - new_value = (calculated_number) + new_value = (NETDATA_DOUBLE) ( ( (rd->calculated_value - rd->last_calculated_value) - * (calculated_number)(next_store_ut - last_collect_ut) - / (calculated_number)(now_collect_ut - last_collect_ut) + * (NETDATA_DOUBLE)(next_store_ut - last_collect_ut) + / (NETDATA_DOUBLE)(now_collect_ut - last_collect_ut) ) + rd->last_calculated_value ); #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: CALC2 DEF " - CALCULATED_NUMBER_FORMAT " = (((" - "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")" + rrdset_debug(st, "%s: CALC2 DEF " NETDATA_DOUBLE_FORMAT " = (((" + "(" NETDATA_DOUBLE_FORMAT " - " NETDATA_DOUBLE_FORMAT ")" " * %llu" - " / %llu) + " CALCULATED_NUMBER_FORMAT - , rd->name + " / %llu) + " NETDATA_DOUBLE_FORMAT, rd->name , new_value , rd->calculated_value, rd->last_calculated_value , (next_store_ut - first_ut) @@ -1234,9 +1206,7 @@ static inline size_t rrdset_done_interpolate( if(unlikely(!store_this_entry)) { (void) ml_is_anomalous(rd, 0, false); - - rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); -// rd->values[current_entry] = SN_EMPTY_SLOT; + store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE); continue; } @@ -1245,69 +1215,26 @@ static inline size_t rrdset_done_interpolate( if (ml_is_anomalous(rd, new_value, true)) { // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous - dim_storage_flags &= ~ ((uint32_t) SN_ANOMALY_BIT); + dim_storage_flags &= ~((storage_number)SN_FLAG_NOT_ANOMALOUS); } - rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, dim_storage_flags)); -// rd->values[current_entry] = pack_storage_number(new_value, storage_flags ); + store_metric(rd, next_store_ut, new_value, dim_storage_flags); rd->last_stored_value = new_value; - - #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: STORE[%ld] " - CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT - , rd->name - , current_entry - , unpack_storage_number(rd->values[current_entry]), new_value - ); - #endif } else { (void) ml_is_anomalous(rd, 0, false); #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING " - , rd->name - , current_entry - ); + rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rd->name, current_entry); #endif -// rd->values[current_entry] = SN_EMPTY_SLOT; - rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); + store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE); rd->last_stored_value = NAN; } stored_entries++; - - #ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) { - calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor; - calculated_number t2 = unpack_storage_number(rd->values[current_entry]); - - calculated_number accuracy = accuracy_loss(t1, t2); - debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)" - , st->id, rd->name - , current_entry - , t2 - , t1 - , accuracy - , (accuracy > ACCURACY_LOSS_ACCEPTED_PERCENT) ? " **TOO BIG** " : "" - ); - - rd->collected_volume += t1; - rd->stored_volume += t2; - - accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume); - debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s" - , st->id, rd->name - , current_entry - , rd->stored_volume - , rd->collected_volume - , accuracy - , (accuracy > ACCURACY_LOSS_ACCEPTED_PERCENT) ? " **TOO BIG** " : "" - ); - } - #endif } + // reset the storage flags for the next point, if any; storage_flags = SN_DEFAULT_FLAGS; @@ -1344,7 +1271,7 @@ static inline void rrdset_done_fill_the_gap(RRDSET *st) { long current_entry = st->current_entry; for(c = 0; c < entries && next_store_ut <= now_collect_ut ; next_store_ut += update_every_ut, c++) { - rd->values[current_entry] = SN_EMPTY_SLOT; + rd->db[current_entry] = pack_storage_number(NAN, SN_FLAG_NONE); current_entry = ((current_entry + 1) >= entries) ? 0 : current_entry + 1; #ifdef NETDATA_INTERNAL_CHECKS @@ -1368,6 +1295,7 @@ void rrdset_done(RRDSET *st) { if(unlikely(netdata_exit)) return; debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name); + rrdcontext_collected_rrdset(st); RRDDIM *rd; @@ -1405,7 +1333,8 @@ void rrdset_done(RRDSET *st) { // check if the chart has a long time to be updated if(unlikely(st->usec_since_last_update > st->entries * update_every_ut && st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE && st->rrd_memory_mode != RRD_MEMORY_MODE_NONE)) { - info("host '%s', chart %s: took too long to be updated (counter #%zu, update #%zu, %0.3" LONG_DOUBLE_MODIFIER " secs). Resetting it.", st->rrdhost->hostname, st->name, st->counter, st->counter_done, (LONG_DOUBLE)st->usec_since_last_update / USEC_PER_SEC); + info("host '%s', chart %s: took too long to be updated (counter #%zu, update #%zu, %0.3" NETDATA_DOUBLE_MODIFIER + " secs). Resetting it.", st->rrdhost->hostname, st->name, st->counter, st->counter_done, (NETDATA_DOUBLE)st->usec_since_last_update / USEC_PER_SEC); rrdset_reset(st); st->usec_since_last_update = update_every_ut; store_this_entry = 0; @@ -1504,6 +1433,7 @@ void rrdset_done(RRDSET *st) { // and we have collected metrics for this chart in the past (st->counter != 0) // fill the gap (the chart has been just loaded from disk) if(unlikely(st->counter) && st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) { + // TODO this should be inside the storage engine rrdset_done_fill_the_gap(st); last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec; next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC; @@ -1531,6 +1461,7 @@ void rrdset_done(RRDSET *st) { #endif } } + after_first_database_work: st->counter_done++; @@ -1541,10 +1472,10 @@ after_first_database_work: } #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "last_collect_ut = %0.3" LONG_DOUBLE_MODIFIER " (last collection time)", (LONG_DOUBLE)last_collect_ut/USEC_PER_SEC); - rrdset_debug(st, "now_collect_ut = %0.3" LONG_DOUBLE_MODIFIER " (current collection time)", (LONG_DOUBLE)now_collect_ut/USEC_PER_SEC); - rrdset_debug(st, "last_stored_ut = %0.3" LONG_DOUBLE_MODIFIER " (last updated time)", (LONG_DOUBLE)last_stored_ut/USEC_PER_SEC); - rrdset_debug(st, "next_store_ut = %0.3" LONG_DOUBLE_MODIFIER " (next interpolation point)", (LONG_DOUBLE)next_store_ut/USEC_PER_SEC); + rrdset_debug(st, "last_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last collection time)", (NETDATA_DOUBLE)last_collect_ut/USEC_PER_SEC); + rrdset_debug(st, "now_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (current collection time)", (NETDATA_DOUBLE)now_collect_ut/USEC_PER_SEC); + rrdset_debug(st, "last_stored_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last updated time)", (NETDATA_DOUBLE)last_stored_ut/USEC_PER_SEC); + rrdset_debug(st, "next_store_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (next interpolation point)", (NETDATA_DOUBLE)next_store_ut/USEC_PER_SEC); #endif // calculate totals and count the dimensions @@ -1553,7 +1484,9 @@ after_first_database_work: rrddim_foreach_read(rd, st) { if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) continue; + dimensions++; + if(likely(rd->updated)) st->collected_total += rd->collected_value; } @@ -1581,9 +1514,8 @@ after_first_database_work: rrdset_debug(st, "%s: START " " last_collected_value = " COLLECTED_NUMBER_FORMAT " collected_value = " COLLECTED_NUMBER_FORMAT - " last_calculated_value = " CALCULATED_NUMBER_FORMAT - " calculated_value = " CALCULATED_NUMBER_FORMAT - , rd->name + " last_calculated_value = " NETDATA_DOUBLE_FORMAT + " calculated_value = " NETDATA_DOUBLE_FORMAT, rd->name , rd->last_collected_value , rd->collected_value , rd->last_calculated_value @@ -1593,21 +1525,19 @@ after_first_database_work: switch(rd->algorithm) { case RRD_ALGORITHM_ABSOLUTE: - rd->calculated_value = (calculated_number)rd->collected_value - * (calculated_number)rd->multiplier - / (calculated_number)rd->divisor; + rd->calculated_value = (NETDATA_DOUBLE)rd->collected_value + * (NETDATA_DOUBLE)rd->multiplier + / (NETDATA_DOUBLE)rd->divisor; #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: CALC ABS/ABS-NO-IN " - CALCULATED_NUMBER_FORMAT " = " + rrdset_debug(st, "%s: CALC ABS/ABS-NO-IN " NETDATA_DOUBLE_FORMAT " = " COLLECTED_NUMBER_FORMAT - " * " CALCULATED_NUMBER_FORMAT - " / " CALCULATED_NUMBER_FORMAT - , rd->name + " * " NETDATA_DOUBLE_FORMAT + " / " NETDATA_DOUBLE_FORMAT, rd->name , rd->calculated_value , rd->collected_value - , (calculated_number)rd->multiplier - , (calculated_number)rd->divisor + , (NETDATA_DOUBLE)rd->multiplier + , (NETDATA_DOUBLE)rd->divisor ); #endif @@ -1620,13 +1550,12 @@ after_first_database_work: // the percentage of the current value // over the total of all dimensions rd->calculated_value = - (calculated_number)100 - * (calculated_number)rd->collected_value - / (calculated_number)st->collected_total; + (NETDATA_DOUBLE)100 + * (NETDATA_DOUBLE)rd->collected_value + / (NETDATA_DOUBLE)st->collected_total; #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: CALC PCENT-ROW " - CALCULATED_NUMBER_FORMAT " = 100" + rrdset_debug(st, "%s: CALC PCENT-ROW " NETDATA_DOUBLE_FORMAT " = 100" " * " COLLECTED_NUMBER_FORMAT " / " COLLECTED_NUMBER_FORMAT , rd->name @@ -1677,34 +1606,32 @@ after_first_database_work: // TODO: remember recent history of rates and compare with current rate to reduce this chance. if (delta < max_acceptable_rate) { rd->calculated_value += - (calculated_number) delta - * (calculated_number) rd->multiplier - / (calculated_number) rd->divisor; + (NETDATA_DOUBLE) delta + * (NETDATA_DOUBLE) rd->multiplier + / (NETDATA_DOUBLE) rd->divisor; } else { // This is a reset. Any overflow with a rate greater than MAX_INCREMENTAL_PERCENT_RATE will also // be detected as a reset instead. - rd->calculated_value += (calculated_number)0; + rd->calculated_value += (NETDATA_DOUBLE)0; } } else { rd->calculated_value += - (calculated_number) (rd->collected_value - rd->last_collected_value) - * (calculated_number) rd->multiplier - / (calculated_number) rd->divisor; + (NETDATA_DOUBLE) (rd->collected_value - rd->last_collected_value) + * (NETDATA_DOUBLE) rd->multiplier + / (NETDATA_DOUBLE) rd->divisor; } #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: CALC INC PRE " - CALCULATED_NUMBER_FORMAT " = (" + rrdset_debug(st, "%s: CALC INC PRE " NETDATA_DOUBLE_FORMAT " = (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")" - " * " CALCULATED_NUMBER_FORMAT - " / " CALCULATED_NUMBER_FORMAT - , rd->name + " * " NETDATA_DOUBLE_FORMAT + " / " NETDATA_DOUBLE_FORMAT, rd->name , rd->calculated_value , rd->collected_value, rd->last_collected_value - , (calculated_number)rd->multiplier - , (calculated_number)rd->divisor + , (NETDATA_DOUBLE)rd->multiplier + , (NETDATA_DOUBLE)rd->divisor ); #endif @@ -1737,13 +1664,12 @@ after_first_database_work: rd->calculated_value = 0; else rd->calculated_value = - (calculated_number)100 - * (calculated_number)(rd->collected_value - rd->last_collected_value) - / (calculated_number)(st->collected_total - st->last_collected_total); + (NETDATA_DOUBLE)100 + * (NETDATA_DOUBLE)(rd->collected_value - rd->last_collected_value) + / (NETDATA_DOUBLE)(st->collected_total - st->last_collected_total); #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: CALC PCENT-DIFF " - CALCULATED_NUMBER_FORMAT " = 100" + rrdset_debug(st, "%s: CALC PCENT-DIFF " NETDATA_DOUBLE_FORMAT " = 100" " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")" " / (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")" , rd->name @@ -1761,8 +1687,7 @@ after_first_database_work: rd->calculated_value = 0; #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: CALC " - CALCULATED_NUMBER_FORMAT " = 0" + rrdset_debug(st, "%s: CALC " NETDATA_DOUBLE_FORMAT " = 0" , rd->name , rd->calculated_value ); @@ -1775,9 +1700,8 @@ after_first_database_work: rrdset_debug(st, "%s: PHASE2 " " last_collected_value = " COLLECTED_NUMBER_FORMAT " collected_value = " COLLECTED_NUMBER_FORMAT - " last_calculated_value = " CALCULATED_NUMBER_FORMAT - " calculated_value = " CALCULATED_NUMBER_FORMAT - , rd->name + " last_calculated_value = " NETDATA_DOUBLE_FORMAT + " calculated_value = " NETDATA_DOUBLE_FORMAT, rd->name , rd->last_collected_value , rd->collected_value , rd->last_calculated_value @@ -1791,10 +1715,10 @@ after_first_database_work: // it is now time to interpolate values on a second boundary #ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(now_collect_ut < next_store_ut)) { + if(unlikely(now_collect_ut < next_store_ut && st->counter_done > 1)) { // this is collected in the same interpolation point rrdset_debug(st, "THIS IS IN THE SAME INTERPOLATION POINT"); - info("INTERNAL CHECK: host '%s', chart '%s' is collected in the same interpolation point: short by %llu microseconds", st->rrdhost->hostname, st->name, next_store_ut - now_collect_ut); + info("INTERNAL CHECK: host '%s', chart '%s' collection %zu is in the same interpolation point: short by %llu microseconds", st->rrdhost->hostname, st->name, st->counter_done, next_store_ut - now_collect_ut); } #endif @@ -1811,14 +1735,14 @@ after_first_database_work: after_second_database_work: st->last_collected_total = st->collected_total; -#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) +#ifdef ENABLE_ACLK time_t mark = now_realtime_sec(); #endif rrddim_foreach_read(rd, st) { if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) continue; -#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) +#ifdef ENABLE_ACLK if (likely(!st->state->is_ar_chart)) { if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN) && likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark)); @@ -1837,7 +1761,8 @@ after_second_database_work: case RRD_ALGORITHM_INCREMENTAL: if(unlikely(!first_entry)) { #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value); + rrdset_debug(st, "%s: setting last_calculated_value (old: " NETDATA_DOUBLE_FORMAT + ") to last_calculated_value (new: " NETDATA_DOUBLE_FORMAT ")", rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value); #endif rd->last_calculated_value += rd->calculated_value; @@ -1853,7 +1778,8 @@ after_second_database_work: case RRD_ALGORITHM_PCENT_OVER_ROW_TOTAL: case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL: #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", rd->name, rd->last_calculated_value, rd->calculated_value); + rrdset_debug(st, "%s: setting last_calculated_value (old: " NETDATA_DOUBLE_FORMAT + ") to last_calculated_value (new: " NETDATA_DOUBLE_FORMAT ")", rd->name, rd->last_calculated_value, rd->calculated_value); #endif rd->last_calculated_value = rd->calculated_value; @@ -1868,9 +1794,8 @@ after_second_database_work: rrdset_debug(st, "%s: END " " last_collected_value = " COLLECTED_NUMBER_FORMAT " collected_value = " COLLECTED_NUMBER_FORMAT - " last_calculated_value = " CALCULATED_NUMBER_FORMAT - " calculated_value = " CALCULATED_NUMBER_FORMAT - , rd->name + " last_calculated_value = " NETDATA_DOUBLE_FORMAT + " calculated_value = " NETDATA_DOUBLE_FORMAT, rd->name , rd->last_collected_value , rd->collected_value , rd->last_calculated_value @@ -1883,15 +1808,24 @@ after_second_database_work: // ALL DONE ABOUT THE DATA UPDATE // -------------------------------------------------------------------- - // find if there are any obsolete dimensions - time_t now = now_realtime_sec(); + if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) { + // update the memory mapped files with the latest values + rrdset_memory_file_update(st); + rrddim_foreach_read(rd, st) { + rrddim_memory_file_update(rd); + } + } + + // find if there are any obsolete dimensions if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE_DIMENSIONS))) { rrddim_foreach_read(rd, st) if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE))) break; if(unlikely(rd)) { + time_t now = now_realtime_sec(); + RRDDIM *last; // there is a dimension to free // upgrade our read lock to a write lock @@ -1903,10 +1837,11 @@ after_second_database_work: && (rd->last_collected_time.tv_sec + rrdset_free_obsolete_time < now))) { info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id); - if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || rd->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) { - info("Deleting dimension file '%s'.", rd->cache_filename); - if(unlikely(unlink(rd->cache_filename) == -1)) - error("Cannot delete dimension file '%s'", rd->cache_filename); + const char *cache_filename = rrddim_cache_filename(rd); + if(cache_filename) { + info("Deleting dimension file '%s'.", cache_filename); + if (unlikely(unlink(cache_filename) == -1)) + error("Cannot delete dimension file '%s'", cache_filename); } #ifdef ENABLE_DBENGINE @@ -1917,13 +1852,25 @@ after_second_database_work: rrddim_flag_clear(rd, RRDDIM_FLAG_OBSOLETE); /* only a collector can mark a chart as obsolete, so we must remove the reference */ - uint8_t can_delete_metric = rd->state->collect_ops.finalize(rd); - if (can_delete_metric) { + + size_t tiers_available = 0, tiers_said_yes = 0; + for(int tier = 0; tier < storage_tiers ;tier++) { + if(rd->tiers[tier]) { + tiers_available++; + + if(rd->tiers[tier]->collect_ops.finalize(rd->tiers[tier]->db_collection_handle)) + tiers_said_yes++; + + rd->tiers[tier]->db_collection_handle = NULL; + } + } + + if (tiers_available == tiers_said_yes && tiers_said_yes) { /* This metric has no data and no references */ - delete_dimension_uuid(&rd->state->metric_uuid); + delete_dimension_uuid(&rd->metric_uuid); } else { /* Do not delete this dimension */ -#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) +#ifdef ENABLE_ACLK queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark)); #endif last = rd; @@ -1958,102 +1905,208 @@ after_second_database_work: netdata_thread_enable_cancelability(); } -void rrdset_add_label_to_new_list(RRDSET *st, char *key, char *value, LABEL_SOURCE source) -{ - st->state->new_labels = add_label_to_list(st->state->new_labels, key, value, source); -} -void rrdset_finalize_labels(RRDSET *st) -{ - struct label *new_labels = st->state->new_labels; - struct label_index *labels = &st->state->labels; +// ---------------------------------------------------------------------------- +// compatibility layer for RRDSET files v019 + +#define RRDSET_MAGIC_V019 "NETDATA RRD SET FILE V019" +#define RRD_ID_LENGTH_MAX_V019 200 + +struct avl_element_v019 { + void *avl_link[2]; + signed char avl_balance; +}; +struct avl_tree_type_v019 { + void *root; + int (*compar)(void *a, void *b); +}; +struct avl_tree_lock_v019 { + struct avl_tree_type_v019 avl_tree; + pthread_rwlock_t rwlock; +}; +struct rrdset_map_save_v019 { + struct avl_element_v019 avl; // ignored + struct avl_element_v019 avlname; // ignored + char id[RRD_ID_LENGTH_MAX_V019 + 1]; // check to reset all - update on load + void *name; // ignored + void *unused_ptr; // ignored + void *type; // ignored + void *family; // ignored + void *title; // ignored + void *units; // ignored + void *context; // ignored + uint32_t hash_context; // ignored + uint32_t chart_type; // ignored + int update_every; // check to reset all - update on load + long entries; // check to reset all - update on load + long current_entry; // NEEDS TO BE UPDATED - FIXED ON LOAD + uint32_t flags; // ignored + void *exporting_flags; // ignored + int gap_when_lost_iterations_above; // ignored + long priority; // ignored + uint32_t rrd_memory_mode; // ignored + void *cache_dir; // ignored + char cache_filename[FILENAME_MAX+1]; // ignored - update on load + pthread_rwlock_t rrdset_rwlock; // ignored + size_t counter; // NEEDS TO BE UPDATED - maintained on load + size_t counter_done; // ignored + union { // + time_t last_accessed_time; // ignored + time_t last_entry_t; // ignored + }; // + time_t upstream_resync_time; // ignored + void *plugin_name; // ignored + void *module_name; // ignored + void *chart_uuid; // ignored + void *state; // ignored + size_t unused[3]; // ignored + size_t rrddim_page_alignment; // ignored + uint32_t hash; // ignored + uint32_t hash_name; // ignored + usec_t usec_since_last_update; // NEEDS TO BE UPDATED - maintained on load + struct timeval last_updated; // NEEDS TO BE UPDATED - check to reset all - fixed on load + struct timeval last_collected_time; // ignored + long long collected_total; // NEEDS TO BE UPDATED - maintained on load + long long last_collected_total; // NEEDS TO BE UPDATED - maintained on load + void *rrdfamily; // ignored + void *rrdhost; // ignored + void *next; // ignored + long double green; // ignored + long double red; // ignored + struct avl_tree_lock_v019 rrdvar_root_index; // ignored + void *variables; // ignored + void *alarms; // ignored + unsigned long memsize; // check to reset all - update on load + char magic[sizeof(RRDSET_MAGIC_V019) + 1]; // check to reset all - update on load + struct avl_tree_lock_v019 dimensions_index; // ignored + void *dimensions; // ignored +}; + +void rrdset_memory_file_update(RRDSET *st) { + if(!st->st_on_file) return; + struct rrdset_map_save_v019 *st_on_file = st->st_on_file; + + st_on_file->current_entry = st->current_entry; + st_on_file->counter = st->counter; + st_on_file->usec_since_last_update = st->usec_since_last_update; + st_on_file->last_updated.tv_sec = st->last_updated.tv_sec; + st_on_file->last_updated.tv_usec = st->last_updated.tv_usec; + st_on_file->collected_total = st->collected_total; + st_on_file->last_collected_total = st->last_collected_total; +} - if (!labels->head) { - labels->head = new_labels; - } else { - replace_label_list(labels, new_labels); - } +const char *rrdset_cache_filename(RRDSET *st) { + if(!st->st_on_file) return NULL; + struct rrdset_map_save_v019 *st_on_file = st->st_on_file; + return st_on_file->cache_filename; +} - netdata_rwlock_rdlock(&labels->labels_rwlock); - struct label *lbl = labels->head; - while (lbl) { - sql_store_chart_label(st->chart_uuid, (int)lbl->label_source, lbl->key, lbl->value); - lbl = lbl->next; - } - netdata_rwlock_unlock(&labels->labels_rwlock); +void rrdset_memory_file_free(RRDSET *st) { + if(!st->st_on_file) return; - st->state->new_labels = NULL; -} + // needed for memory mode map, to save the latest state + rrdset_memory_file_update(st); -void rrdset_update_labels(RRDSET *st, struct label *labels) -{ - if (!labels) - return; + struct rrdset_map_save_v019 *st_on_file = st->st_on_file; + munmap(st_on_file, st_on_file->memsize); - update_label_list(&st->state->new_labels, labels); - rrdset_finalize_labels(st); + // remove the pointers from the RRDDIM + st->st_on_file = NULL; } -int rrdset_contains_label_keylist(RRDSET *st, char *keylist) -{ - struct label_index *labels = &st->state->labels; - int ret; +void rrdset_memory_file_save(RRDSET *st) { + if(!st->st_on_file) return; - if (!labels->head) - return 0; + rrdset_memory_file_update(st); - netdata_rwlock_rdlock(&labels->labels_rwlock); - ret = label_list_contains_keylist(labels->head, keylist); - netdata_rwlock_unlock(&labels->labels_rwlock); + struct rrdset_map_save_v019 *st_on_file = st->st_on_file; + if(st_on_file->rrd_memory_mode != RRD_MEMORY_MODE_SAVE) return; - return ret; + memory_file_save(st_on_file->cache_filename, st->st_on_file, st_on_file->memsize); } -struct label *rrdset_lookup_label_key(RRDSET *st, char *key, uint32_t key_hash) -{ - struct label_index *labels = &st->state->labels; - struct label *ret = NULL; +bool rrdset_memory_load_or_create_map_save(RRDSET *st, RRD_MEMORY_MODE memory_mode) { + if(memory_mode != RRD_MEMORY_MODE_SAVE && memory_mode != RRD_MEMORY_MODE_MAP) + return false; - if (labels->head) { - netdata_rwlock_rdlock(&labels->labels_rwlock); - ret = label_list_lookup_key(labels->head, key, key_hash); - netdata_rwlock_unlock(&labels->labels_rwlock); - } - return ret; -} - -static inline int k8s_space(char c) { - switch(c) { - case ':': - case ',': - return 1; - default: - return 0; - } -} + char fullfilename[FILENAME_MAX + 1]; + snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", st->cache_dir); -int rrdset_matches_label_keys(RRDSET *st, char *keylist, char *words[], uint32_t *hash_key_list, int *word_count, int size) -{ - struct label_index *labels = &st->state->labels; + unsigned long size = sizeof(struct rrdset_map_save_v019); + struct rrdset_map_save_v019 *st_on_file = (struct rrdset_map_save_v019 *)netdata_mmap( + fullfilename, size, + ((memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), + 0); - if (!labels->head) - return 0; + if(!st_on_file) return false; - struct label *one_label; + time_t now = now_realtime_sec(); - if (!*word_count) { - *word_count = quoted_strings_splitter(keylist, words, size, k8s_space, NULL, NULL, 0); - for (int i = 0; i < *word_count - 1; i += 2) { - hash_key_list[i] = simple_hash(words[i]); - } + st_on_file->magic[sizeof(RRDSET_MAGIC_V019)] = '\0'; + if(strcmp(st_on_file->magic, RRDSET_MAGIC_V019) != 0) { + info("Initializing file '%s'.", fullfilename); + memset(st_on_file, 0, size); + } + else if(strncmp(st_on_file->id, st->id, RRD_ID_LENGTH_MAX_V019) != 0) { + error("File '%s' contents are not for chart '%s'. Clearing it.", fullfilename, st->id); + memset(st_on_file, 0, size); + } + else if(st_on_file->memsize != size || st_on_file->entries != st->entries) { + error("File '%s' does not have the desired size. Clearing it.", fullfilename); + memset(st_on_file, 0, size); + } + else if(st_on_file->update_every != st->update_every) { + error("File '%s' does not have the desired granularity. Clearing it.", fullfilename); + memset(st_on_file, 0, size); + } + else if((now - st_on_file->last_updated.tv_sec) > st->update_every * st->entries) { + info("File '%s' is too old. Clearing it.", fullfilename); + memset(st_on_file, 0, size); + } + else if(st_on_file->last_updated.tv_sec > now + st->update_every) { + error("File '%s' refers to the future by %zd secs. Resetting it to now.", fullfilename, (ssize_t)(st_on_file->last_updated.tv_sec - now)); + st_on_file->last_updated.tv_sec = now; } - int ret = 1; - netdata_rwlock_rdlock(&labels->labels_rwlock); - for (int i = 0; ret && i < *word_count - 1; i += 2) { - one_label = label_list_lookup_key(labels->head, words[i], hash_key_list[i]); - ret = (one_label && !strcmp(one_label->value, words[i + 1])); + if(st_on_file->current_entry >= st_on_file->entries) + st_on_file->current_entry = 0; + + // make sure the database is aligned + bool align_last_updated = false; + if(st_on_file->last_updated.tv_sec) { + st_on_file->update_every = st->update_every; + align_last_updated = true; } - netdata_rwlock_unlock(&labels->labels_rwlock); - return ret; + + // copy the useful values to st + st->current_entry = st_on_file->current_entry; + st->counter = st_on_file->counter; + st->usec_since_last_update = st_on_file->usec_since_last_update; + st->last_updated.tv_sec = st_on_file->last_updated.tv_sec; + st->last_updated.tv_usec = st_on_file->last_updated.tv_usec; + st->collected_total = st_on_file->collected_total; + st->last_collected_total = st_on_file->last_collected_total; + + // link it to st + st->st_on_file = st_on_file; + + // clear everything + memset(st_on_file, 0, size); + + // set the values we need + strncpyz(st_on_file->id, st->id, RRD_ID_LENGTH_MAX_V019 + 1); + strcpy(st_on_file->cache_filename, fullfilename); + strcpy(st_on_file->magic, RRDSET_MAGIC_V019); + st_on_file->memsize = size; + st_on_file->entries = st->entries; + st_on_file->update_every = st->update_every; + st_on_file->rrd_memory_mode = memory_mode; + + if(align_last_updated) + last_updated_time_align(st); + + // copy the useful values back to st_on_file + rrdset_memory_file_update(st); + + return true; } |