diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
commit | a836a244a3d2bdd4da1ee2641e3e957850668cea (patch) | |
tree | cb87c75b3677fab7144f868435243f864048a1e6 /database/rrd.h | |
parent | Adding upstream version 1.38.1. (diff) | |
download | netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip |
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/rrd.h')
-rw-r--r-- | database/rrd.h | 432 |
1 files changed, 296 insertions, 136 deletions
diff --git a/database/rrd.h b/database/rrd.h index 42eeb1655..0f67a3b77 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -30,9 +30,9 @@ typedef struct rrdhost_acquired RRDHOST_ACQUIRED; typedef struct rrdset_acquired RRDSET_ACQUIRED; typedef struct rrddim_acquired RRDDIM_ACQUIRED; -typedef struct ml_host ml_host_t; -typedef struct ml_chart ml_chart_t; -typedef struct ml_dimension ml_dimension_t; +typedef struct ml_host rrd_ml_host_t; +typedef struct ml_chart rrd_ml_chart_t; +typedef struct ml_dimension rrd_ml_dimension_t; typedef enum __attribute__ ((__packed__)) { QUERY_SOURCE_UNKNOWN = 0, @@ -54,6 +54,9 @@ typedef enum __attribute__ ((__packed__)) storage_priority { STORAGE_PRIORITY_LOW, STORAGE_PRIORITY_BEST_EFFORT, + // synchronous query, not to be dispatched to workers or queued + STORAGE_PRIORITY_SYNCHRONOUS, + STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE, } STORAGE_PRIORITY; @@ -106,30 +109,39 @@ RRD_MEMORY_MODE rrd_memory_mode_id(const char *name); typedef struct storage_query_handle STORAGE_QUERY_HANDLE; +typedef enum __attribute__ ((__packed__)) { + STORAGE_ENGINE_BACKEND_RRDDIM = 1, + STORAGE_ENGINE_BACKEND_DBENGINE = 2, +} STORAGE_ENGINE_BACKEND; + +#define is_valid_backend(backend) ((backend) >= STORAGE_ENGINE_BACKEND_RRDDIM && (backend) <= STORAGE_ENGINE_BACKEND_DBENGINE) + // iterator state for RRD dimension data queries struct storage_engine_query_handle { time_t start_time_s; time_t end_time_s; STORAGE_PRIORITY priority; - STORAGE_QUERY_HANDLE* handle; + STORAGE_ENGINE_BACKEND backend; + STORAGE_QUERY_HANDLE *handle; }; -typedef struct storage_point { - NETDATA_DOUBLE min; // when count > 1, this is the minimum among them - NETDATA_DOUBLE max; // when count > 1, this is the maximum among them - NETDATA_DOUBLE sum; // the point sum - divided by count gives the average +// ---------------------------------------------------------------------------- +// chart types - // end_time - start_time = point duration - time_t start_time_s; // the time the point starts - time_t end_time_s; // the time the point ends +typedef enum __attribute__ ((__packed__)) rrdset_type { + RRDSET_TYPE_LINE = 0, + RRDSET_TYPE_AREA = 1, + RRDSET_TYPE_STACKED = 2, +} RRDSET_TYPE; - size_t count; // the number of original points aggregated - size_t anomaly_count; // the number of original points found anomalous +#define RRDSET_TYPE_LINE_NAME "line" +#define RRDSET_TYPE_AREA_NAME "area" +#define RRDSET_TYPE_STACKED_NAME "stacked" - SN_FLAGS flags; // flags stored with the point -} STORAGE_POINT; +RRDSET_TYPE rrdset_type_id(const char *name); +const char *rrdset_type_name(RRDSET_TYPE chart_type); -#include "rrdcontext.h" +#include "contexts/rrdcontext.h" extern bool unittest_running; extern bool dbengine_enabled; @@ -158,39 +170,23 @@ extern time_t rrdset_free_obsolete_time_s; #if defined(ENV32BIT) #define MIN_LIBUV_WORKER_THREADS 8 -#define MAX_LIBUV_WORKER_THREADS 64 +#define MAX_LIBUV_WORKER_THREADS 128 #define RESERVED_LIBUV_WORKER_THREADS 3 #else #define MIN_LIBUV_WORKER_THREADS 16 -#define MAX_LIBUV_WORKER_THREADS 128 +#define MAX_LIBUV_WORKER_THREADS 1024 #define RESERVED_LIBUV_WORKER_THREADS 6 #endif extern int libuv_worker_threads; +extern bool ieee754_doubles; -#define RRD_ID_LENGTH_MAX 200 +#define RRD_ID_LENGTH_MAX 1000 typedef long long total_number; #define TOTAL_NUMBER_FORMAT "%lld" // ---------------------------------------------------------------------------- -// chart types - -typedef enum __attribute__ ((__packed__)) rrdset_type { - RRDSET_TYPE_LINE = 0, - RRDSET_TYPE_AREA = 1, - RRDSET_TYPE_STACKED = 2, -} RRDSET_TYPE; - -#define RRDSET_TYPE_LINE_NAME "line" -#define RRDSET_TYPE_AREA_NAME "area" -#define RRDSET_TYPE_STACKED_NAME "stacked" - -RRDSET_TYPE rrdset_type_id(const char *name); -const char *rrdset_type_name(RRDSET_TYPE chart_type); - - -// ---------------------------------------------------------------------------- // algorithms types typedef enum __attribute__ ((__packed__)) rrd_algorithm { @@ -279,7 +275,11 @@ void rrdlabels_destroy(DICTIONARY *labels_dict); void rrdlabels_add(DICTIONARY *dict, const char *name, const char *value, RRDLABEL_SRC ls); void rrdlabels_add_pair(DICTIONARY *dict, const char *string, RRDLABEL_SRC ls); void rrdlabels_get_value_to_buffer_or_null(DICTIONARY *labels, BUFFER *wb, const char *key, const char *quote, const char *null); -void rrdlabels_get_value_to_char_or_null(DICTIONARY *labels, char **value, const char *key); +void rrdlabels_value_to_buffer_array_item_or_null(DICTIONARY *labels, BUFFER *wb, const char *key); +void rrdlabels_get_value_strdup_or_null(DICTIONARY *labels, char **value, const char *key); +void rrdlabels_get_value_strcpyz(DICTIONARY *labels, char *dst, size_t dst_len, const char *key); +STRING *rrdlabels_get_value_string_dup(DICTIONARY *labels, const char *key); +STRING *rrdlabels_get_value_to_buffer_or_unset(DICTIONARY *labels, BUFFER *wb, const char *key, const char *unset); void rrdlabels_flush(DICTIONARY *labels_dict); void rrdlabels_unmark_all(DICTIONARY *labels); @@ -290,8 +290,10 @@ int rrdlabels_sorted_walkthrough_read(DICTIONARY *labels, int (*callback)(const void rrdlabels_log_to_buffer(DICTIONARY *labels, BUFFER *wb); bool rrdlabels_match_simple_pattern(DICTIONARY *labels, const char *simple_pattern_txt); -bool rrdlabels_match_simple_pattern_parsed(DICTIONARY *labels, SIMPLE_PATTERN *pattern, char equal); + +bool rrdlabels_match_simple_pattern_parsed(DICTIONARY *labels, SIMPLE_PATTERN *pattern, char equal, size_t *searches); int rrdlabels_to_buffer(DICTIONARY *labels, BUFFER *wb, const char *before_each, const char *equal, const char *quote, const char *between_them, bool (*filter_callback)(const char *name, const char *value, RRDLABEL_SRC ls, void *data), void *filter_data, void (*name_sanitizer)(char *dst, const char *src, size_t dst_size), void (*value_sanitizer)(char *dst, const char *src, size_t dst_size)); +void rrdlabels_to_buffer_json_members(DICTIONARY *labels, BUFFER *wb); void rrdlabels_migrate_to_these(DICTIONARY *dst, DICTIONARY *src); void rrdlabels_copy(DICTIONARY *dst, DICTIONARY *src); @@ -307,19 +309,20 @@ bool exporting_labels_filter_callback(const char *name, const char *value, RRDLA // ---------------------------------------------------------------------------- // engine-specific iterator state for dimension data collection -typedef struct storage_collect_handle STORAGE_COLLECT_HANDLE; +typedef struct storage_collect_handle { + STORAGE_ENGINE_BACKEND backend; +} STORAGE_COLLECT_HANDLE; // ---------------------------------------------------------------------------- // Storage tier data for every dimension struct rrddim_tier { STORAGE_POINT virtual_point; - size_t tier_grouping; + STORAGE_ENGINE_BACKEND backend; + uint32_t tier_grouping; time_t next_point_end_time_s; STORAGE_METRIC_HANDLE *db_metric_handle; // the metric handle inside the database STORAGE_COLLECT_HANDLE *db_collection_handle; // the data collection handle - struct storage_engine_collect_ops *collect_ops; - struct storage_engine_query_ops *query_ops; }; void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s); @@ -354,7 +357,7 @@ struct rrddim { // ------------------------------------------------------------------------ // operational state members - ml_dimension_t *ml_dimension; // machine learning data about this dimension + rrd_ml_dimension_t *ml_dimension; // machine learning data about this dimension // ------------------------------------------------------------------------ // linking to siblings and parents @@ -417,82 +420,215 @@ size_t rrddim_memory_file_header_size(void); void rrddim_memory_file_save(RRDDIM *rd); -// ---------------------------------------------------------------------------- +// ------------------------------------------------------------------------ +// DATA COLLECTION STORAGE OPS -#define storage_point_unset(x) do { \ - (x).min = (x).max = (x).sum = NAN; \ - (x).count = 0; \ - (x).anomaly_count = 0; \ - (x).flags = SN_FLAG_NONE; \ - (x).start_time_s = 0; \ - (x).end_time_s = 0; \ - } while(0) - -#define storage_point_empty(x, start_s, end_s) do { \ - (x).min = (x).max = (x).sum = NAN; \ - (x).count = 1; \ - (x).anomaly_count = 0; \ - (x).flags = SN_FLAG_NONE; \ - (x).start_time_s = start_s; \ - (x).end_time_s = end_s; \ - } while(0) - -#define STORAGE_POINT_UNSET { .min = NAN, .max = NAN, .sum = NAN, .count = 0, .anomaly_count = 0, .flags = SN_FLAG_NONE, .start_time_s = 0, .end_time_s = 0 } - -#define storage_point_is_unset(x) (!(x).count) -#define storage_point_is_gap(x) (!netdata_double_isnumber((x).sum)) +STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +STORAGE_METRICS_GROUP *rrddim_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +static inline STORAGE_METRICS_GROUP *storage_engine_metrics_group_get(STORAGE_ENGINE_BACKEND backend __maybe_unused, STORAGE_INSTANCE *db_instance, uuid_t *uuid) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); -// ------------------------------------------------------------------------ -// function pointers that handle data collection -struct storage_engine_collect_ops { - // an initialization function to run before starting collection - STORAGE_COLLECT_HANDLE *(*init)(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_metrics_group_get(db_instance, uuid); +#endif + return rrddim_metrics_group_get(db_instance, uuid); +} + +void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); +void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); +static inline void storage_engine_metrics_group_release(STORAGE_ENGINE_BACKEND backend __maybe_unused, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_metrics_group_release(db_instance, smg); + else +#endif + rrddim_metrics_group_release(db_instance, smg); +} - // run this to store each metric into the database - void (*store_metric)(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number, NETDATA_DOUBLE min_value, - NETDATA_DOUBLE max_value, uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +static inline STORAGE_COLLECT_HANDLE *storage_metric_store_init(STORAGE_ENGINE_BACKEND backend __maybe_unused, STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); - // run this to flush / reset the current data collection sequence - void (*flush)(STORAGE_COLLECT_HANDLE *collection_handle); +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_store_metric_init(db_metric_handle, update_every, smg); +#endif + return rrddim_collect_init(db_metric_handle, update_every, smg); +} - // a finalization function to run after collection is over - // returns 1 if it's safe to delete the dimension - int (*finalize)(STORAGE_COLLECT_HANDLE *collection_handle); +void rrdeng_store_metric_next( + STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, + uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); - void (*change_collection_frequency)(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +void rrddim_collect_store_metric( + STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, + uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); + +static inline void storage_engine_store_metric( + STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, + uint16_t count, uint16_t anomaly_count, SN_FLAGS flags) { + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_store_metric_next(collection_handle, point_in_time_ut, + n, min_value, max_value, + count, anomaly_count, flags); +#endif + return rrddim_collect_store_metric(collection_handle, point_in_time_ut, + n, min_value, max_value, + count, anomaly_count, flags); +} + +void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle); +void rrddim_store_metric_flush(STORAGE_COLLECT_HANDLE *collection_handle); +static inline void storage_engine_store_flush(STORAGE_COLLECT_HANDLE *collection_handle) { + if(unlikely(!collection_handle)) + return; + + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_store_metric_flush_current_page(collection_handle); + else +#endif + rrddim_store_metric_flush(collection_handle); +} + +int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle); +int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *collection_handle); +// a finalization function to run after collection is over +// returns 1 if it's safe to delete the dimension +static inline int storage_engine_store_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_store_metric_finalize(collection_handle); +#endif + + return rrddim_collect_finalize(collection_handle); +} + +void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +static inline void storage_engine_store_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) { + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_store_metric_change_collection_frequency(collection_handle, update_every); + else +#endif + rrddim_store_metric_change_collection_frequency(collection_handle, update_every); +} - STORAGE_METRICS_GROUP *(*metrics_group_get)(STORAGE_INSTANCE *db_instance, uuid_t *uuid); - void (*metrics_group_release)(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *sa); -}; // ---------------------------------------------------------------------------- +// STORAGE ENGINE QUERY OPS + +time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle); +time_t rrddim_query_oldest_time_s(STORAGE_METRIC_HANDLE *db_metric_handle); +static inline time_t storage_engine_oldest_time_s(STORAGE_ENGINE_BACKEND backend __maybe_unused, STORAGE_METRIC_HANDLE *db_metric_handle) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_metric_oldest_time(db_metric_handle); +#endif + return rrddim_query_oldest_time_s(db_metric_handle); +} -// function pointers that handle database queries -struct storage_engine_query_ops { - // run this before starting a series of next_metric() database queries - void (*init)(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle, time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority); +time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle); +time_t rrddim_query_latest_time_s(STORAGE_METRIC_HANDLE *db_metric_handle); +static inline time_t storage_engine_latest_time_s(STORAGE_ENGINE_BACKEND backend __maybe_unused, STORAGE_METRIC_HANDLE *db_metric_handle) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); - // run this to load each metric number from the database - STORAGE_POINT (*next_metric)(struct storage_engine_query_handle *handle); +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_metric_latest_time(db_metric_handle); +#endif + return rrddim_query_latest_time_s(db_metric_handle); +} - // run this to test if the series of next_metric() database queries is finished - int (*is_finished)(struct storage_engine_query_handle *handle); +void rrdeng_load_metric_init( + STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrddim_handle, + time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority); - // run this after finishing a series of load_metric() database queries - void (*finalize)(struct storage_engine_query_handle *handle); +void rrddim_query_init( + STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle, + time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority); - // get the timestamp of the last entry of this metric - time_t (*latest_time_s)(STORAGE_METRIC_HANDLE *db_metric_handle); +static inline void storage_engine_query_init( + STORAGE_ENGINE_BACKEND backend __maybe_unused, + STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle, + time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); - // get the timestamp of the first entry of this metric - time_t (*oldest_time_s)(STORAGE_METRIC_HANDLE *db_metric_handle); +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_load_metric_init(db_metric_handle, handle, start_time_s, end_time_s, priority); + else +#endif + rrddim_query_init(db_metric_handle, handle, start_time_s, end_time_s, priority); +} - // adapt 'before' timestamp to the optimal for the query - // can only move 'before' ahead (to the future) - time_t (*align_to_optimal_before)(struct storage_engine_query_handle *handle); -}; +STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle); +STORAGE_POINT rrddim_query_next_metric(struct storage_engine_query_handle *handle); +static inline STORAGE_POINT storage_engine_query_next_metric(struct storage_engine_query_handle *handle) { + internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend"); -typedef struct storage_engine STORAGE_ENGINE; +#ifdef ENABLE_DBENGINE + if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_load_metric_next(handle); +#endif + return rrddim_query_next_metric(handle); +} + +int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle); +int rrddim_query_is_finished(struct storage_engine_query_handle *handle); +static inline int storage_engine_query_is_finished(struct storage_engine_query_handle *handle) { + internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_load_metric_is_finished(handle); +#endif + return rrddim_query_is_finished(handle); +} + +void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle); +void rrddim_query_finalize(struct storage_engine_query_handle *handle); +static inline void storage_engine_query_finalize(struct storage_engine_query_handle *handle) { + internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_load_metric_finalize(handle); + else +#endif + rrddim_query_finalize(handle); +} + +time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle); +time_t rrddim_query_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle); +static inline time_t storage_engine_align_to_optimal_before(struct storage_engine_query_handle *handle) { + internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_load_align_to_optimal_before(handle); +#endif + return rrddim_query_align_to_optimal_before(handle); +} // ------------------------------------------------------------------------ // function pointers for all APIs provided by a storage engine @@ -503,17 +639,14 @@ typedef struct storage_engine_api { void (*metric_release)(STORAGE_METRIC_HANDLE *); STORAGE_METRIC_HANDLE *(*metric_dup)(STORAGE_METRIC_HANDLE *); bool (*metric_retention_by_uuid)(STORAGE_INSTANCE *db_instance, uuid_t *uuid, time_t *first_entry_s, time_t *last_entry_s); - - // operations - struct storage_engine_collect_ops collect_ops; - struct storage_engine_query_ops query_ops; } STORAGE_ENGINE_API; -struct storage_engine { +typedef struct storage_engine { + STORAGE_ENGINE_BACKEND backend; RRD_MEMORY_MODE id; const char* name; STORAGE_ENGINE_API api; -}; +} STORAGE_ENGINE; STORAGE_ENGINE* storage_engine_get(RRD_MEMORY_MODE mmode); STORAGE_ENGINE* storage_engine_find(const char* name); @@ -617,7 +750,7 @@ struct rrdset { DICTIONARY *rrddimvar_root_index; // dimension variables // we use this dictionary to manage their allocation - ml_chart_t *ml_chart; + rrd_ml_chart_t *ml_chart; // ------------------------------------------------------------------------ // operational state members @@ -702,6 +835,13 @@ struct rrdset { RRDCALC *base; // double linked list of RRDCALC related to this RRDSET } alerts; + struct { + size_t pos; + size_t size; + size_t used; + RRDDIM_ACQUIRED **rda; + } pluginsd; + #ifdef NETDATA_LOG_REPLICATION_REQUESTS struct { bool log_next_data_collection; @@ -757,35 +897,41 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st_on_file, RRD_MEMORY_MODE m // and may lead to missing information. typedef enum __attribute__ ((__packed__)) rrdhost_flags { + + // Careful not to overlap with rrdhost_options to avoid bugs if + // rrdhost_flags_xxx is used instead of rrdhost_option_xxx or vice-versa // Orphan, Archived and Obsolete flags - RRDHOST_FLAG_ORPHAN = (1 << 10), // this host is orphan (not receiving data) - RRDHOST_FLAG_ARCHIVED = (1 << 11), // The host is archived, no collected charts yet - RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS = (1 << 12), // the host has pending chart obsoletions - RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS = (1 << 13), // the host has pending dimension obsoletions + RRDHOST_FLAG_ORPHAN = (1 << 8), // this host is orphan (not receiving data) + RRDHOST_FLAG_ARCHIVED = (1 << 9), // The host is archived, no collected charts yet + RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS = (1 << 10), // the host has pending chart obsoletions + RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS = (1 << 11), // the host has pending dimension obsoletions // Streaming sender - RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED = (1 << 14), // the host has initialized rrdpush structures - RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN = (1 << 15), // When set, the sender thread is running - RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED = (1 << 16), // When set, the host is connected to a parent - RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS = (1 << 17), // when set, rrdset_done() should push metrics to parent - RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS = (1 << 18), // when set, we have logged the status of metrics streaming + RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED = (1 << 12), // the host has initialized rrdpush structures + RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN = (1 << 13), // When set, the sender thread is running + RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED = (1 << 14), // When set, the host is connected to a parent + RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS = (1 << 15), // when set, rrdset_done() should push metrics to parent + RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS = (1 << 16), // when set, we have logged the status of metrics streaming // Health - RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 20), // contains charts and dims with uninitialized variables - RRDHOST_FLAG_INITIALIZED_HEALTH = (1 << 21), // the host has initialized health structures + RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 17), // contains charts and dims with uninitialized variables + RRDHOST_FLAG_INITIALIZED_HEALTH = (1 << 18), // the host has initialized health structures // Exporting - RRDHOST_FLAG_EXPORTING_SEND = (1 << 22), // send it to external databases - RRDHOST_FLAG_EXPORTING_DONT_SEND = (1 << 23), // don't send it to external databases + RRDHOST_FLAG_EXPORTING_SEND = (1 << 19), // send it to external databases + RRDHOST_FLAG_EXPORTING_DONT_SEND = (1 << 20), // don't send it to external databases // ACLK - RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 24), // when set, we should send ACLK stream context updates + RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 21), // when set, we should send ACLK stream context updates + RRDHOST_FLAG_ACLK_STREAM_ALERTS = (1 << 22), // set when the receiver part is disconnected // Metadata - RRDHOST_FLAG_METADATA_UPDATE = (1 << 25), // metadata needs to be stored in the database - RRDHOST_FLAG_METADATA_LABELS = (1 << 26), // metadata needs to be stored in the database - RRDHOST_FLAG_METADATA_INFO = (1 << 27), // metadata needs to be stored in the database - RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database + RRDHOST_FLAG_METADATA_UPDATE = (1 << 23), // metadata needs to be stored in the database + RRDHOST_FLAG_METADATA_LABELS = (1 << 24), // metadata needs to be stored in the database + RRDHOST_FLAG_METADATA_INFO = (1 << 25), // metadata needs to be stored in the database + RRDHOST_FLAG_PENDING_CONTEXT_LOAD = (1 << 26), // metadata needs to be stored in the database + RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS = (1 << 27), // metadata needs to be stored in the database + RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 29), // set when the receiver part is disconnected } RRDHOST_FLAGS; @@ -954,6 +1100,8 @@ struct rrdhost_system_info { int mc_version; }; +struct rrdhost_system_info *rrdhost_labels_to_system_info(DICTIONARY *labels); + struct rrdhost { char machine_guid[GUID_LEN + 1]; // the unique ID of this host @@ -982,13 +1130,12 @@ struct rrdhost { // the actual per tier is at .db[tier].mode char *cache_dir; // the directory to save RRD cache files - char *varlib_dir; // the directory to save health log struct { RRD_MEMORY_MODE mode; // the db mode for this tier STORAGE_ENGINE *eng; // the storage engine API for this tier STORAGE_INSTANCE *instance; // the db instance for this tier - size_t tier_grouping; // tier 0 iterations aggregated on this tier + uint32_t tier_grouping; // tier 0 iterations aggregated on this tier } db[RRD_STORAGE_TIERS]; struct rrdhost_system_info *system_info; // information collected from the host environment @@ -1011,7 +1158,7 @@ struct rrdhost { struct sender_state *sender; netdata_thread_t rrdpush_sender_thread; // the sender thread size_t rrdpush_sender_replicating_charts; // the number of charts currently being replicated to a parent - void *dbsync_worker; + void *aclk_sync_host_config; // ------------------------------------------------------------------------ // streaming of data from remote hosts - rrdpush receiver @@ -1042,6 +1189,7 @@ struct rrdhost { uint32_t health_last_processed_id; // the last processed health id from the log uint32_t health_max_unique_id; // the max alarm log unique id given for the host uint32_t health_max_alarm_id; // the max alarm id given for the host + size_t health_transitions; // the number of times an alert changed state // ------------------------------------------------------------------------ // locks @@ -1050,7 +1198,7 @@ struct rrdhost { // ------------------------------------------------------------------------ // ML handle - ml_host_t *ml_host; + rrd_ml_host_t *ml_host; // ------------------------------------------------------------------------ // Support for host-level labels @@ -1070,9 +1218,11 @@ struct rrdhost { DICTIONARY *rrdvars; // the host's chart variables index // this includes custom host variables - RRDCONTEXTS *rrdctx_hub_queue; - RRDCONTEXTS *rrdctx_post_processing_queue; - RRDCONTEXTS *rrdctx; + struct { + DICTIONARY *contexts; + DICTIONARY *hub_queue; + DICTIONARY *pp_queue; + } rrdctx; uuid_t host_uuid; // Global GUID for this host uuid_t *node_id; // Cloud node_id @@ -1110,6 +1260,10 @@ extern RRDHOST *localhost; extern DICTIONARY *rrdhost_root_index; size_t rrdhost_hosts_available(void); +RRDHOST_ACQUIRED *rrdhost_find_and_acquire(const char *machine_guid); +RRDHOST *rrdhost_acquired_to_rrdhost(RRDHOST_ACQUIRED *rha); +void rrdhost_acquired_release(RRDHOST_ACQUIRED *rha); + // ---------------------------------------------------------------------------- #define rrdhost_foreach_read(var) \ @@ -1145,6 +1299,7 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unitt RRDHOST *rrdhost_find_by_hostname(const char *hostname); RRDHOST *rrdhost_find_by_guid(const char *guid); +RRDHOST *find_host_by_node_id(char *node_id); RRDHOST *rrdhost_find_or_create( const char *hostname @@ -1217,6 +1372,11 @@ void rrdset_update_heterogeneous_flag(RRDSET *st); time_t rrdset_set_update_every_s(RRDSET *st, time_t update_every_s); RRDSET *rrdset_find(RRDHOST *host, const char *id); + +RRDSET_ACQUIRED *rrdset_find_and_acquire(RRDHOST *host, const char *id); +RRDSET *rrdset_acquired_to_rrdset(RRDSET_ACQUIRED *rsa); +void rrdset_acquired_release(RRDSET_ACQUIRED *rsa); + #define rrdset_find_localhost(id) rrdset_find(localhost, id) /* This will not return charts that are archived */ static inline RRDSET *rrdset_find_active_localhost(const char *id) @@ -1339,13 +1499,13 @@ void rrdset_delete_files(RRDSET *st); void rrdset_save(RRDSET *st); void rrdset_free(RRDSET *st); +void rrddim_free(RRDSET *st, RRDDIM *rd); + #ifdef NETDATA_RRD_INTERNALS char *rrdhost_cache_dir_for_rrdset_alloc(RRDHOST *host, const char *id); const char *rrdset_cache_dir(RRDSET *st); -void rrddim_free(RRDSET *st, RRDDIM *rd); - void rrdset_reset(RRDSET *st); void rrdset_delete_obsolete_dimensions(RRDSET *st); |