diff options
Diffstat (limited to 'database/rrdset.c')
-rw-r--r-- | database/rrdset.c | 417 |
1 files changed, 219 insertions, 198 deletions
diff --git a/database/rrdset.c b/database/rrdset.c index 6eb3c7105..57f962cd6 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -28,6 +28,8 @@ static inline void rrdset_index_del_name(RRDHOST *host, RRDSET *st) { } static inline RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name) { + if (unlikely(!host->rrdset_root_index_name)) + return NULL; return dictionary_get(host->rrdset_root_index_name, name); } @@ -126,15 +128,15 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v st->module_name = rrd_string_strdupz(ctr->module); st->priority = ctr->priority; - st->cache_dir = rrdset_cache_dir(host, chart_full_id); st->entries = (ctr->memory_mode != RRD_MEMORY_MODE_DBENGINE) ? align_entries_to_pagesize(ctr->memory_mode, ctr->history_entries) : 5; st->update_every = ctr->update_every; st->rrd_memory_mode = ctr->memory_mode; st->chart_type = ctr->chart_type; - st->gap_when_lost_iterations_above = (int) (gap_when_lost_iterations_above + 2); st->rrdhost = host; + netdata_spinlock_init(&st->data_collection_lock); + st->flags = RRDSET_FLAG_SYNC_CLOCK | RRDSET_FLAG_INDEXED_ID | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED @@ -165,7 +167,7 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v // chart variables - we need this for data collection to work (collector given chart variables) - not only health rrdsetvar_index_init(st); - if (host->health_enabled) { + if (host->health.health_enabled) { st->rrdfamily = rrdfamily_add_and_acquire(host, rrdset_family(st)); st->rrdvars = rrdvariables_create(); rrddimvar_index_init(st); @@ -178,6 +180,31 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v st->red = NAN; ctr->react_action = RRDSET_REACT_NEW; + + ml_chart_new(st); +} + +void rrdset_finalize_collection(RRDSET *st, bool dimensions_too) { + RRDHOST *host = st->rrdhost; + + rrdset_flag_set(st, RRDSET_FLAG_COLLECTION_FINISHED); + + if(dimensions_too) { + RRDDIM *rd; + rrddim_foreach_read(rd, st) + rrddim_finalize_collection_and_check_retention(rd); + rrddim_foreach_done(rd); + } + + for(size_t tier = 0; tier < storage_tiers ; tier++) { + STORAGE_ENGINE *eng = st->rrdhost->db[tier].eng; + if(!eng) continue; + + if(st->storage_metrics_groups[tier]) { + eng->api.collect_ops.metrics_group_release(host->db[tier].instance, st->storage_metrics_groups[tier]); + st->storage_metrics_groups[tier] = NULL; + } + } } // the destructor - the dictionary is write locked while this runs @@ -187,15 +214,7 @@ static void rrdset_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v rrdset_flag_clear(st, RRDSET_FLAG_INDEXED_ID); - // cleanup storage engines - { - for(size_t tier = 0; tier < storage_tiers ; tier++) { - STORAGE_ENGINE *eng = st->rrdhost->db[tier].eng; - if(!eng) continue; - - eng->api.collect_ops.metrics_group_release(host->db[tier].instance, st->storage_metrics_groups[tier]); - } - } + rrdset_finalize_collection(st, false); // remove it from the name index rrdset_index_del_name(host, st); @@ -232,6 +251,9 @@ static void rrdset_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v // 7. destroy the chart labels rrdlabels_destroy(st->rrdlabels); // destroy the labels, after letting the contexts know + // 8. destroy the ml handle + ml_chart_delete(st); + rrdset_memory_file_free(st); // remove files of db mode save and map // ------------------------------------------------------------------------ @@ -282,7 +304,7 @@ static bool rrdset_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, } if (unlikely(st->update_every != ctr->update_every)) { - rrdset_set_update_every(st, ctr->update_every); + rrdset_set_update_every_s(st, ctr->update_every); ctr->react_action |= RRDSET_REACT_UPDATED; } @@ -356,33 +378,17 @@ static void rrdset_react_callback(const DICTIONARY_ITEM *item __maybe_unused, vo RRDSET *st = rrdset; RRDHOST *host = st->rrdhost; - st->last_accessed_time = now_realtime_sec(); + st->last_accessed_time_s = now_realtime_sec(); - if(host->health_enabled && (ctr->react_action & (RRDSET_REACT_NEW | RRDSET_REACT_CHART_ACTIVATED))) { + if(host->health.health_enabled && (ctr->react_action & (RRDSET_REACT_NEW | RRDSET_REACT_CHART_ACTIVATED))) { rrdset_flag_set(st, RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION); rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION); } if(ctr->react_action & (RRDSET_REACT_NEW | RRDSET_REACT_PLUGIN_UPDATED | RRDSET_REACT_MODULE_UPDATED)) { if (ctr->react_action & RRDSET_REACT_NEW) { - if(unlikely(rrdcontext_find_chart_uuid(st, &st->chart_uuid))) { + if(unlikely(rrdcontext_find_chart_uuid(st, &st->chart_uuid))) uuid_generate(st->chart_uuid); - bool found_in_sql = false; (void)found_in_sql; - -// bool found_in_sql = true; -// if(unlikely(sql_find_chart_uuid(host, st, &st->chart_uuid))) { -// uuid_generate(st->chart_uuid); -// found_in_sql = false; -// } - -#ifdef NETDATA_INTERNAL_CHECKS - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower(st->chart_uuid, uuid_str); - error_report("Chart UUID for host %s chart [%s] not found in context. It is now set to %s (%s)", - string2str(host->hostname), - string2str(st->name), uuid_str, found_in_sql ? "found in sqlite" : "newly generated"); -#endif - } } rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE); rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); @@ -393,7 +399,8 @@ static void rrdset_react_callback(const DICTIONARY_ITEM *item __maybe_unused, vo void rrdset_index_init(RRDHOST *host) { if(!host->rrdset_root_index) { - host->rrdset_root_index = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + host->rrdset_root_index = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + &dictionary_stats_category_rrdset_rrddim, sizeof(RRDSET)); dictionary_register_insert_callback(host->rrdset_root_index, rrdset_insert_callback, NULL); dictionary_register_conflict_callback(host->rrdset_root_index, rrdset_conflict_callback, NULL); @@ -402,8 +409,9 @@ void rrdset_index_init(RRDHOST *host) { } if(!host->rrdset_root_index_name) { - host->rrdset_root_index_name = dictionary_create( - DICT_OPTION_NAME_LINK_DONT_CLONE | DICT_OPTION_VALUE_LINK_DONT_CLONE | DICT_OPTION_DONT_OVERWRITE_VALUE); + host->rrdset_root_index_name = dictionary_create_advanced( + DICT_OPTION_NAME_LINK_DONT_CLONE | DICT_OPTION_VALUE_LINK_DONT_CLONE | DICT_OPTION_DONT_OVERWRITE_VALUE, + &dictionary_stats_category_rrdset_rrddim, 0); dictionary_register_insert_callback(host->rrdset_root_index_name, rrdset_name_insert_callback, host); dictionary_register_delete_callback(host->rrdset_root_index_name, rrdset_name_delete_callback, host); @@ -431,6 +439,8 @@ static inline void rrdset_index_del(RRDHOST *host, RRDSET *st) { static RRDSET *rrdset_index_find(RRDHOST *host, const char *id) { // TODO - the name index should have an acquired dictionary item, not just a pointer to RRDSET + if (unlikely(!host->rrdset_root_index)) + return NULL; return dictionary_get(host->rrdset_root_index, id); } @@ -442,7 +452,7 @@ inline RRDSET *rrdset_find(RRDHOST *host, const char *id) { RRDSET *st = rrdset_index_find(host, id); if(st) - st->last_accessed_time = now_realtime_sec(); + st->last_accessed_time_s = now_realtime_sec(); return(st); } @@ -521,51 +531,106 @@ int rrdset_reset_name(RRDSET *st, const char *name) { } // get the timestamp of the last entry in the round-robin database -time_t rrdset_last_entry_t(RRDSET *st) { +time_t rrdset_last_entry_s(RRDSET *st) { RRDDIM *rd; - time_t last_entry_t = 0; + time_t last_entry_s = 0; rrddim_foreach_read(rd, st) { - time_t t = rrddim_last_entry_t(rd); - if(t > last_entry_t) last_entry_t = t; + time_t t = rrddim_last_entry_s(rd); + if(t > last_entry_s) last_entry_s = t; } rrddim_foreach_done(rd); - return last_entry_t; + return last_entry_s; +} + +time_t rrdset_last_entry_s_of_tier(RRDSET *st, size_t tier) { + RRDDIM *rd; + time_t last_entry_s = 0; + + rrddim_foreach_read(rd, st) { + time_t t = rrddim_last_entry_s_of_tier(rd, tier); + if(t > last_entry_s) last_entry_s = t; + } + rrddim_foreach_done(rd); + + return last_entry_s; } // get the timestamp of first entry in the round-robin database -time_t rrdset_first_entry_t(RRDSET *st) { +time_t rrdset_first_entry_s(RRDSET *st) { RRDDIM *rd; - time_t first_entry_t = LONG_MAX; + time_t first_entry_s = LONG_MAX; rrddim_foreach_read(rd, st) { - time_t t = rrddim_first_entry_t(rd); - if(t < first_entry_t) - first_entry_t = t; + time_t t = rrddim_first_entry_s(rd); + if(t < first_entry_s) + first_entry_s = t; } rrddim_foreach_done(rd); - if (unlikely(LONG_MAX == first_entry_t)) return 0; - return first_entry_t; + if (unlikely(LONG_MAX == first_entry_s)) return 0; + return first_entry_s; } -time_t rrdset_first_entry_t_of_tier(RRDSET *st, size_t tier) { +time_t rrdset_first_entry_s_of_tier(RRDSET *st, size_t tier) { if(unlikely(tier > storage_tiers)) return 0; RRDDIM *rd; - time_t first_entry_t = LONG_MAX; + time_t first_entry_s = LONG_MAX; rrddim_foreach_read(rd, st) { - time_t t = rrddim_first_entry_t_of_tier(rd, tier); - if(t && t < first_entry_t) - first_entry_t = t; + time_t t = rrddim_first_entry_s_of_tier(rd, tier); + if(t && t < first_entry_s) + first_entry_s = t; } rrddim_foreach_done(rd); - if (unlikely(LONG_MAX == first_entry_t)) return 0; - return first_entry_t; + if (unlikely(LONG_MAX == first_entry_s)) return 0; + return first_entry_s; +} + +void rrdset_get_retention_of_tier_for_collected_chart(RRDSET *st, time_t *first_time_s, time_t *last_time_s, time_t now_s, size_t tier) { + if(!now_s) + now_s = now_realtime_sec(); + + time_t db_first_entry_s = rrdset_first_entry_s_of_tier(st, tier); + time_t db_last_entry_s = st->last_updated.tv_sec; // we assume this is a collected RRDSET + + if(unlikely(!db_last_entry_s)) { + db_last_entry_s = rrdset_last_entry_s_of_tier(st, tier); + + if (unlikely(!db_last_entry_s)) { + // we assume this is a collected RRDSET + db_first_entry_s = 0; + db_last_entry_s = 0; + } + } + + if(unlikely(db_last_entry_s > now_s)) { + internal_error(db_last_entry_s > now_s + 1, + "RRDSET: 'host:%s/chart:%s' latest db time %ld is in the future, adjusting it to now %ld", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + db_last_entry_s, now_s); + db_last_entry_s = now_s; + } + + if(unlikely(db_first_entry_s && db_last_entry_s && db_first_entry_s >= db_last_entry_s)) { + internal_error(db_first_entry_s > db_last_entry_s, + "RRDSET: 'host:%s/chart:%s' oldest db time %ld is bigger than latest db time %ld, adjusting it to (latest time %ld - update every %ld)", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + db_first_entry_s, db_last_entry_s, + db_last_entry_s, (time_t)st->update_every); + db_first_entry_s = db_last_entry_s - st->update_every; + } + + if(unlikely(!db_first_entry_s && db_last_entry_s)) + // this can be the case on the first data collection of a chart + db_first_entry_s = db_last_entry_s - st->update_every; + + *first_time_s = db_first_entry_s; + *last_time_s = db_last_entry_s; } inline void rrdset_is_obsolete(RRDSET *st) { @@ -578,7 +643,7 @@ inline void rrdset_is_obsolete(RRDSET *st) { rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE); rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS); - st->last_accessed_time = now_realtime_sec(); + st->last_accessed_time_s = now_realtime_sec(); rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); @@ -592,7 +657,7 @@ inline void rrdset_is_obsolete(RRDSET *st) { inline void rrdset_isnot_obsolete(RRDSET *st) { if(unlikely((rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))) { rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE); - st->last_accessed_time = now_realtime_sec(); + st->last_accessed_time_s = now_realtime_sec(); rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); @@ -673,8 +738,8 @@ void rrdset_reset(RRDSET *st) { if(!rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) { for(size_t tier = 0; tier < storage_tiers ;tier++) { - if(rd->tiers[tier]) - rd->tiers[tier]->collect_ops->flush(rd->tiers[tier]->db_collection_handle); + if(rd->tiers[tier].db_collection_handle) + rd->tiers[tier].collect_ops->flush(rd->tiers[tier].db_collection_handle); } } } @@ -768,7 +833,8 @@ void rrdset_delete_files(RRDSET *st) { } rrddim_foreach_done(rd); - recursively_delete_dir(st->cache_dir, "left-over chart"); + if(st->cache_dir) + recursively_delete_dir(st->cache_dir, "left-over chart"); } void rrdset_delete_obsolete_dimensions(RRDSET *st) { @@ -809,7 +875,7 @@ RRDSET *rrdset_create_custom( , long history_entries ) { if (host != localhost) - host->senders_last_chart_command = now_realtime_sec(); + host->child_last_chart_command = now_realtime_sec(); if(!type || !type[0]) fatal("Cannot create rrd stats without a type: id '%s', name '%s', family '%s', context '%s', title '%s', units '%s', plugin '%s', module '%s'." @@ -920,15 +986,8 @@ void rrdset_timed_next(RRDSET *st, struct timeval now, usec_t duration_since_las ); #endif - st->last_collected_time.tv_sec = now.tv_sec - st->update_every; - st->last_collected_time.tv_usec = now.tv_usec; - last_collected_time_align(st); + duration_since_last_update = 0; - st->last_updated.tv_sec = now.tv_sec - st->update_every; - st->last_updated.tv_usec = now.tv_usec; - last_updated_time_align(st); - - duration_since_last_update = st->update_every * USEC_PER_SEC; #ifdef NETDATA_INTERNAL_CHECKS if(!discard_reason) discard_reason = "COLLECTION TIME IN FUTURE"; #endif @@ -941,6 +1000,7 @@ void rrdset_timed_next(RRDSET *st, struct timeval now, usec_t duration_since_las #endif duration_since_last_update = (usec_t)since_last_usec; + #ifdef NETDATA_INTERNAL_CHECKS if(!discard_reason) discard_reason = "COLLECTION TIME TOO FAR IN THE PAST"; #endif @@ -949,16 +1009,16 @@ void rrdset_timed_next(RRDSET *st, struct timeval now, usec_t duration_since_las #ifdef NETDATA_INTERNAL_CHECKS if(since_last_usec > 0 && (susec_t) duration_since_last_update < since_last_usec) { static __thread susec_t min_delta = USEC_PER_SEC * 3600, permanent_min_delta = 0; - static __thread time_t last_t = 0; + static __thread time_t last_time_s = 0; // the first time initialize it so that it will make the check later - if(last_t == 0) last_t = now.tv_sec + 60; + if(last_time_s == 0) last_time_s = now.tv_sec + 60; susec_t delta = since_last_usec - (susec_t) duration_since_last_update; if(delta < min_delta) min_delta = delta; - if(now.tv_sec >= last_t + 60) { - last_t = now.tv_sec; + if(now.tv_sec >= last_time_s + 60) { + last_time_s = now.tv_sec; if(min_delta > permanent_min_delta) { info("MINIMUM MICROSECONDS DELTA of thread %d increased from %lld to %lld (+%lld)", gettid(), permanent_min_delta, min_delta, min_delta - permanent_min_delta); @@ -1029,7 +1089,7 @@ static inline usec_t rrdset_update_last_collected_time(RRDSET *st) { return last_collect_ut; } -static inline usec_t rrdset_init_last_updated_time(RRDSET *st) { +static inline void rrdset_init_last_updated_time(RRDSET *st) { // copy the last collected time to last updated time st->last_updated.tv_sec = st->last_collected_time.tv_sec; st->last_updated.tv_usec = st->last_collected_time.tv_usec; @@ -1038,31 +1098,27 @@ static inline usec_t rrdset_init_last_updated_time(RRDSET *st) { st->last_updated.tv_sec -= st->update_every; last_updated_time_align(st); - - usec_t last_updated_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec; - - rrdset_debug(st, "initialized last updated time to %0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_updated_ut / USEC_PER_SEC); - - return last_updated_ut; } static __thread size_t rrdset_done_statistics_points_stored_per_tier[RRD_STORAGE_TIERS]; -static inline time_t tier_next_point_time(RRDDIM *rd, struct rrddim_tier *t, time_t now) { +static inline time_t tier_next_point_time_s(RRDDIM *rd, struct rrddim_tier *t, time_t now_s) { time_t loop = (time_t)rd->update_every * (time_t)t->tier_grouping; - return now + loop - ((now + loop) % loop); + return now_s + loop - ((now_s + loop) % loop); } void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut __maybe_unused) { - if (unlikely(!t->next_point_time)) - t->next_point_time = tier_next_point_time(rd, t, sp.end_time); + if (unlikely(!t->next_point_end_time_s)) + t->next_point_end_time_s = tier_next_point_time_s(rd, t, sp.end_time_s); + + if(unlikely(sp.start_time_s >= t->next_point_end_time_s)) { + // flush the virtual point, it is done - if(unlikely(sp.start_time > t->next_point_time)) { if (likely(!storage_point_is_unset(t->virtual_point))) { t->collect_ops->store_metric( t->db_collection_handle, - t->next_point_time * USEC_PER_SEC, + t->next_point_end_time_s * USEC_PER_SEC, t->virtual_point.sum, t->virtual_point.min, t->virtual_point.max, @@ -1073,7 +1129,7 @@ void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAG else { t->collect_ops->store_metric( t->db_collection_handle, - t->next_point_time * USEC_PER_SEC, + t->next_point_end_time_s * USEC_PER_SEC, NAN, NAN, NAN, @@ -1083,18 +1139,18 @@ void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAG rrdset_done_statistics_points_stored_per_tier[tier]++; t->virtual_point.count = 0; // make the point unset - t->next_point_time = tier_next_point_time(rd, t, sp.end_time); + t->next_point_end_time_s = tier_next_point_time_s(rd, t, sp.end_time_s); } // merge the dates into our virtual point - if (unlikely(sp.start_time < t->virtual_point.start_time)) - t->virtual_point.start_time = sp.start_time; + if (unlikely(sp.start_time_s < t->virtual_point.start_time_s)) + t->virtual_point.start_time_s = sp.start_time_s; - if (likely(sp.end_time > t->virtual_point.end_time)) - t->virtual_point.end_time = sp.end_time; + if (likely(sp.end_time_s > t->virtual_point.end_time_s)) + t->virtual_point.end_time_s = sp.end_time_s; // merge the values into our virtual point - if (likely(!storage_point_is_empty(sp))) { + if (likely(!storage_point_is_gap(sp))) { // we aggregate only non NULLs into higher tiers if (likely(!storage_point_is_unset(t->virtual_point))) { @@ -1143,14 +1199,14 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, #endif // NETDATA_LOG_COLLECTION_ERRORS // store the metric on tier 0 - rd->tiers[0]->collect_ops->store_metric(rd->tiers[0]->db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags); + rd->tiers[0].collect_ops->store_metric(rd->tiers[0].db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags); rrdset_done_statistics_points_stored_per_tier[0]++; - time_t now = (time_t)(point_end_time_ut / USEC_PER_SEC); + time_t now_s = (time_t)(point_end_time_ut / USEC_PER_SEC); STORAGE_POINT sp = { - .start_time = now - rd->update_every, - .end_time = now, + .start_time_s = now_s - rd->update_every, + .end_time_s = now_s, .min = n, .max = n, .sum = n, @@ -1160,14 +1216,14 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, }; for(size_t tier = 1; tier < storage_tiers ;tier++) { - if(unlikely(!rd->tiers[tier])) continue; + if(unlikely(!rd->tiers[tier].db_metric_handle)) continue; - struct rrddim_tier *t = rd->tiers[tier]; + struct rrddim_tier *t = &rd->tiers[tier]; if(!rrddim_option_check(rd, RRDDIM_OPTION_BACKFILLED_HIGH_TIERS)) { // we have not collected this tier before // let's fill any gap that may exist - rrdr_fill_tier_gap_from_smaller_tiers(rd, tier, now); + rrdr_fill_tier_gap_from_smaller_tiers(rd, tier, now_s); rrddim_option_set(rd, RRDDIM_OPTION_BACKFILLED_HIGH_TIERS); } @@ -1188,12 +1244,16 @@ struct rda_item { static __thread struct rda_item *thread_rda = NULL; static __thread size_t thread_rda_entries = 0; -struct rda_item *rrdset_thread_rda(size_t *dimensions) { +struct rda_item *rrdset_thread_rda_get(size_t *dimensions) { if(unlikely(!thread_rda || (*dimensions) > thread_rda_entries)) { + size_t old_mem = thread_rda_entries * sizeof(struct rda_item); freez(thread_rda); - thread_rda = mallocz((*dimensions) * sizeof(struct rda_item)); thread_rda_entries = *dimensions; + size_t new_mem = thread_rda_entries * sizeof(struct rda_item); + thread_rda = mallocz(new_mem); + + __atomic_add_fetch(&netdata_buffers_statistics.rrdset_done_rda_size, new_mem - old_mem, __ATOMIC_RELAXED); } *dimensions = thread_rda_entries; @@ -1201,6 +1261,8 @@ struct rda_item *rrdset_thread_rda(size_t *dimensions) { } void rrdset_thread_rda_free(void) { + __atomic_sub_fetch(&netdata_buffers_statistics.rrdset_done_rda_size, thread_rda_entries * sizeof(struct rda_item), __ATOMIC_RELAXED); + freez(thread_rda); thread_rda = NULL; thread_rda_entries = 0; @@ -1253,6 +1315,8 @@ static inline size_t rrdset_done_interpolate( last_ut = next_store_ut; + ml_chart_update_begin(st); + struct rda_item *rda; size_t dim_id; for(dim_id = 0, rda = rda_base ; dim_id < rda_slots ; ++dim_id, ++rda) { @@ -1332,17 +1396,20 @@ static inline size_t rrdset_done_interpolate( break; } + time_t current_time_s = (time_t) (next_store_ut / USEC_PER_SEC); + if(unlikely(!store_this_entry)) { - (void) ml_is_anomalous(rd, 0, false); + (void) ml_is_anomalous(rd, current_time_s, 0, false); + rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE); rrdcontext_collected_rrddim(rd); continue; } - if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) { + if(likely(rd->updated && rd->collections_counter > 1 && iterations < gap_when_lost_iterations_above)) { uint32_t dim_storage_flags = storage_flags; - if (ml_is_anomalous(rd, new_value, true)) { + if (ml_is_anomalous(rd, current_time_s, new_value, true)) { // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous dim_storage_flags &= ~((storage_number)SN_FLAG_NOT_ANOMALOUS); } @@ -1352,7 +1419,7 @@ static inline size_t rrdset_done_interpolate( rd->last_stored_value = new_value; } else { - (void) ml_is_anomalous(rd, 0, false); + (void) ml_is_anomalous(rd, current_time_s, 0, false); rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rrddim_name(rd), current_entry); @@ -1364,6 +1431,8 @@ static inline size_t rrdset_done_interpolate( stored_entries++; } + ml_chart_update_end(st); + // reset the storage flags for the next point, if any; storage_flags = SN_DEFAULT_FLAGS; @@ -1389,36 +1458,6 @@ static inline size_t rrdset_done_interpolate( return stored_entries; } -static inline void rrdset_done_fill_the_gap(RRDSET *st) { - usec_t update_every_ut = st->update_every * USEC_PER_SEC; - usec_t now_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec; - - long c = 0, entries = st->entries; - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - usec_t next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC; - long current_entry = st->current_entry; - - for(c = 0; c < entries && next_store_ut <= now_collect_ut ; next_store_ut += update_every_ut, c++) { - rd->db[current_entry] = pack_storage_number(NAN, SN_FLAG_NONE); - current_entry = ((current_entry + 1) >= entries) ? 0 : current_entry + 1; - - rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING (FILLED THE GAP)", rrddim_name(rd), current_entry); - } - } - rrddim_foreach_done(rd); - - if(c > 0) { - c--; - st->last_updated.tv_sec += c * st->update_every; - - st->current_entry += c; - st->counter += c; - if(st->current_entry >= st->entries) - st->current_entry -= st->entries; - } -} - void rrdset_done(RRDSET *st) { struct timeval now; @@ -1427,10 +1466,12 @@ void rrdset_done(RRDSET *st) { } void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) { - if(unlikely(netdata_exit)) return; + if(unlikely(!service_running(SERVICE_COLLECTORS))) return; + + netdata_spinlock_lock(&st->data_collection_lock); if (pending_rrdset_next) - rrdset_next(st); + rrdset_timed_next(st, now, 0ULL); debug(D_RRD_CALLS, "rrdset_done() for chart '%s'", rrdset_name(st)); @@ -1447,9 +1488,13 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) next_store_ut = 0, // the timestamp in microseconds, of the next entry to store in the db update_every_ut = st->update_every * USEC_PER_SEC; // st->update_every in microseconds + RRDSET_FLAGS rrdset_flags = rrdset_flag_check(st, ~0); + if(unlikely(rrdset_flags & RRDSET_FLAG_COLLECTION_FINISHED)) + return; + netdata_thread_disable_cancelability(); - if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) { + if (unlikely(rrdset_flags & RRDSET_FLAG_OBSOLETE)) { error("Chart '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st)); rrdset_isnot_obsolete(st); } @@ -1519,29 +1564,6 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) first_entry = 1; } -#ifdef ENABLE_DBENGINE - // check if we will re-write the entire page - if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && - dt_usec(&st->last_collected_time, &st->last_updated) > (RRDENG_BLOCK_SIZE / sizeof(storage_number)) * update_every_ut)) { - info( - "'%s': too old data (last updated at %" PRId64 ".%" PRId64 ", last collected at %" PRId64 ".%" PRId64 "). " - "Resetting it. Will not store the next entry.", - rrdset_id(st), - (int64_t)st->last_updated.tv_sec, - (int64_t)st->last_updated.tv_usec, - (int64_t)st->last_collected_time.tv_sec, - (int64_t)st->last_collected_time.tv_usec); - rrdset_reset(st); - rrdset_init_last_updated_time(st); - - st->usec_since_last_update = update_every_ut; - - // the first entry should not be stored - store_this_entry = 0; - first_entry = 1; - } -#endif - // these are the 3 variables that will help us in interpolation // last_stored_ut = the last time we added a value to the storage // now_collect_ut = the time the current value has been collected @@ -1551,23 +1573,13 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC; if(unlikely(!st->counter_done)) { - // if we have not collected metrics this session (st->counter_done == 0) - // and we have collected metrics for this chart in the past (st->counter != 0) - // fill the gap (the chart has been just loaded from disk) - if(unlikely(st->counter) && st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) { - // TODO this should be inside the storage engine - rrdset_done_fill_the_gap(st); - last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec; - next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC; - } - if (st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { - // set a fake last_updated to jump to current time - rrdset_init_last_updated_time(st); - last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec; - next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC; - } + // set a fake last_updated to jump to current time + rrdset_init_last_updated_time(st); + + last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec; + next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC; - if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST))) { + if(unlikely(rrdset_flags & RRDSET_FLAG_STORE_FIRST)) { store_this_entry = 1; last_collect_ut = next_store_ut - update_every_ut; @@ -1589,7 +1601,7 @@ after_first_database_work: uint32_t has_reset_value = 0; size_t rda_slots = dictionary_entries(st->rrddim_root_index); - struct rda_item *rda_base = rrdset_thread_rda(&rda_slots); + struct rda_item *rda_base = rrdset_thread_rda_get(&rda_slots); size_t dim_id; size_t dimensions = 0; @@ -1915,6 +1927,8 @@ after_second_database_work: ); } + netdata_spinlock_unlock(&st->data_collection_lock); + // ALL DONE ABOUT THE DATA UPDATE // -------------------------------------------------------------------- @@ -1946,29 +1960,29 @@ after_second_database_work: store_metric_collection_completed(); } -time_t rrdset_set_update_every(RRDSET *st, time_t update_every) { +time_t rrdset_set_update_every_s(RRDSET *st, time_t update_every_s) { internal_error(true, "RRDSET '%s' switching update every from %d to %d", - rrdset_id(st), (int)st->update_every, (int)update_every); + rrdset_id(st), (int)st->update_every, (int)update_every_s); - time_t prev_update_every = st->update_every; - st->update_every = update_every; + time_t prev_update_every_s = st->update_every; + st->update_every = update_every_s; // switch update every to the storage engine RRDDIM *rd; rrddim_foreach_read(rd, st) { for (size_t tier = 0; tier < storage_tiers; tier++) { - if (rd->tiers[tier] && rd->tiers[tier]->db_collection_handle) - rd->tiers[tier]->collect_ops->change_collection_frequency(rd->tiers[tier]->db_collection_handle, (int)(st->rrdhost->db[tier].tier_grouping * st->update_every)); + if (rd->tiers[tier].db_collection_handle) + rd->tiers[tier].collect_ops->change_collection_frequency(rd->tiers[tier].db_collection_handle, (int)(st->rrdhost->db[tier].tier_grouping * st->update_every)); } - assert(rd->update_every == prev_update_every && + assert(rd->update_every == prev_update_every_s && "chart's update every differs from the update every of its dimensions"); rd->update_every = st->update_every; } rrddim_foreach_done(rd); - return prev_update_every; + return prev_update_every_s; } // ---------------------------------------------------------------------------- @@ -2016,8 +2030,8 @@ struct rrdset_map_save_v019 { size_t counter; // NEEDS TO BE UPDATED - maintained on load size_t counter_done; // ignored union { // - time_t last_accessed_time; // ignored - time_t last_entry_t; // ignored + time_t last_accessed_time_s; // ignored + time_t last_entry_s; // ignored }; // time_t upstream_resync_time; // ignored void *plugin_name; // ignored @@ -2064,6 +2078,13 @@ const char *rrdset_cache_filename(RRDSET *st) { return st_on_file->cache_filename; } +const char *rrdset_cache_dir(RRDSET *st) { + if(!st->cache_dir) + st->cache_dir = rrdhost_cache_dir_for_rrdset_alloc(st->rrdhost, rrdset_id(st)); + + return st->cache_dir; +} + void rrdset_memory_file_free(RRDSET *st) { if(!st->st_on_file) return; @@ -2071,6 +2092,7 @@ void rrdset_memory_file_free(RRDSET *st) { rrdset_memory_file_update(st); struct rrdset_map_save_v019 *st_on_file = st->st_on_file; + __atomic_sub_fetch(&rrddim_db_memory_size, st_on_file->memsize, __ATOMIC_RELAXED); netdata_munmap(st_on_file, st_on_file->memsize); // remove the pointers from the RRDDIM @@ -2093,17 +2115,15 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st, RRD_MEMORY_MODE memory_mo return false; char fullfilename[FILENAME_MAX + 1]; - snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", st->cache_dir); + snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", rrdset_cache_dir(st)); unsigned long size = sizeof(struct rrdset_map_save_v019); struct rrdset_map_save_v019 *st_on_file = (struct rrdset_map_save_v019 *)netdata_mmap( - fullfilename, size, - ((memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), - 0); + fullfilename, size, ((memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), 0, false, NULL); if(!st_on_file) return false; - time_t now = now_realtime_sec(); + time_t now_s = now_realtime_sec(); st_on_file->magic[sizeof(RRDSET_MAGIC_V019)] = '\0'; if(strcmp(st_on_file->magic, RRDSET_MAGIC_V019) != 0) { @@ -2122,13 +2142,13 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st, RRD_MEMORY_MODE memory_mo error("File '%s' does not have the desired granularity. Clearing it.", fullfilename); memset(st_on_file, 0, size); } - else if((now - st_on_file->last_updated.tv_sec) > st->update_every * st->entries) { + else if((now_s - st_on_file->last_updated.tv_sec) > st->update_every * st->entries) { info("File '%s' is too old. Clearing it.", fullfilename); memset(st_on_file, 0, size); } - else if(st_on_file->last_updated.tv_sec > now + st->update_every) { - error("File '%s' refers to the future by %zd secs. Resetting it to now.", fullfilename, (ssize_t)(st_on_file->last_updated.tv_sec - now)); - st_on_file->last_updated.tv_sec = now; + else if(st_on_file->last_updated.tv_sec > now_s + st->update_every) { + error("File '%s' refers to the future by %zd secs. Resetting it to now.", fullfilename, (ssize_t)(st_on_file->last_updated.tv_sec - now_s)); + st_on_file->last_updated.tv_sec = now_s; } if(st_on_file->current_entry >= st_on_file->entries) @@ -2169,5 +2189,6 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st, RRD_MEMORY_MODE memory_mo // copy the useful values back to st_on_file rrdset_memory_file_update(st); + __atomic_add_fetch(&rrddim_db_memory_size, st_on_file->memsize, __ATOMIC_RELAXED); return true; } |