diff options
Diffstat (limited to 'src/database/contexts/context.c')
-rw-r--r-- | src/database/contexts/context.c | 342 |
1 files changed, 342 insertions, 0 deletions
diff --git a/src/database/contexts/context.c b/src/database/contexts/context.c new file mode 100644 index 000000000..33037eb93 --- /dev/null +++ b/src/database/contexts/context.c @@ -0,0 +1,342 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "internal.h" + +inline const char *rrdcontext_acquired_id(RRDCONTEXT_ACQUIRED *rca) { + RRDCONTEXT *rc = rrdcontext_acquired_value(rca); + return string2str(rc->id); +} + +inline bool rrdcontext_acquired_belongs_to_host(RRDCONTEXT_ACQUIRED *rca, RRDHOST *host) { + RRDCONTEXT *rc = rrdcontext_acquired_value(rca); + return rc->rrdhost == host; +} + +// ---------------------------------------------------------------------------- +// RRDCONTEXT + +static void rrdcontext_freez(RRDCONTEXT *rc) { + string_freez(rc->id); + string_freez(rc->title); + string_freez(rc->units); + string_freez(rc->family); +} + +static void rrdcontext_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost) { + RRDHOST *host = (RRDHOST *)rrdhost; + RRDCONTEXT *rc = (RRDCONTEXT *)value; + + rc->rrdhost = host; + rc->flags = rc->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics at constructor + + if(rc->hub.version) { + // we are loading data from the SQL database + + if(rc->version) + netdata_log_error("RRDCONTEXT: context '%s' is already initialized with version %"PRIu64", but it is loaded again from SQL with version %"PRIu64"", string2str(rc->id), rc->version, rc->hub.version); + + // IMPORTANT + // replace all string pointers in rc->hub with our own versions + // the originals are coming from a tmp allocation of sqlite + + string_freez(rc->id); + rc->id = string_strdupz(rc->hub.id); + rc->hub.id = string2str(rc->id); + + string_freez(rc->title); + rc->title = string_strdupz(rc->hub.title); + rc->hub.title = string2str(rc->title); + + string_freez(rc->units); + rc->units = string_strdupz(rc->hub.units); + rc->hub.units = string2str(rc->units); + + string_freez(rc->family); + rc->family = string_strdupz(rc->hub.family); + rc->hub.family = string2str(rc->family); + + rc->chart_type = rrdset_type_id(rc->hub.chart_type); + rc->hub.chart_type = rrdset_type_name(rc->chart_type); + + rc->version = rc->hub.version; + rc->priority = rc->hub.priority; + rc->first_time_s = (time_t)rc->hub.first_time_s; + rc->last_time_s = (time_t)rc->hub.last_time_s; + + if(rc->hub.deleted || !rc->hub.first_time_s) + rrd_flag_set_deleted(rc, RRD_FLAG_NONE); + else { + if (rc->last_time_s == 0) + rrd_flag_set_collected(rc); + else + rrd_flag_set_archived(rc); + } + + rc->flags |= RRD_FLAG_UPDATE_REASON_LOAD_SQL; // no need for atomics at constructor + } + else { + // we are adding this context now for the first time + rc->version = now_realtime_sec(); + } + + rrdinstances_create_in_rrdcontext(rc); + spinlock_init(&rc->spinlock); + + // signal the react callback to do the job + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_NEW_OBJECT); +} + +static void rrdcontext_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost __maybe_unused) { + + RRDCONTEXT *rc = (RRDCONTEXT *)value; + + rrdinstances_destroy_from_rrdcontext(rc); + rrdcontext_freez(rc); +} + +typedef enum __attribute__((packed)) { + OLDNEW_KEEP_OLD, + OLDNEW_USE_NEW, + OLDNEW_MERGE, +} OLDNEW; + +static inline OLDNEW oldnew_decide(bool archived, bool new_archived) { + if(archived && !new_archived) + return OLDNEW_USE_NEW; + + if(!archived && new_archived) + return OLDNEW_KEEP_OLD; + + return OLDNEW_MERGE; +} + +static inline void string_replace(STRING **stringpp, STRING *new_string) { + STRING *old = *stringpp; + *stringpp = string_dup(new_string); + string_freez(old); +} + +static inline void string_merge(STRING **stringpp, STRING *new_string) { + STRING *old = *stringpp; + *stringpp = string_2way_merge(*stringpp, new_string); + string_freez(old); +} + +static void rrdcontext_merge_with(RRDCONTEXT *rc, bool archived, STRING *title, STRING *family, STRING *units, RRDSET_TYPE chart_type, uint32_t priority) { + OLDNEW oldnew = oldnew_decide(rrd_flag_is_archived(rc), archived); + + switch(oldnew) { + case OLDNEW_KEEP_OLD: + break; + + case OLDNEW_USE_NEW: + if(rc->title != title) { + string_replace(&rc->title, title); + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + if(rc->family != family) { + string_replace(&rc->family, family); + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + break; + + case OLDNEW_MERGE: + if(rc->title != title) { + string_merge(&rc->title, title); + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + if(rc->family != family) { + string_merge(&rc->family, family); + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + break; + } + + switch(oldnew) { + case OLDNEW_KEEP_OLD: + break; + + case OLDNEW_USE_NEW: + case OLDNEW_MERGE: + if(rc->units != units) { + string_replace(&rc->units, units); + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + + if(rc->chart_type != chart_type) { + rc->chart_type = chart_type; + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + + if(rc->priority != priority) { + rc->priority = priority; + rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA); + } + break; + } +} + +void rrdcontext_update_from_collected_rrdinstance(RRDINSTANCE *ri) { + rrdcontext_merge_with(ri->rc, rrd_flag_is_archived(ri), + ri->title, ri->family, ri->units, ri->chart_type, ri->priority); +} + +static bool rrdcontext_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *rrdhost __maybe_unused) { + RRDCONTEXT *rc = (RRDCONTEXT *)old_value; + RRDCONTEXT *rc_new = (RRDCONTEXT *)new_value; + + //current rc is not archived, new_rc is archived, don't merge + if (!rrd_flag_is_archived(rc) && rrd_flag_is_archived(rc_new)) { + rrdcontext_freez(rc_new); + return false; + } + + rrdcontext_lock(rc); + + rrdcontext_merge_with(rc, rrd_flag_is_archived(rc_new), + rc_new->title, rc_new->family, rc_new->units, rc_new->chart_type, rc_new->priority); + + rrd_flag_set(rc, rc_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no need for atomics on rc_new + + if(rrd_flag_is_collected(rc) && rrd_flag_is_archived(rc)) + rrd_flag_set_collected(rc); + + if(rrd_flag_is_updated(rc)) + rrd_flag_set(rc, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT); + + rrdcontext_unlock(rc); + + // free the resources of the new one + rrdcontext_freez(rc_new); + + // the react callback will continue from here + return rrd_flag_is_updated(rc); +} + +static void rrdcontext_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost __maybe_unused) { + RRDCONTEXT *rc = (RRDCONTEXT *)value; + rrdcontext_trigger_updates(rc, __FUNCTION__ ); +} + +void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function) { + if(rrd_flag_is_updated(rc) || !rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION)) + rrdcontext_queue_for_post_processing(rc, function, rc->flags); +} + +static void rrdcontext_hub_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { + RRDCONTEXT *rc = context; + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB); + rc->queue.queued_ut = now_realtime_usec(); + rc->queue.queued_flags = rrd_flags_get(rc); +} + +static void rrdcontext_hub_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { + RRDCONTEXT *rc = context; + rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB); +} + +static bool rrdcontext_hub_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) { + // context and new_context are the same + // we just need to update the timings + RRDCONTEXT *rc = context; + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB); + rc->queue.queued_ut = now_realtime_usec(); + rc->queue.queued_flags |= rrd_flags_get(rc); + + return true; +} + +static void rrdcontext_post_processing_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { + RRDCONTEXT *rc = context; + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP); + rc->pp.queued_flags = rc->flags; + rc->pp.queued_ut = now_realtime_usec(); +} + +static void rrdcontext_post_processing_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { + RRDCONTEXT *rc = context; + + // IMPORTANT: + // Do not rely on this flag being absent, because the dictionaries have delayed deletions (garbage collect) + // so, this flag may not be deleted immediately from the context. + rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_PP); + rc->pp.dequeued_ut = now_realtime_usec(); +} + +static bool rrdcontext_post_processing_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) { + RRDCONTEXT *rc = context; + bool changed = false; + + if(!(rc->flags & RRD_FLAG_QUEUED_FOR_PP)) { + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP); + changed = true; + } + + if(rc->pp.queued_flags != rc->flags) { + rc->pp.queued_flags |= rc->flags; + changed = true; + } + + return changed; +} + + +void rrdhost_create_rrdcontexts(RRDHOST *host) { + if(unlikely(!host)) return; + if(likely(host->rrdctx.contexts)) return; + + host->rrdctx.contexts = dictionary_create_advanced( + DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + &dictionary_stats_category_rrdcontext, sizeof(RRDCONTEXT)); + + dictionary_register_insert_callback(host->rrdctx.contexts, rrdcontext_insert_callback, host); + dictionary_register_delete_callback(host->rrdctx.contexts, rrdcontext_delete_callback, host); + dictionary_register_conflict_callback(host->rrdctx.contexts, rrdcontext_conflict_callback, host); + dictionary_register_react_callback(host->rrdctx.contexts, rrdcontext_react_callback, host); + + host->rrdctx.hub_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0); + dictionary_register_insert_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_insert_callback, NULL); + dictionary_register_delete_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_delete_callback, NULL); + dictionary_register_conflict_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_conflict_callback, NULL); + + host->rrdctx.pp_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0); + dictionary_register_insert_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_insert_callback, NULL); + dictionary_register_delete_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_delete_callback, NULL); + dictionary_register_conflict_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_conflict_callback, NULL); +} + +void rrdhost_destroy_rrdcontexts(RRDHOST *host) { + if(unlikely(!host)) return; + if(unlikely(!host->rrdctx.contexts)) return; + + DICTIONARY *old; + + if(host->rrdctx.hub_queue) { + old = host->rrdctx.hub_queue; + host->rrdctx.hub_queue = NULL; + + RRDCONTEXT *rc; + dfe_start_write(old, rc) { + dictionary_del(old, string2str(rc->id)); + } + dfe_done(rc); + dictionary_destroy(old); + } + + if(host->rrdctx.pp_queue) { + old = host->rrdctx.pp_queue; + host->rrdctx.pp_queue = NULL; + + RRDCONTEXT *rc; + dfe_start_write(old, rc) { + dictionary_del(old, string2str(rc->id)); + } + dfe_done(rc); + dictionary_destroy(old); + } + + old = host->rrdctx.contexts; + host->rrdctx.contexts = NULL; + dictionary_destroy(old); +} + |