summaryrefslogtreecommitdiffstats
path: root/database/rrdset.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/rrdset.c')
-rw-r--r--database/rrdset.c108
1 files changed, 70 insertions, 38 deletions
diff --git a/database/rrdset.c b/database/rrdset.c
index 57f962cd6..2843bb330 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -158,7 +158,7 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
STORAGE_ENGINE *eng = st->rrdhost->db[tier].eng;
if(!eng) continue;
- st->storage_metrics_groups[tier] = eng->api.collect_ops.metrics_group_get(host->db[tier].instance, &st->chart_uuid);
+ st->storage_metrics_groups[tier] = storage_engine_metrics_group_get(eng->backend, host->db[tier].instance, &st->chart_uuid);
}
}
@@ -184,6 +184,8 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
ml_chart_new(st);
}
+void pluginsd_rrdset_cleanup(RRDSET *st);
+
void rrdset_finalize_collection(RRDSET *st, bool dimensions_too) {
RRDHOST *host = st->rrdhost;
@@ -201,10 +203,12 @@ void rrdset_finalize_collection(RRDSET *st, bool dimensions_too) {
if(!eng) continue;
if(st->storage_metrics_groups[tier]) {
- eng->api.collect_ops.metrics_group_release(host->db[tier].instance, st->storage_metrics_groups[tier]);
+ storage_engine_metrics_group_release(eng->backend, host->db[tier].instance, st->storage_metrics_groups[tier]);
st->storage_metrics_groups[tier] = NULL;
}
}
+
+ pluginsd_rrdset_cleanup(st);
}
// the destructor - the dictionary is write locked while this runs
@@ -475,6 +479,27 @@ inline RRDSET *rrdset_find_byname(RRDHOST *host, const char *name) {
return(st);
}
+RRDSET_ACQUIRED *rrdset_find_and_acquire(RRDHOST *host, const char *id) {
+ debug(D_RRD_CALLS, "rrdset_find_and_acquire() for host %s, chart %s", rrdhost_hostname(host), id);
+
+ return (RRDSET_ACQUIRED *)dictionary_get_and_acquire_item(host->rrdset_root_index, id);
+}
+
+RRDSET *rrdset_acquired_to_rrdset(RRDSET_ACQUIRED *rsa) {
+ if(unlikely(!rsa))
+ return NULL;
+
+ return (RRDSET *) dictionary_acquired_item_value((const DICTIONARY_ITEM *)rsa);
+}
+
+void rrdset_acquired_release(RRDSET_ACQUIRED *rsa) {
+ if(unlikely(!rsa))
+ return;
+
+ RRDSET *rs = rrdset_acquired_to_rrdset(rsa);
+ dictionary_acquired_item_release(rs->rrdhost->rrdset_root_index, (const DICTIONARY_ITEM *)rsa);
+}
+
// ----------------------------------------------------------------------------
// RRDSET - rename charts
@@ -737,10 +762,8 @@ void rrdset_reset(RRDSET *st) {
rd->collections_counter = 0;
if(!rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
- for(size_t tier = 0; tier < storage_tiers ;tier++) {
- if(rd->tiers[tier].db_collection_handle)
- rd->tiers[tier].collect_ops->flush(rd->tiers[tier].db_collection_handle);
- }
+ for(size_t tier = 0; tier < storage_tiers ;tier++)
+ storage_engine_store_flush(rd->tiers[tier].db_collection_handle);
}
}
rrddim_foreach_done(rd);
@@ -1116,7 +1139,7 @@ void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAG
if (likely(!storage_point_is_unset(t->virtual_point))) {
- t->collect_ops->store_metric(
+ storage_engine_store_metric(
t->db_collection_handle,
t->next_point_end_time_s * USEC_PER_SEC,
t->virtual_point.sum,
@@ -1127,7 +1150,7 @@ void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAG
t->virtual_point.flags);
}
else {
- t->collect_ops->store_metric(
+ storage_engine_store_metric(
t->db_collection_handle,
t->next_point_end_time_s * USEC_PER_SEC,
NAN,
@@ -1199,7 +1222,10 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n,
#endif // NETDATA_LOG_COLLECTION_ERRORS
// store the metric on tier 0
- rd->tiers[0].collect_ops->store_metric(rd->tiers[0].db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags);
+ storage_engine_store_metric(rd->tiers[0].db_collection_handle, point_end_time_ut,
+ n, 0, 0,
+ 1, 0, flags);
+
rrdset_done_statistics_points_stored_per_tier[0]++;
time_t now_s = (time_t)(point_end_time_ut / USEC_PER_SEC);
@@ -1229,6 +1255,8 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n,
store_metric_at_tier(rd, tier, t, sp, point_end_time_ut);
}
+
+ rrdcontext_collected_rrddim(rd);
}
void store_metric_collection_completed() {
@@ -1269,7 +1297,8 @@ void rrdset_thread_rda_free(void) {
}
static inline size_t rrdset_done_interpolate(
- RRDSET *st
+ RRDSET_STREAM_BUFFER *rsb
+ , RRDSET *st
, struct rda_item *rda_base
, size_t rda_slots
, usec_t update_every_ut
@@ -1399,32 +1428,38 @@ static inline size_t rrdset_done_interpolate(
time_t current_time_s = (time_t) (next_store_ut / USEC_PER_SEC);
if(unlikely(!store_this_entry)) {
- (void) ml_is_anomalous(rd, current_time_s, 0, false);
+ (void) ml_dimension_is_anomalous(rd, current_time_s, 0, false);
+
+ if(rsb->wb && rsb->v2)
+ rrddim_push_metrics_v2(rsb, rd, next_store_ut, NAN, SN_FLAG_NONE);
rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
- rrdcontext_collected_rrddim(rd);
continue;
}
if(likely(rd->updated && rd->collections_counter > 1 && iterations < gap_when_lost_iterations_above)) {
uint32_t dim_storage_flags = storage_flags;
- if (ml_is_anomalous(rd, current_time_s, new_value, true)) {
+ if (ml_dimension_is_anomalous(rd, current_time_s, new_value, true)) {
// clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
dim_storage_flags &= ~((storage_number)SN_FLAG_NOT_ANOMALOUS);
}
+ if(rsb->wb && rsb->v2)
+ rrddim_push_metrics_v2(rsb, rd, next_store_ut, new_value, dim_storage_flags);
+
rrddim_store_metric(rd, next_store_ut, new_value, dim_storage_flags);
- rrdcontext_collected_rrddim(rd);
rd->last_stored_value = new_value;
}
else {
- (void) ml_is_anomalous(rd, current_time_s, 0, false);
+ (void) ml_dimension_is_anomalous(rd, current_time_s, 0, false);
rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rrddim_name(rd), current_entry);
+ if(rsb->wb && rsb->v2)
+ rrddim_push_metrics_v2(rsb, rd, next_store_ut, NAN, SN_FLAG_NONE);
+
rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
- rrdcontext_collected_rrddim(rd);
rd->last_stored_value = NAN;
}
@@ -1468,6 +1503,10 @@ void rrdset_done(RRDSET *st) {
void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) {
if(unlikely(!service_running(SERVICE_COLLECTORS))) return;
+ RRDSET_STREAM_BUFFER stream_buffer = { .wb = NULL, };
+ if(unlikely(rrdhost_has_rrdpush_sender_enabled(st->rrdhost)))
+ stream_buffer = rrdset_push_metric_initialize(st, now.tv_sec);
+
netdata_spinlock_lock(&st->data_collection_lock);
if (pending_rrdset_next)
@@ -1489,10 +1528,10 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
update_every_ut = st->update_every * USEC_PER_SEC; // st->update_every in microseconds
RRDSET_FLAGS rrdset_flags = rrdset_flag_check(st, ~0);
- if(unlikely(rrdset_flags & RRDSET_FLAG_COLLECTION_FINISHED))
+ if(unlikely(rrdset_flags & RRDSET_FLAG_COLLECTION_FINISHED)) {
+ netdata_spinlock_unlock(&st->data_collection_lock);
return;
-
- netdata_thread_disable_cancelability();
+ }
if (unlikely(rrdset_flags & RRDSET_FLAG_OBSOLETE)) {
error("Chart '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st));
@@ -1500,8 +1539,7 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
}
// check if the chart has a long time to be updated
- if(unlikely(st->usec_since_last_update > st->entries * update_every_ut &&
- st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE && st->rrd_memory_mode != RRD_MEMORY_MODE_NONE)) {
+ if(unlikely(st->usec_since_last_update > MAX(st->entries, 60) * update_every_ut)) {
info("host '%s', chart '%s': took too long to be updated (counter #%zu, update #%zu, %0.3" NETDATA_DOUBLE_MODIFIER
" secs). Resetting it.", rrdhost_hostname(st->rrdhost), rrdset_id(st), st->counter, st->counter_done, (NETDATA_DOUBLE)st->usec_since_last_update / USEC_PER_SEC);
rrdset_reset(st);
@@ -1527,9 +1565,6 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
// calculate the proper last_collected_time, using usec_since_last_update
last_collect_ut = rrdset_update_last_collected_time(st);
}
- if (unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE)) {
- goto after_first_database_work;
- }
// if this set has not been updated in the past
// we fake the last_update time to be = now - usec_since_last_update
@@ -1592,11 +1627,10 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
}
}
-after_first_database_work:
st->counter_done++;
- if(unlikely(rrdhost_has_rrdpush_sender_enabled(st->rrdhost)))
- rrdset_done_push(st);
+ if(stream_buffer.wb && !stream_buffer.v2)
+ rrdset_push_metrics_v1(&stream_buffer, st);
uint32_t has_reset_value = 0;
@@ -1654,9 +1688,6 @@ after_first_database_work:
rrddim_foreach_done(rd);
rda_slots = dimensions;
- if (unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE))
- goto after_second_database_work;
-
rrdset_debug(st, "last_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last collection time)", (NETDATA_DOUBLE)last_collect_ut/USEC_PER_SEC);
rrdset_debug(st, "now_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (current collection time)", (NETDATA_DOUBLE)now_collect_ut/USEC_PER_SEC);
rrdset_debug(st, "last_stored_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last updated time)", (NETDATA_DOUBLE)last_stored_ut/USEC_PER_SEC);
@@ -1857,7 +1888,8 @@ after_first_database_work:
// #endif
rrdset_done_interpolate(
- st
+ &stream_buffer
+ , st
, rda_base
, rda_slots
, update_every_ut
@@ -1869,7 +1901,6 @@ after_first_database_work:
, has_reset_value
);
-after_second_database_work:
for(dim_id = 0, rda = rda_base ; dim_id < rda_slots ; ++dim_id, ++rda) {
rd = rda->rd;
if(unlikely(!rd)) continue;
@@ -1928,6 +1959,7 @@ after_second_database_work:
}
netdata_spinlock_unlock(&st->data_collection_lock);
+ rrdset_push_metrics_finished(&stream_buffer, st);
// ALL DONE ABOUT THE DATA UPDATE
// --------------------------------------------------------------------
@@ -1955,8 +1987,6 @@ after_second_database_work:
rrdcontext_collected_rrdset(st);
- netdata_thread_enable_cancelability();
-
store_metric_collection_completed();
}
@@ -1965,18 +1995,20 @@ time_t rrdset_set_update_every_s(RRDSET *st, time_t update_every_s) {
internal_error(true, "RRDSET '%s' switching update every from %d to %d",
rrdset_id(st), (int)st->update_every, (int)update_every_s);
- time_t prev_update_every_s = st->update_every;
- st->update_every = update_every_s;
+ time_t prev_update_every_s = (time_t) st->update_every;
+ st->update_every = (int) update_every_s;
// switch update every to the storage engine
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
for (size_t tier = 0; tier < storage_tiers; tier++) {
if (rd->tiers[tier].db_collection_handle)
- rd->tiers[tier].collect_ops->change_collection_frequency(rd->tiers[tier].db_collection_handle, (int)(st->rrdhost->db[tier].tier_grouping * st->update_every));
+ storage_engine_store_change_collection_frequency(
+ rd->tiers[tier].db_collection_handle,
+ (int)(st->rrdhost->db[tier].tier_grouping * st->update_every));
}
- assert(rd->update_every == prev_update_every_s &&
+ assert(rd->update_every == (int) prev_update_every_s &&
"chart's update every differs from the update every of its dimensions");
rd->update_every = st->update_every;
}