summaryrefslogtreecommitdiffstats
path: root/database/rrdset.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/rrdset.c96
1 files changed, 63 insertions, 33 deletions
diff --git a/database/rrdset.c b/database/rrdset.c
index fd6605dff..2d3ee9609 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -194,6 +194,8 @@ inline void rrdset_is_obsolete(RRDSET *st) {
if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))) {
rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE);
+ st->rrdhost->obsolete_charts_count++;
+
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
// the chart will not get more updates (data collection)
@@ -205,6 +207,8 @@ inline void rrdset_is_obsolete(RRDSET *st) {
inline void rrdset_isnot_obsolete(RRDSET *st) {
if(unlikely((rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))) {
rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE);
+ st->rrdhost->obsolete_charts_count--;
+
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
// the chart will be pushed upstream automatically
@@ -452,7 +456,7 @@ void rrdset_delete_custom(RRDSET *st, int db_rotated) {
#ifdef ENABLE_ACLK
if ((netdata_cloud_setting) && (db_rotated || RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)) {
aclk_del_collector(st->rrdhost, st->plugin_name, st->module_name);
- st->rrdhost->obsolete_count++;
+ st->rrdhost->deleted_charts_count++;
}
#endif
@@ -645,7 +649,7 @@ RRDSET *rrdset_create_custom(
aclk_add_collector(host, st->plugin_name, st->module_name);
}
}
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
}
#endif
freez(old_plugin);
@@ -695,11 +699,11 @@ RRDSET *rrdset_create_custom(
// ------------------------------------------------------------------------
// compose the config_section for this chart
- char config_section[RRD_ID_LENGTH_MAX + 1];
+ char config_section[RRD_ID_LENGTH_MAX + GUID_LEN + 2];
if(host == localhost)
strcpy(config_section, fullid);
else
- snprintfz(config_section, RRD_ID_LENGTH_MAX, "%s/%s", host->machine_guid, fullid);
+ snprintfz(config_section, RRD_ID_LENGTH_MAX + GUID_LEN + 1, "%s/%s", host->machine_guid, fullid);
// ------------------------------------------------------------------------
// get the options from the config, we need to create it
@@ -931,21 +935,13 @@ RRDSET *rrdset_create_custom(
update_chart_metadata(st->chart_uuid, st, id, name);
store_active_chart(st->chart_uuid);
-
-#ifdef ENABLE_ACLK
- host->obsolete_count = 0;
-#endif
- rrdhost_cleanup_obsolete_charts(host);
-#ifdef ENABLE_ACLK
- if (host->obsolete_count)
- aclk_update_chart(st->rrdhost, "dummy-chart", ACLK_CMD_CHARTDEL);
-#endif
+ compute_chart_hash(st);
rrdhost_unlock(host);
#ifdef ENABLE_ACLK
if (netdata_cloud_setting)
aclk_add_collector(host, plugin, module);
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
#endif
return(st);
}
@@ -1124,7 +1120,7 @@ static inline size_t rrdset_done_interpolate(
, usec_t last_collect_ut
, usec_t now_collect_ut
, char store_this_entry
- , uint32_t storage_flags
+ , uint32_t has_reset_value
) {
RRDDIM *rd;
@@ -1139,6 +1135,11 @@ static inline size_t rrdset_done_interpolate(
size_t counter = st->counter;
long current_entry = st->current_entry;
+ uint32_t storage_flags = SN_DEFAULT_FLAGS;
+
+ if (has_reset_value)
+ storage_flags |= SN_EXISTS_RESET;
+
for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) {
#ifdef NETDATA_INTERNAL_CHECKS
@@ -1236,13 +1237,22 @@ static inline size_t rrdset_done_interpolate(
}
if(unlikely(!store_this_entry)) {
- rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); //pack_storage_number(0, SN_NOT_EXISTS)
-// rd->values[current_entry] = SN_EMPTY_SLOT; //pack_storage_number(0, SN_NOT_EXISTS);
+ (void) ml_is_anomalous(rd, 0, false);
+
+ rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT);
+// rd->values[current_entry] = SN_EMPTY_SLOT;
continue;
}
if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
- rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, storage_flags));
+ uint32_t dim_storage_flags = storage_flags;
+
+ if (ml_is_anomalous(rd, new_value, true)) {
+ // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
+ dim_storage_flags &= ~ ((uint32_t) SN_ANOMALY_BIT);
+ }
+
+ rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, dim_storage_flags));
// rd->values[current_entry] = pack_storage_number(new_value, storage_flags );
rd->last_stored_value = new_value;
@@ -1254,9 +1264,9 @@ static inline size_t rrdset_done_interpolate(
, unpack_storage_number(rd->values[current_entry]), new_value
);
#endif
-
}
else {
+ (void) ml_is_anomalous(rd, 0, false);
#ifdef NETDATA_INTERNAL_CHECKS
rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING "
@@ -1265,8 +1275,8 @@ static inline size_t rrdset_done_interpolate(
);
#endif
-// rd->values[current_entry] = SN_EMPTY_SLOT; // pack_storage_number(0, SN_NOT_EXISTS);
- rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); //pack_storage_number(0, SN_NOT_EXISTS)
+// rd->values[current_entry] = SN_EMPTY_SLOT;
+ rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT);
rd->last_stored_value = NAN;
}
@@ -1278,11 +1288,10 @@ static inline size_t rrdset_done_interpolate(
calculated_number t2 = unpack_storage_number(rd->values[current_entry]);
calculated_number accuracy = accuracy_loss(t1, t2);
- debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " FLAGS=0x%08x (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
+ debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
, st->id, rd->name
, current_entry
, t2
- , get_storage_number_flags(rd->values[current_entry])
, t1
, accuracy
, (accuracy > ACCURACY_LOSS_ACCEPTED_PERCENT) ? " **TOO BIG** " : ""
@@ -1304,7 +1313,7 @@ static inline size_t rrdset_done_interpolate(
#endif
}
// reset the storage flags for the next point, if any;
- storage_flags = SN_EXISTS;
+ storage_flags = SN_DEFAULT_FLAGS;
st->counter = ++counter;
st->current_entry = current_entry = ((current_entry + 1) >= st->entries) ? 0 : current_entry + 1;
@@ -1353,6 +1362,7 @@ static inline void rrdset_done_fill_the_gap(RRDSET *st) {
st->last_updated.tv_sec += c * st->update_every;
st->current_entry += c;
+ st->counter += c;
if(st->current_entry >= st->entries)
st->current_entry -= st->entries;
}
@@ -1382,9 +1392,11 @@ void rrdset_done(RRDSET *st) {
rrdset_rdlock(st);
#ifdef ENABLE_ACLK
- if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
- rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
- aclk_update_chart(st->rrdhost, st->id, ACLK_CMD_CHART);
+ if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
+ if (st->counter_done >= RRDSET_MINIMUM_LIVE_COUNT && st->dimensions) {
+ if (likely(!queue_chart_to_aclk(st)))
+ rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ }
}
#endif
@@ -1535,7 +1547,7 @@ after_first_database_work:
st->collected_total += rd->collected_value;
}
- uint32_t storage_flags = SN_EXISTS;
+ uint32_t has_reset_value = 0;
// process all dimensions to calculate their values
// based on the collected figures only
@@ -1632,7 +1644,7 @@ after_first_database_work:
, rd->collected_value);
if(!(rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)))
- storage_flags = SN_EXISTS_RESET;
+ has_reset_value = 1;
uint64_t last = (uint64_t)rd->last_collected_value;
uint64_t new = (uint64_t)rd->collected_value;
@@ -1703,7 +1715,7 @@ after_first_database_work:
);
if(!(rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)))
- storage_flags = SN_EXISTS_RESET;
+ has_reset_value = 1;
rd->last_collected_value = rd->collected_value;
}
@@ -1782,15 +1794,32 @@ after_first_database_work:
, last_collect_ut
, now_collect_ut
, store_this_entry
- , storage_flags
+ , has_reset_value
);
after_second_database_work:
st->last_collected_total = st->collected_total;
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ time_t mark = now_realtime_sec();
+#endif
rrddim_foreach_read(rd, st) {
if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED))
continue;
+
+#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) {
+ int live = ((mark - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_LIVE_COUNT * rd->update_every));
+ if (unlikely(live != rd->state->aclk_live_status)) {
+ if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
+ if (likely(!queue_dimension_to_aclk(rd))) {
+ rd->state->aclk_live_status = live;
+ rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
+ }
+ }
+ }
+ }
+#endif
if(unlikely(!rd->updated))
continue;
@@ -1866,7 +1895,8 @@ after_second_database_work:
rrdset_wrlock(st);
for( rd = st->dimensions, last = NULL ; likely(rd) ; ) {
- if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE) && (rd->last_collected_time.tv_sec + rrdset_free_obsolete_time < now))) {
+ if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE) && !rrddim_flag_check(rd, RRDDIM_FLAG_ACLK)
+ && (rd->last_collected_time.tv_sec + rrdset_free_obsolete_time < now))) {
info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id);
if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || rd->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) {
@@ -1886,7 +1916,7 @@ after_second_database_work:
uint8_t can_delete_metric = rd->state->collect_ops.finalize(rd);
if (can_delete_metric) {
/* This metric has no data and no references */
- delete_dimension_uuid(rd->state->metric_uuid);
+ delete_dimension_uuid(&rd->state->metric_uuid);
} else {
/* Do not delete this dimension */
last = rd;