summaryrefslogtreecommitdiffstats
path: root/database/rrd.h
diff options
context:
space:
mode:
Diffstat (limited to 'database/rrd.h')
-rw-r--r--database/rrd.h266
1 files changed, 196 insertions, 70 deletions
diff --git a/database/rrd.h b/database/rrd.h
index 9c5ad6f2..5f4bee03 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -239,7 +239,6 @@ typedef enum __attribute__ ((__packed__)) rrddim_options {
RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS = (1 << 1), // do not offer RESET or OVERFLOW info to callers
RRDDIM_OPTION_BACKFILLED_HIGH_TIERS = (1 << 2), // when set, we have backfilled higher tiers
RRDDIM_OPTION_UPDATED = (1 << 3), // single-threaded collector updated flag
- RRDDIM_OPTION_EXPOSED = (1 << 4), // single-threaded collector exposed flag
// this is 8-bit
} RRDDIM_OPTIONS;
@@ -253,20 +252,22 @@ typedef enum __attribute__ ((__packed__)) rrddim_flags {
RRDDIM_FLAG_NONE = 0,
RRDDIM_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 0),
- RRDDIM_FLAG_OBSOLETE = (1 << 2), // this is marked by the collector/module as obsolete
+ RRDDIM_FLAG_OBSOLETE = (1 << 1), // this is marked by the collector/module as obsolete
// No new values have been collected for this dimension since agent start, or it was marked RRDDIM_FLAG_OBSOLETE at
// least rrdset_free_obsolete_time seconds ago.
- RRDDIM_FLAG_ARCHIVED = (1 << 3),
- RRDDIM_FLAG_METADATA_UPDATE = (1 << 4), // Metadata needs to go to the database
+ RRDDIM_FLAG_ARCHIVED = (1 << 2),
+ RRDDIM_FLAG_METADATA_UPDATE = (1 << 3), // Metadata needs to go to the database
- RRDDIM_FLAG_META_HIDDEN = (1 << 6), // Status of hidden option in the metadata database
+ RRDDIM_FLAG_META_HIDDEN = (1 << 4), // Status of hidden option in the metadata database
+ RRDDIM_FLAG_ML_MODEL_LOAD = (1 << 5), // Do ML LOAD for this dimension
// this is 8 bit
} RRDDIM_FLAGS;
-#define rrddim_flag_check(rd, flag) (__atomic_load_n(&((rd)->flags), __ATOMIC_SEQ_CST) & (flag))
-#define rrddim_flag_set(rd, flag) __atomic_or_fetch(&((rd)->flags), (flag), __ATOMIC_SEQ_CST)
-#define rrddim_flag_clear(rd, flag) __atomic_and_fetch(&((rd)->flags), ~(flag), __ATOMIC_SEQ_CST)
+#define rrddim_flag_get(rd) __atomic_load_n(&((rd)->flags), __ATOMIC_ACQUIRE)
+#define rrddim_flag_check(rd, flag) (__atomic_load_n(&((rd)->flags), __ATOMIC_ACQUIRE) & (flag))
+#define rrddim_flag_set(rd, flag) __atomic_or_fetch(&((rd)->flags), (flag), __ATOMIC_RELEASE)
+#define rrddim_flag_clear(rd, flag) __atomic_and_fetch(&((rd)->flags), ~(flag), __ATOMIC_RELEASE)
// ----------------------------------------------------------------------------
// engine-specific iterator state for dimension data collection
@@ -312,7 +313,11 @@ struct rrddim {
struct rrdset *rrdset;
rrd_ml_dimension_t *ml_dimension; // machine learning data about this dimension
- RRDMETRIC_ACQUIRED *rrdmetric; // the rrdmetric of this dimension
+
+ struct {
+ RRDMETRIC_ACQUIRED *rrdmetric; // the rrdmetric of this dimension
+ bool collected;
+ } rrdcontexts;
#ifdef NETDATA_LOG_COLLECTION_ERRORS
usec_t rrddim_store_metric_last_ut; // the timestamp we last called rrddim_store_metric()
@@ -332,6 +337,16 @@ struct rrddim {
} db;
// ------------------------------------------------------------------------
+ // streaming
+
+ struct {
+ struct {
+ uint32_t sent_version;
+ uint32_t dim_slot;
+ } sender;
+ } rrdpush;
+
+ // ------------------------------------------------------------------------
// data collection members
struct {
@@ -367,10 +382,6 @@ size_t rrddim_size(void);
#define rrddim_set_updated(rd) (rd)->collector.options |= RRDDIM_OPTION_UPDATED
#define rrddim_clear_updated(rd) (rd)->collector.options &= ~RRDDIM_OPTION_UPDATED
-#define rrddim_check_exposed(rd) ((rd)->collector.options & RRDDIM_OPTION_EXPOSED)
-#define rrddim_set_exposed(rd) (rd)->collector.options |= RRDDIM_OPTION_EXPOSED
-#define rrddim_clear_exposed(rd) (rd)->collector.options &= ~RRDDIM_OPTION_EXPOSED
-
// returns the RRDDIM cache filename, or NULL if it does not exist
const char *rrddim_cache_filename(RRDDIM *rd);
@@ -685,41 +696,47 @@ typedef enum __attribute__ ((__packed__)) rrdset_flags {
RRDSET_FLAG_UPSTREAM_SEND = (1 << 6), // if set, this chart should be sent upstream (streaming)
RRDSET_FLAG_UPSTREAM_IGNORE = (1 << 7), // if set, this chart should not be sent upstream (streaming)
- RRDSET_FLAG_UPSTREAM_EXPOSED = (1 << 8), // if set, we have sent this chart definition to netdata parent (streaming)
- RRDSET_FLAG_STORE_FIRST = (1 << 9), // if set, do not eliminate the first collection during interpolation
- RRDSET_FLAG_HETEROGENEOUS = (1 << 10), // if set, the chart is not homogeneous (dimensions in it have multiple algorithms, multipliers or dividers)
- RRDSET_FLAG_HOMOGENEOUS_CHECK = (1 << 11), // if set, the chart should be checked to determine if the dimensions are homogeneous
- RRDSET_FLAG_HIDDEN = (1 << 12), // if set, do not show this chart on the dashboard, but use it for exporting
- RRDSET_FLAG_SYNC_CLOCK = (1 << 13), // if set, microseconds on next data collection will be ignored (the chart will be synced to now)
- RRDSET_FLAG_OBSOLETE_DIMENSIONS = (1 << 14), // this is marked by the collector/module when a chart has obsolete dimensions
+ RRDSET_FLAG_STORE_FIRST = (1 << 8), // if set, do not eliminate the first collection during interpolation
+ RRDSET_FLAG_HETEROGENEOUS = (1 << 9), // if set, the chart is not homogeneous (dimensions in it have multiple algorithms, multipliers or dividers)
+ RRDSET_FLAG_HOMOGENEOUS_CHECK = (1 << 10), // if set, the chart should be checked to determine if the dimensions are homogeneous
+ RRDSET_FLAG_HIDDEN = (1 << 11), // if set, do not show this chart on the dashboard, but use it for exporting
+ RRDSET_FLAG_SYNC_CLOCK = (1 << 12), // if set, microseconds on next data collection will be ignored (the chart will be synced to now)
+ RRDSET_FLAG_OBSOLETE_DIMENSIONS = (1 << 13), // this is marked by the collector/module when a chart has obsolete dimensions
- RRDSET_FLAG_METADATA_UPDATE = (1 << 16), // Mark that metadata needs to be stored
- RRDSET_FLAG_ANOMALY_DETECTION = (1 << 18), // flag to identify anomaly detection charts.
- RRDSET_FLAG_INDEXED_ID = (1 << 19), // the rrdset is indexed by its id
- RRDSET_FLAG_INDEXED_NAME = (1 << 20), // the rrdset is indexed by its name
+ RRDSET_FLAG_METADATA_UPDATE = (1 << 14), // Mark that metadata needs to be stored
+ RRDSET_FLAG_ANOMALY_DETECTION = (1 << 15), // flag to identify anomaly detection charts.
+ RRDSET_FLAG_INDEXED_ID = (1 << 16), // the rrdset is indexed by its id
+ RRDSET_FLAG_INDEXED_NAME = (1 << 17), // the rrdset is indexed by its name
- RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 21),
+ RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 18),
- RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS = (1 << 22), // the sending side has replication in progress
- RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 23), // the sending side has completed replication
- RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 24), // the receiving side has replication in progress
- RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 25), // the receiving side has completed replication
+ RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS = (1 << 19), // the sending side has replication in progress
+ RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 20), // the sending side has completed replication
+ RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 21), // the receiving side has replication in progress
+ RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 22), // the receiving side has completed replication
- RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 26), // a custom variable has been updated and needs to be exposed to parent
+ RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 23), // a custom variable has been updated and needs to be exposed to parent
- RRDSET_FLAG_COLLECTION_FINISHED = (1 << 27), // when set, data collection is not available for this chart
+ RRDSET_FLAG_COLLECTION_FINISHED = (1 << 24), // when set, data collection is not available for this chart
- RRDSET_FLAG_HAS_RRDCALC_LINKED = (1 << 28), // this chart has at least one rrdcal linked
+ RRDSET_FLAG_HAS_RRDCALC_LINKED = (1 << 25), // this chart has at least one rrdcal linked
} RRDSET_FLAGS;
-#define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag))
-#define rrdset_flag_set(st, flag) __atomic_or_fetch(&((st)->flags), flag, __ATOMIC_SEQ_CST)
-#define rrdset_flag_clear(st, flag) __atomic_and_fetch(&((st)->flags), ~(flag), __ATOMIC_SEQ_CST)
+#define rrdset_flag_get(st) __atomic_load_n(&((st)->flags), __ATOMIC_ACQUIRE)
+#define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_ACQUIRE) & (flag))
+#define rrdset_flag_set(st, flag) __atomic_or_fetch(&((st)->flags), flag, __ATOMIC_RELEASE)
+#define rrdset_flag_clear(st, flag) __atomic_and_fetch(&((st)->flags), ~(flag), __ATOMIC_RELEASE)
#define rrdset_is_replicating(st) (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS|RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS) \
&& !rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED|RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
+struct pluginsd_rrddim {
+ RRDDIM_ACQUIRED *rda;
+ RRDDIM *rd;
+ const char *id;
+};
+
struct rrdset {
uuid_t chart_uuid; // the global UUID for this chart
@@ -749,6 +766,8 @@ struct rrdset {
DICTIONARY *rrddimvar_root_index; // dimension variables
// we use this dictionary to manage their allocation
+ uint32_t version; // the metadata version (auto-increment)
+
RRDSET_TYPE chart_type; // line, area, stacked
// ------------------------------------------------------------------------
@@ -768,8 +787,11 @@ struct rrdset {
RRDHOST *rrdhost; // pointer to RRDHOST this chart belongs to
- RRDINSTANCE_ACQUIRED *rrdinstance; // the rrdinstance of this chart
- RRDCONTEXT_ACQUIRED *rrdcontext; // the rrdcontext this chart belongs to
+ struct {
+ RRDINSTANCE_ACQUIRED *rrdinstance; // the rrdinstance of this chart
+ RRDCONTEXT_ACQUIRED *rrdcontext; // the rrdcontext this chart belongs to
+ bool collected;
+ } rrdcontexts;
// ------------------------------------------------------------------------
// data collection members
@@ -792,7 +814,15 @@ struct rrdset {
// ------------------------------------------------------------------------
// data collection - streaming to parents, temp variables
- time_t upstream_resync_time_s; // the timestamp up to which we should resync clock upstream
+ struct {
+ struct {
+ uint32_t sent_version;
+ uint32_t chart_slot;
+ uint32_t dim_last_slot_used;
+
+ time_t resync_time_s; // the timestamp up to which we should resync clock upstream
+ } sender;
+ } rrdpush;
// ------------------------------------------------------------------------
// db mode SAVE, MAP specifics
@@ -835,10 +865,12 @@ struct rrdset {
struct {
SPINLOCK spinlock; // used only for cleanup
pid_t collector_tid;
+ bool dims_with_slots;
bool set;
uint32_t pos;
+ int32_t last_slot;
uint32_t size;
- RRDDIM_ACQUIRED **rda;
+ struct pluginsd_rrddim *prd_array;
} pluginsd;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -861,6 +893,54 @@ struct rrdset {
#define rrdset_name(st) string2str((st)->name)
#define rrdset_id(st) string2str((st)->id)
+static inline uint32_t rrdset_metadata_version(RRDSET *st) {
+ return __atomic_load_n(&st->version, __ATOMIC_RELAXED);
+}
+
+static inline uint32_t rrdset_metadata_upstream_version(RRDSET *st) {
+ return __atomic_load_n(&st->rrdpush.sender.sent_version, __ATOMIC_RELAXED);
+}
+
+void rrdset_metadata_updated(RRDSET *st);
+
+static inline void rrdset_metadata_exposed_upstream(RRDSET *st, uint32_t version) {
+ __atomic_store_n(&st->rrdpush.sender.sent_version, version, __ATOMIC_RELAXED);
+}
+
+static inline bool rrdset_check_upstream_exposed(RRDSET *st) {
+ return rrdset_metadata_version(st) == rrdset_metadata_upstream_version(st);
+}
+
+static inline uint32_t rrddim_metadata_version(RRDDIM *rd) {
+ // the metadata version of the dimension, is the version of the chart
+ return rrdset_metadata_version(rd->rrdset);
+}
+
+static inline uint32_t rrddim_metadata_upstream_version(RRDDIM *rd) {
+ return __atomic_load_n(&rd->rrdpush.sender.sent_version, __ATOMIC_RELAXED);
+}
+
+void rrddim_metadata_updated(RRDDIM *rd);
+
+static inline void rrddim_metadata_exposed_upstream(RRDDIM *rd, uint32_t version) {
+ __atomic_store_n(&rd->rrdpush.sender.sent_version, version, __ATOMIC_RELAXED);
+}
+
+static inline void rrddim_metadata_exposed_upstream_clear(RRDDIM *rd) {
+ __atomic_store_n(&rd->rrdpush.sender.sent_version, 0, __ATOMIC_RELAXED);
+}
+
+static inline bool rrddim_check_upstream_exposed(RRDDIM *rd) {
+ return rrddim_metadata_upstream_version(rd) != 0;
+}
+
+// the collector sets the exposed flag, but anyone can remove it
+// still, it can be removed, after the collector has finished
+// so, it is safe to check it without atomics
+static inline bool rrddim_check_upstream_exposed_collector(RRDDIM *rd) {
+ return rd->rrdset->version == rd->rrdpush.sender.sent_version;
+}
+
STRING *rrd_string_strdupz(const char *s);
// ----------------------------------------------------------------------------
@@ -943,7 +1023,7 @@ typedef enum __attribute__ ((__packed__)) rrdhost_flags {
#ifdef NETDATA_INTERNAL_CHECKS
#define rrdset_debug(st, fmt, args...) do { if(unlikely(debug_flags & D_RRD_STATS && rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) \
- debug_int(__FILE__, __FUNCTION__, __LINE__, "%s: " fmt, rrdset_name(st), ##args); } while(0)
+ netdata_logger(NDLS_DEBUG, NDLP_DEBUG, __FILE__, __FUNCTION__, __LINE__, "%s: " fmt, rrdset_name(st), ##args); } while(0)
#else
#define rrdset_debug(st, fmt, args...) debug_dummy()
#endif
@@ -963,6 +1043,7 @@ typedef enum __attribute__ ((__packed__)) {
RRDHOST_OPTION_REPLICATION = (1 << 5), // when set, we support replication for this host
RRDHOST_OPTION_VIRTUAL_HOST = (1 << 6), // when set, this host is a virtual one
+ RRDHOST_OPTION_EPHEMERAL_HOST = (1 << 7), // when set, this host is an ephemeral one
} RRDHOST_OPTIONS;
#define rrdhost_option_check(host, flag) ((host)->options & (flag))
@@ -1060,7 +1141,6 @@ typedef struct health {
time_t health_delay_up_to; // a timestamp to delay alarms processing up to
STRING *health_default_exec; // the full path of the alarms notifications program
STRING *health_default_recipient; // the default recipient for all alarms
- int health_log_entries_written; // the number of alarm events written to the alarms event log
uint32_t health_default_warn_repeat_every; // the default value for the interval between repeating warning notifications
uint32_t health_default_crit_repeat_every; // the default value for the interval between repeating critical notifications
unsigned int health_enabled; // 1 when this host has health enabled
@@ -1151,6 +1231,31 @@ struct rrdhost {
// ------------------------------------------------------------------------
// streaming of data to remote hosts - rrdpush sender
+ struct {
+ struct {
+ struct {
+ struct {
+ SPINLOCK spinlock;
+
+ bool ignore; // when set, freeing slots will not put them in the available
+ uint32_t used;
+ uint32_t size;
+ uint32_t *array;
+ } available; // keep track of the available chart slots per host
+
+ uint32_t last_used; // the last slot we used for a chart (increments only)
+ } pluginsd_chart_slots;
+ } send;
+
+ struct {
+ struct {
+ SPINLOCK spinlock; // lock for the management of the allocation
+ uint32_t size;
+ RRDSET **array;
+ } pluginsd_chart_slots;
+ } receive;
+ } rrdpush;
+
char *rrdpush_send_destination; // where to send metrics to
char *rrdpush_send_api_key; // the api key at the receiving netdata
struct rrdpush_destinations *destinations; // a linked list of possible destinations
@@ -1168,7 +1273,7 @@ struct rrdhost {
struct sender_state *sender;
netdata_thread_t rrdpush_sender_thread; // the sender thread
size_t rrdpush_sender_replicating_charts; // the number of charts currently being replicated to a parent
- void *aclk_sync_host_config;
+ struct aclk_sync_cfg_t *aclk_config;
uint32_t rrdpush_receiver_connection_counter; // the number of times this receiver has connected
uint32_t rrdpush_sender_connection_counter; // the number of times this sender has connected
@@ -1321,6 +1426,7 @@ void rrddim_index_destroy(RRDSET *st);
// ----------------------------------------------------------------------------
extern time_t rrdhost_free_orphan_time_s;
+extern time_t rrdhost_free_ephemeral_time_s;
int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unittest);
@@ -1329,30 +1435,29 @@ RRDHOST *rrdhost_find_by_guid(const char *guid);
RRDHOST *find_host_by_node_id(char *node_id);
RRDHOST *rrdhost_find_or_create(
- const char *hostname
- , const char *registry_hostname
- , const char *guid
- , const char *os
- , const char *timezone
- , const char *abbrev_timezone
- , int32_t utc_offset
- , const char *tags
- , const char *program_name
- , const char *program_version
- , int update_every
- , long history
- , RRD_MEMORY_MODE mode
- , unsigned int health_enabled
- , unsigned int rrdpush_enabled
- , char *rrdpush_destination
- , char *rrdpush_api_key
- , char *rrdpush_send_charts_matching
- , bool rrdpush_enable_replication
- , time_t rrdpush_seconds_to_replicate
- , time_t rrdpush_replication_step
- , struct rrdhost_system_info *system_info
- , bool is_archived
-);
+ const char *hostname,
+ const char *registry_hostname,
+ const char *guid,
+ const char *os,
+ const char *timezone,
+ const char *abbrev_timezone,
+ int32_t utc_offset,
+ const char *tags,
+ const char *program_name,
+ const char *program_version,
+ int update_every,
+ long history,
+ RRD_MEMORY_MODE mode,
+ unsigned int health_enabled,
+ unsigned int rrdpush_enabled,
+ char *rrdpush_destination,
+ char *rrdpush_api_key,
+ char *rrdpush_send_charts_matching,
+ bool rrdpush_enable_replication,
+ time_t rrdpush_seconds_to_replicate,
+ time_t rrdpush_replication_step,
+ struct rrdhost_system_info *system_info,
+ bool is_archived);
int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, char *name, char *value);
@@ -1438,8 +1543,8 @@ void rrdset_timed_next(RRDSET *st, struct timeval now, usec_t microseconds);
void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next);
void rrdset_done(RRDSET *st);
-void rrdset_is_obsolete(RRDSET *st);
-void rrdset_isnot_obsolete(RRDSET *st);
+void rrdset_is_obsolete___safe_from_collector_thread(RRDSET *st);
+void rrdset_isnot_obsolete___safe_from_collector_thread(RRDSET *st);
// checks if the RRDSET should be offered to viewers
#define rrdset_is_available_for_viewers(st) (!rrdset_flag_check(st, RRDSET_FLAG_HIDDEN) && !rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE) && rrdset_number_of_dimensions(st) && (st)->rrd_memory_mode != RRD_MEMORY_MODE_NONE)
@@ -1488,8 +1593,8 @@ RRDDIM *rrddim_find_active(RRDSET *st, const char *id);
int rrddim_hide(RRDSET *st, const char *id);
int rrddim_unhide(RRDSET *st, const char *id);
-void rrddim_is_obsolete(RRDSET *st, RRDDIM *rd);
-void rrddim_isnot_obsolete(RRDSET *st, RRDDIM *rd);
+void rrddim_is_obsolete___safe_from_collector_thread(RRDSET *st, RRDDIM *rd);
+void rrddim_isnot_obsolete___safe_from_collector_thread(RRDSET *st, RRDDIM *rd);
collected_number rrddim_timed_set_by_pointer(RRDSET *st, RRDDIM *rd, struct timeval collected_time, collected_number value);
collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value);
@@ -1557,7 +1662,28 @@ static inline void rrdhost_retention(RRDHOST *host, time_t now, bool online, tim
*to = online ? now : last_time_s;
}
+void rrdhost_pluginsd_send_chart_slots_free(RRDHOST *host);
+void rrdhost_pluginsd_receive_chart_slots_free(RRDHOST *host);
+void rrdset_pluginsd_receive_unslot_and_cleanup(RRDSET *st);
+void rrdset_pluginsd_receive_unslot(RRDSET *st);
+
// ----------------------------------------------------------------------------
+static inline double rrddim_get_last_stored_value(RRDDIM *rd_dim, double *max_value, double div) {
+ if (!rd_dim)
+ return NAN;
+
+ if (isnan(div) || div == 0.0)
+ div = 1.0;
+
+ double value = rd_dim->collector.last_stored_value / div;
+ value = ABS(value);
+
+ *max_value = MAX(*max_value, value);
+
+ return value;
+}
+
+//
// RRD DB engine declarations
#ifdef ENABLE_DBENGINE