diff options
Diffstat (limited to 'database/rrd.h')
-rw-r--r-- | database/rrd.h | 266 |
1 files changed, 196 insertions, 70 deletions
diff --git a/database/rrd.h b/database/rrd.h index 9c5ad6f2f..5f4bee037 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -239,7 +239,6 @@ typedef enum __attribute__ ((__packed__)) rrddim_options { RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS = (1 << 1), // do not offer RESET or OVERFLOW info to callers RRDDIM_OPTION_BACKFILLED_HIGH_TIERS = (1 << 2), // when set, we have backfilled higher tiers RRDDIM_OPTION_UPDATED = (1 << 3), // single-threaded collector updated flag - RRDDIM_OPTION_EXPOSED = (1 << 4), // single-threaded collector exposed flag // this is 8-bit } RRDDIM_OPTIONS; @@ -253,20 +252,22 @@ typedef enum __attribute__ ((__packed__)) rrddim_flags { RRDDIM_FLAG_NONE = 0, RRDDIM_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 0), - RRDDIM_FLAG_OBSOLETE = (1 << 2), // this is marked by the collector/module as obsolete + RRDDIM_FLAG_OBSOLETE = (1 << 1), // this is marked by the collector/module as obsolete // No new values have been collected for this dimension since agent start, or it was marked RRDDIM_FLAG_OBSOLETE at // least rrdset_free_obsolete_time seconds ago. - RRDDIM_FLAG_ARCHIVED = (1 << 3), - RRDDIM_FLAG_METADATA_UPDATE = (1 << 4), // Metadata needs to go to the database + RRDDIM_FLAG_ARCHIVED = (1 << 2), + RRDDIM_FLAG_METADATA_UPDATE = (1 << 3), // Metadata needs to go to the database - RRDDIM_FLAG_META_HIDDEN = (1 << 6), // Status of hidden option in the metadata database + RRDDIM_FLAG_META_HIDDEN = (1 << 4), // Status of hidden option in the metadata database + RRDDIM_FLAG_ML_MODEL_LOAD = (1 << 5), // Do ML LOAD for this dimension // this is 8 bit } RRDDIM_FLAGS; -#define rrddim_flag_check(rd, flag) (__atomic_load_n(&((rd)->flags), __ATOMIC_SEQ_CST) & (flag)) -#define rrddim_flag_set(rd, flag) __atomic_or_fetch(&((rd)->flags), (flag), __ATOMIC_SEQ_CST) -#define rrddim_flag_clear(rd, flag) __atomic_and_fetch(&((rd)->flags), ~(flag), __ATOMIC_SEQ_CST) +#define rrddim_flag_get(rd) __atomic_load_n(&((rd)->flags), __ATOMIC_ACQUIRE) +#define rrddim_flag_check(rd, flag) (__atomic_load_n(&((rd)->flags), __ATOMIC_ACQUIRE) & (flag)) +#define rrddim_flag_set(rd, flag) __atomic_or_fetch(&((rd)->flags), (flag), __ATOMIC_RELEASE) +#define rrddim_flag_clear(rd, flag) __atomic_and_fetch(&((rd)->flags), ~(flag), __ATOMIC_RELEASE) // ---------------------------------------------------------------------------- // engine-specific iterator state for dimension data collection @@ -312,7 +313,11 @@ struct rrddim { struct rrdset *rrdset; rrd_ml_dimension_t *ml_dimension; // machine learning data about this dimension - RRDMETRIC_ACQUIRED *rrdmetric; // the rrdmetric of this dimension + + struct { + RRDMETRIC_ACQUIRED *rrdmetric; // the rrdmetric of this dimension + bool collected; + } rrdcontexts; #ifdef NETDATA_LOG_COLLECTION_ERRORS usec_t rrddim_store_metric_last_ut; // the timestamp we last called rrddim_store_metric() @@ -332,6 +337,16 @@ struct rrddim { } db; // ------------------------------------------------------------------------ + // streaming + + struct { + struct { + uint32_t sent_version; + uint32_t dim_slot; + } sender; + } rrdpush; + + // ------------------------------------------------------------------------ // data collection members struct { @@ -367,10 +382,6 @@ size_t rrddim_size(void); #define rrddim_set_updated(rd) (rd)->collector.options |= RRDDIM_OPTION_UPDATED #define rrddim_clear_updated(rd) (rd)->collector.options &= ~RRDDIM_OPTION_UPDATED -#define rrddim_check_exposed(rd) ((rd)->collector.options & RRDDIM_OPTION_EXPOSED) -#define rrddim_set_exposed(rd) (rd)->collector.options |= RRDDIM_OPTION_EXPOSED -#define rrddim_clear_exposed(rd) (rd)->collector.options &= ~RRDDIM_OPTION_EXPOSED - // returns the RRDDIM cache filename, or NULL if it does not exist const char *rrddim_cache_filename(RRDDIM *rd); @@ -685,41 +696,47 @@ typedef enum __attribute__ ((__packed__)) rrdset_flags { RRDSET_FLAG_UPSTREAM_SEND = (1 << 6), // if set, this chart should be sent upstream (streaming) RRDSET_FLAG_UPSTREAM_IGNORE = (1 << 7), // if set, this chart should not be sent upstream (streaming) - RRDSET_FLAG_UPSTREAM_EXPOSED = (1 << 8), // if set, we have sent this chart definition to netdata parent (streaming) - RRDSET_FLAG_STORE_FIRST = (1 << 9), // if set, do not eliminate the first collection during interpolation - RRDSET_FLAG_HETEROGENEOUS = (1 << 10), // if set, the chart is not homogeneous (dimensions in it have multiple algorithms, multipliers or dividers) - RRDSET_FLAG_HOMOGENEOUS_CHECK = (1 << 11), // if set, the chart should be checked to determine if the dimensions are homogeneous - RRDSET_FLAG_HIDDEN = (1 << 12), // if set, do not show this chart on the dashboard, but use it for exporting - RRDSET_FLAG_SYNC_CLOCK = (1 << 13), // if set, microseconds on next data collection will be ignored (the chart will be synced to now) - RRDSET_FLAG_OBSOLETE_DIMENSIONS = (1 << 14), // this is marked by the collector/module when a chart has obsolete dimensions + RRDSET_FLAG_STORE_FIRST = (1 << 8), // if set, do not eliminate the first collection during interpolation + RRDSET_FLAG_HETEROGENEOUS = (1 << 9), // if set, the chart is not homogeneous (dimensions in it have multiple algorithms, multipliers or dividers) + RRDSET_FLAG_HOMOGENEOUS_CHECK = (1 << 10), // if set, the chart should be checked to determine if the dimensions are homogeneous + RRDSET_FLAG_HIDDEN = (1 << 11), // if set, do not show this chart on the dashboard, but use it for exporting + RRDSET_FLAG_SYNC_CLOCK = (1 << 12), // if set, microseconds on next data collection will be ignored (the chart will be synced to now) + RRDSET_FLAG_OBSOLETE_DIMENSIONS = (1 << 13), // this is marked by the collector/module when a chart has obsolete dimensions - RRDSET_FLAG_METADATA_UPDATE = (1 << 16), // Mark that metadata needs to be stored - RRDSET_FLAG_ANOMALY_DETECTION = (1 << 18), // flag to identify anomaly detection charts. - RRDSET_FLAG_INDEXED_ID = (1 << 19), // the rrdset is indexed by its id - RRDSET_FLAG_INDEXED_NAME = (1 << 20), // the rrdset is indexed by its name + RRDSET_FLAG_METADATA_UPDATE = (1 << 14), // Mark that metadata needs to be stored + RRDSET_FLAG_ANOMALY_DETECTION = (1 << 15), // flag to identify anomaly detection charts. + RRDSET_FLAG_INDEXED_ID = (1 << 16), // the rrdset is indexed by its id + RRDSET_FLAG_INDEXED_NAME = (1 << 17), // the rrdset is indexed by its name - RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 21), + RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 18), - RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS = (1 << 22), // the sending side has replication in progress - RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 23), // the sending side has completed replication - RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 24), // the receiving side has replication in progress - RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 25), // the receiving side has completed replication + RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS = (1 << 19), // the sending side has replication in progress + RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 20), // the sending side has completed replication + RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 21), // the receiving side has replication in progress + RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 22), // the receiving side has completed replication - RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 26), // a custom variable has been updated and needs to be exposed to parent + RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 23), // a custom variable has been updated and needs to be exposed to parent - RRDSET_FLAG_COLLECTION_FINISHED = (1 << 27), // when set, data collection is not available for this chart + RRDSET_FLAG_COLLECTION_FINISHED = (1 << 24), // when set, data collection is not available for this chart - RRDSET_FLAG_HAS_RRDCALC_LINKED = (1 << 28), // this chart has at least one rrdcal linked + RRDSET_FLAG_HAS_RRDCALC_LINKED = (1 << 25), // this chart has at least one rrdcal linked } RRDSET_FLAGS; -#define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag)) -#define rrdset_flag_set(st, flag) __atomic_or_fetch(&((st)->flags), flag, __ATOMIC_SEQ_CST) -#define rrdset_flag_clear(st, flag) __atomic_and_fetch(&((st)->flags), ~(flag), __ATOMIC_SEQ_CST) +#define rrdset_flag_get(st) __atomic_load_n(&((st)->flags), __ATOMIC_ACQUIRE) +#define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_ACQUIRE) & (flag)) +#define rrdset_flag_set(st, flag) __atomic_or_fetch(&((st)->flags), flag, __ATOMIC_RELEASE) +#define rrdset_flag_clear(st, flag) __atomic_and_fetch(&((st)->flags), ~(flag), __ATOMIC_RELEASE) #define rrdset_is_replicating(st) (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS|RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS) \ && !rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED|RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED)) +struct pluginsd_rrddim { + RRDDIM_ACQUIRED *rda; + RRDDIM *rd; + const char *id; +}; + struct rrdset { uuid_t chart_uuid; // the global UUID for this chart @@ -749,6 +766,8 @@ struct rrdset { DICTIONARY *rrddimvar_root_index; // dimension variables // we use this dictionary to manage their allocation + uint32_t version; // the metadata version (auto-increment) + RRDSET_TYPE chart_type; // line, area, stacked // ------------------------------------------------------------------------ @@ -768,8 +787,11 @@ struct rrdset { RRDHOST *rrdhost; // pointer to RRDHOST this chart belongs to - RRDINSTANCE_ACQUIRED *rrdinstance; // the rrdinstance of this chart - RRDCONTEXT_ACQUIRED *rrdcontext; // the rrdcontext this chart belongs to + struct { + RRDINSTANCE_ACQUIRED *rrdinstance; // the rrdinstance of this chart + RRDCONTEXT_ACQUIRED *rrdcontext; // the rrdcontext this chart belongs to + bool collected; + } rrdcontexts; // ------------------------------------------------------------------------ // data collection members @@ -792,7 +814,15 @@ struct rrdset { // ------------------------------------------------------------------------ // data collection - streaming to parents, temp variables - time_t upstream_resync_time_s; // the timestamp up to which we should resync clock upstream + struct { + struct { + uint32_t sent_version; + uint32_t chart_slot; + uint32_t dim_last_slot_used; + + time_t resync_time_s; // the timestamp up to which we should resync clock upstream + } sender; + } rrdpush; // ------------------------------------------------------------------------ // db mode SAVE, MAP specifics @@ -835,10 +865,12 @@ struct rrdset { struct { SPINLOCK spinlock; // used only for cleanup pid_t collector_tid; + bool dims_with_slots; bool set; uint32_t pos; + int32_t last_slot; uint32_t size; - RRDDIM_ACQUIRED **rda; + struct pluginsd_rrddim *prd_array; } pluginsd; #ifdef NETDATA_LOG_REPLICATION_REQUESTS @@ -861,6 +893,54 @@ struct rrdset { #define rrdset_name(st) string2str((st)->name) #define rrdset_id(st) string2str((st)->id) +static inline uint32_t rrdset_metadata_version(RRDSET *st) { + return __atomic_load_n(&st->version, __ATOMIC_RELAXED); +} + +static inline uint32_t rrdset_metadata_upstream_version(RRDSET *st) { + return __atomic_load_n(&st->rrdpush.sender.sent_version, __ATOMIC_RELAXED); +} + +void rrdset_metadata_updated(RRDSET *st); + +static inline void rrdset_metadata_exposed_upstream(RRDSET *st, uint32_t version) { + __atomic_store_n(&st->rrdpush.sender.sent_version, version, __ATOMIC_RELAXED); +} + +static inline bool rrdset_check_upstream_exposed(RRDSET *st) { + return rrdset_metadata_version(st) == rrdset_metadata_upstream_version(st); +} + +static inline uint32_t rrddim_metadata_version(RRDDIM *rd) { + // the metadata version of the dimension, is the version of the chart + return rrdset_metadata_version(rd->rrdset); +} + +static inline uint32_t rrddim_metadata_upstream_version(RRDDIM *rd) { + return __atomic_load_n(&rd->rrdpush.sender.sent_version, __ATOMIC_RELAXED); +} + +void rrddim_metadata_updated(RRDDIM *rd); + +static inline void rrddim_metadata_exposed_upstream(RRDDIM *rd, uint32_t version) { + __atomic_store_n(&rd->rrdpush.sender.sent_version, version, __ATOMIC_RELAXED); +} + +static inline void rrddim_metadata_exposed_upstream_clear(RRDDIM *rd) { + __atomic_store_n(&rd->rrdpush.sender.sent_version, 0, __ATOMIC_RELAXED); +} + +static inline bool rrddim_check_upstream_exposed(RRDDIM *rd) { + return rrddim_metadata_upstream_version(rd) != 0; +} + +// the collector sets the exposed flag, but anyone can remove it +// still, it can be removed, after the collector has finished +// so, it is safe to check it without atomics +static inline bool rrddim_check_upstream_exposed_collector(RRDDIM *rd) { + return rd->rrdset->version == rd->rrdpush.sender.sent_version; +} + STRING *rrd_string_strdupz(const char *s); // ---------------------------------------------------------------------------- @@ -943,7 +1023,7 @@ typedef enum __attribute__ ((__packed__)) rrdhost_flags { #ifdef NETDATA_INTERNAL_CHECKS #define rrdset_debug(st, fmt, args...) do { if(unlikely(debug_flags & D_RRD_STATS && rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) \ - debug_int(__FILE__, __FUNCTION__, __LINE__, "%s: " fmt, rrdset_name(st), ##args); } while(0) + netdata_logger(NDLS_DEBUG, NDLP_DEBUG, __FILE__, __FUNCTION__, __LINE__, "%s: " fmt, rrdset_name(st), ##args); } while(0) #else #define rrdset_debug(st, fmt, args...) debug_dummy() #endif @@ -963,6 +1043,7 @@ typedef enum __attribute__ ((__packed__)) { RRDHOST_OPTION_REPLICATION = (1 << 5), // when set, we support replication for this host RRDHOST_OPTION_VIRTUAL_HOST = (1 << 6), // when set, this host is a virtual one + RRDHOST_OPTION_EPHEMERAL_HOST = (1 << 7), // when set, this host is an ephemeral one } RRDHOST_OPTIONS; #define rrdhost_option_check(host, flag) ((host)->options & (flag)) @@ -1060,7 +1141,6 @@ typedef struct health { time_t health_delay_up_to; // a timestamp to delay alarms processing up to STRING *health_default_exec; // the full path of the alarms notifications program STRING *health_default_recipient; // the default recipient for all alarms - int health_log_entries_written; // the number of alarm events written to the alarms event log uint32_t health_default_warn_repeat_every; // the default value for the interval between repeating warning notifications uint32_t health_default_crit_repeat_every; // the default value for the interval between repeating critical notifications unsigned int health_enabled; // 1 when this host has health enabled @@ -1151,6 +1231,31 @@ struct rrdhost { // ------------------------------------------------------------------------ // streaming of data to remote hosts - rrdpush sender + struct { + struct { + struct { + struct { + SPINLOCK spinlock; + + bool ignore; // when set, freeing slots will not put them in the available + uint32_t used; + uint32_t size; + uint32_t *array; + } available; // keep track of the available chart slots per host + + uint32_t last_used; // the last slot we used for a chart (increments only) + } pluginsd_chart_slots; + } send; + + struct { + struct { + SPINLOCK spinlock; // lock for the management of the allocation + uint32_t size; + RRDSET **array; + } pluginsd_chart_slots; + } receive; + } rrdpush; + char *rrdpush_send_destination; // where to send metrics to char *rrdpush_send_api_key; // the api key at the receiving netdata struct rrdpush_destinations *destinations; // a linked list of possible destinations @@ -1168,7 +1273,7 @@ struct rrdhost { struct sender_state *sender; netdata_thread_t rrdpush_sender_thread; // the sender thread size_t rrdpush_sender_replicating_charts; // the number of charts currently being replicated to a parent - void *aclk_sync_host_config; + struct aclk_sync_cfg_t *aclk_config; uint32_t rrdpush_receiver_connection_counter; // the number of times this receiver has connected uint32_t rrdpush_sender_connection_counter; // the number of times this sender has connected @@ -1321,6 +1426,7 @@ void rrddim_index_destroy(RRDSET *st); // ---------------------------------------------------------------------------- extern time_t rrdhost_free_orphan_time_s; +extern time_t rrdhost_free_ephemeral_time_s; int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unittest); @@ -1329,30 +1435,29 @@ RRDHOST *rrdhost_find_by_guid(const char *guid); RRDHOST *find_host_by_node_id(char *node_id); RRDHOST *rrdhost_find_or_create( - const char *hostname - , const char *registry_hostname - , const char *guid - , const char *os - , const char *timezone - , const char *abbrev_timezone - , int32_t utc_offset - , const char *tags - , const char *program_name - , const char *program_version - , int update_every - , long history - , RRD_MEMORY_MODE mode - , unsigned int health_enabled - , unsigned int rrdpush_enabled - , char *rrdpush_destination - , char *rrdpush_api_key - , char *rrdpush_send_charts_matching - , bool rrdpush_enable_replication - , time_t rrdpush_seconds_to_replicate - , time_t rrdpush_replication_step - , struct rrdhost_system_info *system_info - , bool is_archived -); + const char *hostname, + const char *registry_hostname, + const char *guid, + const char *os, + const char *timezone, + const char *abbrev_timezone, + int32_t utc_offset, + const char *tags, + const char *program_name, + const char *program_version, + int update_every, + long history, + RRD_MEMORY_MODE mode, + unsigned int health_enabled, + unsigned int rrdpush_enabled, + char *rrdpush_destination, + char *rrdpush_api_key, + char *rrdpush_send_charts_matching, + bool rrdpush_enable_replication, + time_t rrdpush_seconds_to_replicate, + time_t rrdpush_replication_step, + struct rrdhost_system_info *system_info, + bool is_archived); int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, char *name, char *value); @@ -1438,8 +1543,8 @@ void rrdset_timed_next(RRDSET *st, struct timeval now, usec_t microseconds); void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next); void rrdset_done(RRDSET *st); -void rrdset_is_obsolete(RRDSET *st); -void rrdset_isnot_obsolete(RRDSET *st); +void rrdset_is_obsolete___safe_from_collector_thread(RRDSET *st); +void rrdset_isnot_obsolete___safe_from_collector_thread(RRDSET *st); // checks if the RRDSET should be offered to viewers #define rrdset_is_available_for_viewers(st) (!rrdset_flag_check(st, RRDSET_FLAG_HIDDEN) && !rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE) && rrdset_number_of_dimensions(st) && (st)->rrd_memory_mode != RRD_MEMORY_MODE_NONE) @@ -1488,8 +1593,8 @@ RRDDIM *rrddim_find_active(RRDSET *st, const char *id); int rrddim_hide(RRDSET *st, const char *id); int rrddim_unhide(RRDSET *st, const char *id); -void rrddim_is_obsolete(RRDSET *st, RRDDIM *rd); -void rrddim_isnot_obsolete(RRDSET *st, RRDDIM *rd); +void rrddim_is_obsolete___safe_from_collector_thread(RRDSET *st, RRDDIM *rd); +void rrddim_isnot_obsolete___safe_from_collector_thread(RRDSET *st, RRDDIM *rd); collected_number rrddim_timed_set_by_pointer(RRDSET *st, RRDDIM *rd, struct timeval collected_time, collected_number value); collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value); @@ -1557,7 +1662,28 @@ static inline void rrdhost_retention(RRDHOST *host, time_t now, bool online, tim *to = online ? now : last_time_s; } +void rrdhost_pluginsd_send_chart_slots_free(RRDHOST *host); +void rrdhost_pluginsd_receive_chart_slots_free(RRDHOST *host); +void rrdset_pluginsd_receive_unslot_and_cleanup(RRDSET *st); +void rrdset_pluginsd_receive_unslot(RRDSET *st); + // ---------------------------------------------------------------------------- +static inline double rrddim_get_last_stored_value(RRDDIM *rd_dim, double *max_value, double div) { + if (!rd_dim) + return NAN; + + if (isnan(div) || div == 0.0) + div = 1.0; + + double value = rd_dim->collector.last_stored_value / div; + value = ABS(value); + + *max_value = MAX(*max_value, value); + + return value; +} + +// // RRD DB engine declarations #ifdef ENABLE_DBENGINE |