diff options
Diffstat (limited to 'database/rrdset.c')
-rw-r--r-- | database/rrdset.c | 96 |
1 files changed, 63 insertions, 33 deletions
diff --git a/database/rrdset.c b/database/rrdset.c index fd6605df..2d3ee960 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -194,6 +194,8 @@ inline void rrdset_is_obsolete(RRDSET *st) { if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))) { rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE); + st->rrdhost->obsolete_charts_count++; + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); // the chart will not get more updates (data collection) @@ -205,6 +207,8 @@ inline void rrdset_is_obsolete(RRDSET *st) { inline void rrdset_isnot_obsolete(RRDSET *st) { if(unlikely((rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))) { rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE); + st->rrdhost->obsolete_charts_count--; + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); // the chart will be pushed upstream automatically @@ -452,7 +456,7 @@ void rrdset_delete_custom(RRDSET *st, int db_rotated) { #ifdef ENABLE_ACLK if ((netdata_cloud_setting) && (db_rotated || RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)) { aclk_del_collector(st->rrdhost, st->plugin_name, st->module_name); - st->rrdhost->obsolete_count++; + st->rrdhost->deleted_charts_count++; } #endif @@ -645,7 +649,7 @@ RRDSET *rrdset_create_custom( aclk_add_collector(host, st->plugin_name, st->module_name); } } - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); } #endif freez(old_plugin); @@ -695,11 +699,11 @@ RRDSET *rrdset_create_custom( // ------------------------------------------------------------------------ // compose the config_section for this chart - char config_section[RRD_ID_LENGTH_MAX + 1]; + char config_section[RRD_ID_LENGTH_MAX + GUID_LEN + 2]; if(host == localhost) strcpy(config_section, fullid); else - snprintfz(config_section, RRD_ID_LENGTH_MAX, "%s/%s", host->machine_guid, fullid); + snprintfz(config_section, RRD_ID_LENGTH_MAX + GUID_LEN + 1, "%s/%s", host->machine_guid, fullid); // ------------------------------------------------------------------------ // get the options from the config, we need to create it @@ -931,21 +935,13 @@ RRDSET *rrdset_create_custom( update_chart_metadata(st->chart_uuid, st, id, name); store_active_chart(st->chart_uuid); - -#ifdef ENABLE_ACLK - host->obsolete_count = 0; -#endif - rrdhost_cleanup_obsolete_charts(host); -#ifdef ENABLE_ACLK - if (host->obsolete_count) - aclk_update_chart(st->rrdhost, "dummy-chart", ACLK_CMD_CHARTDEL); -#endif + compute_chart_hash(st); rrdhost_unlock(host); #ifdef ENABLE_ACLK if (netdata_cloud_setting) aclk_add_collector(host, plugin, module); - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); #endif return(st); } @@ -1124,7 +1120,7 @@ static inline size_t rrdset_done_interpolate( , usec_t last_collect_ut , usec_t now_collect_ut , char store_this_entry - , uint32_t storage_flags + , uint32_t has_reset_value ) { RRDDIM *rd; @@ -1139,6 +1135,11 @@ static inline size_t rrdset_done_interpolate( size_t counter = st->counter; long current_entry = st->current_entry; + uint32_t storage_flags = SN_DEFAULT_FLAGS; + + if (has_reset_value) + storage_flags |= SN_EXISTS_RESET; + for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) { #ifdef NETDATA_INTERNAL_CHECKS @@ -1236,13 +1237,22 @@ static inline size_t rrdset_done_interpolate( } if(unlikely(!store_this_entry)) { - rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); //pack_storage_number(0, SN_NOT_EXISTS) -// rd->values[current_entry] = SN_EMPTY_SLOT; //pack_storage_number(0, SN_NOT_EXISTS); + (void) ml_is_anomalous(rd, 0, false); + + rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); +// rd->values[current_entry] = SN_EMPTY_SLOT; continue; } if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) { - rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, storage_flags)); + uint32_t dim_storage_flags = storage_flags; + + if (ml_is_anomalous(rd, new_value, true)) { + // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous + dim_storage_flags &= ~ ((uint32_t) SN_ANOMALY_BIT); + } + + rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, dim_storage_flags)); // rd->values[current_entry] = pack_storage_number(new_value, storage_flags ); rd->last_stored_value = new_value; @@ -1254,9 +1264,9 @@ static inline size_t rrdset_done_interpolate( , unpack_storage_number(rd->values[current_entry]), new_value ); #endif - } else { + (void) ml_is_anomalous(rd, 0, false); #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING " @@ -1265,8 +1275,8 @@ static inline size_t rrdset_done_interpolate( ); #endif -// rd->values[current_entry] = SN_EMPTY_SLOT; // pack_storage_number(0, SN_NOT_EXISTS); - rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); //pack_storage_number(0, SN_NOT_EXISTS) +// rd->values[current_entry] = SN_EMPTY_SLOT; + rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); rd->last_stored_value = NAN; } @@ -1278,11 +1288,10 @@ static inline size_t rrdset_done_interpolate( calculated_number t2 = unpack_storage_number(rd->values[current_entry]); calculated_number accuracy = accuracy_loss(t1, t2); - debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " FLAGS=0x%08x (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)" + debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)" , st->id, rd->name , current_entry , t2 - , get_storage_number_flags(rd->values[current_entry]) , t1 , accuracy , (accuracy > ACCURACY_LOSS_ACCEPTED_PERCENT) ? " **TOO BIG** " : "" @@ -1304,7 +1313,7 @@ static inline size_t rrdset_done_interpolate( #endif } // reset the storage flags for the next point, if any; - storage_flags = SN_EXISTS; + storage_flags = SN_DEFAULT_FLAGS; st->counter = ++counter; st->current_entry = current_entry = ((current_entry + 1) >= st->entries) ? 0 : current_entry + 1; @@ -1353,6 +1362,7 @@ static inline void rrdset_done_fill_the_gap(RRDSET *st) { st->last_updated.tv_sec += c * st->update_every; st->current_entry += c; + st->counter += c; if(st->current_entry >= st->entries) st->current_entry -= st->entries; } @@ -1382,9 +1392,11 @@ void rrdset_done(RRDSET *st) { rrdset_rdlock(st); #ifdef ENABLE_ACLK - if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); - aclk_update_chart(st->rrdhost, st->id, ACLK_CMD_CHART); + if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + if (st->counter_done >= RRDSET_MINIMUM_LIVE_COUNT && st->dimensions) { + if (likely(!queue_chart_to_aclk(st))) + rrdset_flag_set(st, RRDSET_FLAG_ACLK); + } } #endif @@ -1535,7 +1547,7 @@ after_first_database_work: st->collected_total += rd->collected_value; } - uint32_t storage_flags = SN_EXISTS; + uint32_t has_reset_value = 0; // process all dimensions to calculate their values // based on the collected figures only @@ -1632,7 +1644,7 @@ after_first_database_work: , rd->collected_value); if(!(rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS))) - storage_flags = SN_EXISTS_RESET; + has_reset_value = 1; uint64_t last = (uint64_t)rd->last_collected_value; uint64_t new = (uint64_t)rd->collected_value; @@ -1703,7 +1715,7 @@ after_first_database_work: ); if(!(rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS))) - storage_flags = SN_EXISTS_RESET; + has_reset_value = 1; rd->last_collected_value = rd->collected_value; } @@ -1782,15 +1794,32 @@ after_first_database_work: , last_collect_ut , now_collect_ut , store_this_entry - , storage_flags + , has_reset_value ); after_second_database_work: st->last_collected_total = st->collected_total; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + time_t mark = now_realtime_sec(); +#endif rrddim_foreach_read(rd, st) { if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) continue; + +#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) + if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) { + int live = ((mark - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_LIVE_COUNT * rd->update_every)); + if (unlikely(live != rd->state->aclk_live_status)) { + if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + if (likely(!queue_dimension_to_aclk(rd))) { + rd->state->aclk_live_status = live; + rrddim_flag_set(rd, RRDDIM_FLAG_ACLK); + } + } + } + } +#endif if(unlikely(!rd->updated)) continue; @@ -1866,7 +1895,8 @@ after_second_database_work: rrdset_wrlock(st); for( rd = st->dimensions, last = NULL ; likely(rd) ; ) { - if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE) && (rd->last_collected_time.tv_sec + rrdset_free_obsolete_time < now))) { + if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE) && !rrddim_flag_check(rd, RRDDIM_FLAG_ACLK) + && (rd->last_collected_time.tv_sec + rrdset_free_obsolete_time < now))) { info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id); if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || rd->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) { @@ -1886,7 +1916,7 @@ after_second_database_work: uint8_t can_delete_metric = rd->state->collect_ops.finalize(rd); if (can_delete_metric) { /* This metric has no data and no references */ - delete_dimension_uuid(rd->state->metric_uuid); + delete_dimension_uuid(&rd->state->metric_uuid); } else { /* Do not delete this dimension */ last = rd; |