diff options
Diffstat (limited to 'database/rrdset.c')
-rw-r--r-- | database/rrdset.c | 108 |
1 files changed, 70 insertions, 38 deletions
diff --git a/database/rrdset.c b/database/rrdset.c index 57f962cd6..2843bb330 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -158,7 +158,7 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v STORAGE_ENGINE *eng = st->rrdhost->db[tier].eng; if(!eng) continue; - st->storage_metrics_groups[tier] = eng->api.collect_ops.metrics_group_get(host->db[tier].instance, &st->chart_uuid); + st->storage_metrics_groups[tier] = storage_engine_metrics_group_get(eng->backend, host->db[tier].instance, &st->chart_uuid); } } @@ -184,6 +184,8 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v ml_chart_new(st); } +void pluginsd_rrdset_cleanup(RRDSET *st); + void rrdset_finalize_collection(RRDSET *st, bool dimensions_too) { RRDHOST *host = st->rrdhost; @@ -201,10 +203,12 @@ void rrdset_finalize_collection(RRDSET *st, bool dimensions_too) { if(!eng) continue; if(st->storage_metrics_groups[tier]) { - eng->api.collect_ops.metrics_group_release(host->db[tier].instance, st->storage_metrics_groups[tier]); + storage_engine_metrics_group_release(eng->backend, host->db[tier].instance, st->storage_metrics_groups[tier]); st->storage_metrics_groups[tier] = NULL; } } + + pluginsd_rrdset_cleanup(st); } // the destructor - the dictionary is write locked while this runs @@ -475,6 +479,27 @@ inline RRDSET *rrdset_find_byname(RRDHOST *host, const char *name) { return(st); } +RRDSET_ACQUIRED *rrdset_find_and_acquire(RRDHOST *host, const char *id) { + debug(D_RRD_CALLS, "rrdset_find_and_acquire() for host %s, chart %s", rrdhost_hostname(host), id); + + return (RRDSET_ACQUIRED *)dictionary_get_and_acquire_item(host->rrdset_root_index, id); +} + +RRDSET *rrdset_acquired_to_rrdset(RRDSET_ACQUIRED *rsa) { + if(unlikely(!rsa)) + return NULL; + + return (RRDSET *) dictionary_acquired_item_value((const DICTIONARY_ITEM *)rsa); +} + +void rrdset_acquired_release(RRDSET_ACQUIRED *rsa) { + if(unlikely(!rsa)) + return; + + RRDSET *rs = rrdset_acquired_to_rrdset(rsa); + dictionary_acquired_item_release(rs->rrdhost->rrdset_root_index, (const DICTIONARY_ITEM *)rsa); +} + // ---------------------------------------------------------------------------- // RRDSET - rename charts @@ -737,10 +762,8 @@ void rrdset_reset(RRDSET *st) { rd->collections_counter = 0; if(!rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) { - for(size_t tier = 0; tier < storage_tiers ;tier++) { - if(rd->tiers[tier].db_collection_handle) - rd->tiers[tier].collect_ops->flush(rd->tiers[tier].db_collection_handle); - } + for(size_t tier = 0; tier < storage_tiers ;tier++) + storage_engine_store_flush(rd->tiers[tier].db_collection_handle); } } rrddim_foreach_done(rd); @@ -1116,7 +1139,7 @@ void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAG if (likely(!storage_point_is_unset(t->virtual_point))) { - t->collect_ops->store_metric( + storage_engine_store_metric( t->db_collection_handle, t->next_point_end_time_s * USEC_PER_SEC, t->virtual_point.sum, @@ -1127,7 +1150,7 @@ void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAG t->virtual_point.flags); } else { - t->collect_ops->store_metric( + storage_engine_store_metric( t->db_collection_handle, t->next_point_end_time_s * USEC_PER_SEC, NAN, @@ -1199,7 +1222,10 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, #endif // NETDATA_LOG_COLLECTION_ERRORS // store the metric on tier 0 - rd->tiers[0].collect_ops->store_metric(rd->tiers[0].db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags); + storage_engine_store_metric(rd->tiers[0].db_collection_handle, point_end_time_ut, + n, 0, 0, + 1, 0, flags); + rrdset_done_statistics_points_stored_per_tier[0]++; time_t now_s = (time_t)(point_end_time_ut / USEC_PER_SEC); @@ -1229,6 +1255,8 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, store_metric_at_tier(rd, tier, t, sp, point_end_time_ut); } + + rrdcontext_collected_rrddim(rd); } void store_metric_collection_completed() { @@ -1269,7 +1297,8 @@ void rrdset_thread_rda_free(void) { } static inline size_t rrdset_done_interpolate( - RRDSET *st + RRDSET_STREAM_BUFFER *rsb + , RRDSET *st , struct rda_item *rda_base , size_t rda_slots , usec_t update_every_ut @@ -1399,32 +1428,38 @@ static inline size_t rrdset_done_interpolate( time_t current_time_s = (time_t) (next_store_ut / USEC_PER_SEC); if(unlikely(!store_this_entry)) { - (void) ml_is_anomalous(rd, current_time_s, 0, false); + (void) ml_dimension_is_anomalous(rd, current_time_s, 0, false); + + if(rsb->wb && rsb->v2) + rrddim_push_metrics_v2(rsb, rd, next_store_ut, NAN, SN_FLAG_NONE); rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE); - rrdcontext_collected_rrddim(rd); continue; } if(likely(rd->updated && rd->collections_counter > 1 && iterations < gap_when_lost_iterations_above)) { uint32_t dim_storage_flags = storage_flags; - if (ml_is_anomalous(rd, current_time_s, new_value, true)) { + if (ml_dimension_is_anomalous(rd, current_time_s, new_value, true)) { // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous dim_storage_flags &= ~((storage_number)SN_FLAG_NOT_ANOMALOUS); } + if(rsb->wb && rsb->v2) + rrddim_push_metrics_v2(rsb, rd, next_store_ut, new_value, dim_storage_flags); + rrddim_store_metric(rd, next_store_ut, new_value, dim_storage_flags); - rrdcontext_collected_rrddim(rd); rd->last_stored_value = new_value; } else { - (void) ml_is_anomalous(rd, current_time_s, 0, false); + (void) ml_dimension_is_anomalous(rd, current_time_s, 0, false); rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rrddim_name(rd), current_entry); + if(rsb->wb && rsb->v2) + rrddim_push_metrics_v2(rsb, rd, next_store_ut, NAN, SN_FLAG_NONE); + rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE); - rrdcontext_collected_rrddim(rd); rd->last_stored_value = NAN; } @@ -1468,6 +1503,10 @@ void rrdset_done(RRDSET *st) { void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) { if(unlikely(!service_running(SERVICE_COLLECTORS))) return; + RRDSET_STREAM_BUFFER stream_buffer = { .wb = NULL, }; + if(unlikely(rrdhost_has_rrdpush_sender_enabled(st->rrdhost))) + stream_buffer = rrdset_push_metric_initialize(st, now.tv_sec); + netdata_spinlock_lock(&st->data_collection_lock); if (pending_rrdset_next) @@ -1489,10 +1528,10 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) update_every_ut = st->update_every * USEC_PER_SEC; // st->update_every in microseconds RRDSET_FLAGS rrdset_flags = rrdset_flag_check(st, ~0); - if(unlikely(rrdset_flags & RRDSET_FLAG_COLLECTION_FINISHED)) + if(unlikely(rrdset_flags & RRDSET_FLAG_COLLECTION_FINISHED)) { + netdata_spinlock_unlock(&st->data_collection_lock); return; - - netdata_thread_disable_cancelability(); + } if (unlikely(rrdset_flags & RRDSET_FLAG_OBSOLETE)) { error("Chart '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st)); @@ -1500,8 +1539,7 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) } // check if the chart has a long time to be updated - if(unlikely(st->usec_since_last_update > st->entries * update_every_ut && - st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE && st->rrd_memory_mode != RRD_MEMORY_MODE_NONE)) { + if(unlikely(st->usec_since_last_update > MAX(st->entries, 60) * update_every_ut)) { info("host '%s', chart '%s': took too long to be updated (counter #%zu, update #%zu, %0.3" NETDATA_DOUBLE_MODIFIER " secs). Resetting it.", rrdhost_hostname(st->rrdhost), rrdset_id(st), st->counter, st->counter_done, (NETDATA_DOUBLE)st->usec_since_last_update / USEC_PER_SEC); rrdset_reset(st); @@ -1527,9 +1565,6 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) // calculate the proper last_collected_time, using usec_since_last_update last_collect_ut = rrdset_update_last_collected_time(st); } - if (unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE)) { - goto after_first_database_work; - } // if this set has not been updated in the past // we fake the last_update time to be = now - usec_since_last_update @@ -1592,11 +1627,10 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) } } -after_first_database_work: st->counter_done++; - if(unlikely(rrdhost_has_rrdpush_sender_enabled(st->rrdhost))) - rrdset_done_push(st); + if(stream_buffer.wb && !stream_buffer.v2) + rrdset_push_metrics_v1(&stream_buffer, st); uint32_t has_reset_value = 0; @@ -1654,9 +1688,6 @@ after_first_database_work: rrddim_foreach_done(rd); rda_slots = dimensions; - if (unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE)) - goto after_second_database_work; - rrdset_debug(st, "last_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last collection time)", (NETDATA_DOUBLE)last_collect_ut/USEC_PER_SEC); rrdset_debug(st, "now_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (current collection time)", (NETDATA_DOUBLE)now_collect_ut/USEC_PER_SEC); rrdset_debug(st, "last_stored_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last updated time)", (NETDATA_DOUBLE)last_stored_ut/USEC_PER_SEC); @@ -1857,7 +1888,8 @@ after_first_database_work: // #endif rrdset_done_interpolate( - st + &stream_buffer + , st , rda_base , rda_slots , update_every_ut @@ -1869,7 +1901,6 @@ after_first_database_work: , has_reset_value ); -after_second_database_work: for(dim_id = 0, rda = rda_base ; dim_id < rda_slots ; ++dim_id, ++rda) { rd = rda->rd; if(unlikely(!rd)) continue; @@ -1928,6 +1959,7 @@ after_second_database_work: } netdata_spinlock_unlock(&st->data_collection_lock); + rrdset_push_metrics_finished(&stream_buffer, st); // ALL DONE ABOUT THE DATA UPDATE // -------------------------------------------------------------------- @@ -1955,8 +1987,6 @@ after_second_database_work: rrdcontext_collected_rrdset(st); - netdata_thread_enable_cancelability(); - store_metric_collection_completed(); } @@ -1965,18 +1995,20 @@ time_t rrdset_set_update_every_s(RRDSET *st, time_t update_every_s) { internal_error(true, "RRDSET '%s' switching update every from %d to %d", rrdset_id(st), (int)st->update_every, (int)update_every_s); - time_t prev_update_every_s = st->update_every; - st->update_every = update_every_s; + time_t prev_update_every_s = (time_t) st->update_every; + st->update_every = (int) update_every_s; // switch update every to the storage engine RRDDIM *rd; rrddim_foreach_read(rd, st) { for (size_t tier = 0; tier < storage_tiers; tier++) { if (rd->tiers[tier].db_collection_handle) - rd->tiers[tier].collect_ops->change_collection_frequency(rd->tiers[tier].db_collection_handle, (int)(st->rrdhost->db[tier].tier_grouping * st->update_every)); + storage_engine_store_change_collection_frequency( + rd->tiers[tier].db_collection_handle, + (int)(st->rrdhost->db[tier].tier_grouping * st->update_every)); } - assert(rd->update_every == prev_update_every_s && + assert(rd->update_every == (int) prev_update_every_s && "chart's update every differs from the update every of its dimensions"); rd->update_every = st->update_every; } |