summaryrefslogtreecommitdiffstats
path: root/database/rrd.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
commita836a244a3d2bdd4da1ee2641e3e957850668cea (patch)
treecb87c75b3677fab7144f868435243f864048a1e6 /database/rrd.h
parentAdding upstream version 1.38.1. (diff)
downloadnetdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz
netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--database/rrd.h432
1 files changed, 296 insertions, 136 deletions
diff --git a/database/rrd.h b/database/rrd.h
index 42eeb1655..0f67a3b77 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -30,9 +30,9 @@ typedef struct rrdhost_acquired RRDHOST_ACQUIRED;
typedef struct rrdset_acquired RRDSET_ACQUIRED;
typedef struct rrddim_acquired RRDDIM_ACQUIRED;
-typedef struct ml_host ml_host_t;
-typedef struct ml_chart ml_chart_t;
-typedef struct ml_dimension ml_dimension_t;
+typedef struct ml_host rrd_ml_host_t;
+typedef struct ml_chart rrd_ml_chart_t;
+typedef struct ml_dimension rrd_ml_dimension_t;
typedef enum __attribute__ ((__packed__)) {
QUERY_SOURCE_UNKNOWN = 0,
@@ -54,6 +54,9 @@ typedef enum __attribute__ ((__packed__)) storage_priority {
STORAGE_PRIORITY_LOW,
STORAGE_PRIORITY_BEST_EFFORT,
+ // synchronous query, not to be dispatched to workers or queued
+ STORAGE_PRIORITY_SYNCHRONOUS,
+
STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE,
} STORAGE_PRIORITY;
@@ -106,30 +109,39 @@ RRD_MEMORY_MODE rrd_memory_mode_id(const char *name);
typedef struct storage_query_handle STORAGE_QUERY_HANDLE;
+typedef enum __attribute__ ((__packed__)) {
+ STORAGE_ENGINE_BACKEND_RRDDIM = 1,
+ STORAGE_ENGINE_BACKEND_DBENGINE = 2,
+} STORAGE_ENGINE_BACKEND;
+
+#define is_valid_backend(backend) ((backend) >= STORAGE_ENGINE_BACKEND_RRDDIM && (backend) <= STORAGE_ENGINE_BACKEND_DBENGINE)
+
// iterator state for RRD dimension data queries
struct storage_engine_query_handle {
time_t start_time_s;
time_t end_time_s;
STORAGE_PRIORITY priority;
- STORAGE_QUERY_HANDLE* handle;
+ STORAGE_ENGINE_BACKEND backend;
+ STORAGE_QUERY_HANDLE *handle;
};
-typedef struct storage_point {
- NETDATA_DOUBLE min; // when count > 1, this is the minimum among them
- NETDATA_DOUBLE max; // when count > 1, this is the maximum among them
- NETDATA_DOUBLE sum; // the point sum - divided by count gives the average
+// ----------------------------------------------------------------------------
+// chart types
- // end_time - start_time = point duration
- time_t start_time_s; // the time the point starts
- time_t end_time_s; // the time the point ends
+typedef enum __attribute__ ((__packed__)) rrdset_type {
+ RRDSET_TYPE_LINE = 0,
+ RRDSET_TYPE_AREA = 1,
+ RRDSET_TYPE_STACKED = 2,
+} RRDSET_TYPE;
- size_t count; // the number of original points aggregated
- size_t anomaly_count; // the number of original points found anomalous
+#define RRDSET_TYPE_LINE_NAME "line"
+#define RRDSET_TYPE_AREA_NAME "area"
+#define RRDSET_TYPE_STACKED_NAME "stacked"
- SN_FLAGS flags; // flags stored with the point
-} STORAGE_POINT;
+RRDSET_TYPE rrdset_type_id(const char *name);
+const char *rrdset_type_name(RRDSET_TYPE chart_type);
-#include "rrdcontext.h"
+#include "contexts/rrdcontext.h"
extern bool unittest_running;
extern bool dbengine_enabled;
@@ -158,39 +170,23 @@ extern time_t rrdset_free_obsolete_time_s;
#if defined(ENV32BIT)
#define MIN_LIBUV_WORKER_THREADS 8
-#define MAX_LIBUV_WORKER_THREADS 64
+#define MAX_LIBUV_WORKER_THREADS 128
#define RESERVED_LIBUV_WORKER_THREADS 3
#else
#define MIN_LIBUV_WORKER_THREADS 16
-#define MAX_LIBUV_WORKER_THREADS 128
+#define MAX_LIBUV_WORKER_THREADS 1024
#define RESERVED_LIBUV_WORKER_THREADS 6
#endif
extern int libuv_worker_threads;
+extern bool ieee754_doubles;
-#define RRD_ID_LENGTH_MAX 200
+#define RRD_ID_LENGTH_MAX 1000
typedef long long total_number;
#define TOTAL_NUMBER_FORMAT "%lld"
// ----------------------------------------------------------------------------
-// chart types
-
-typedef enum __attribute__ ((__packed__)) rrdset_type {
- RRDSET_TYPE_LINE = 0,
- RRDSET_TYPE_AREA = 1,
- RRDSET_TYPE_STACKED = 2,
-} RRDSET_TYPE;
-
-#define RRDSET_TYPE_LINE_NAME "line"
-#define RRDSET_TYPE_AREA_NAME "area"
-#define RRDSET_TYPE_STACKED_NAME "stacked"
-
-RRDSET_TYPE rrdset_type_id(const char *name);
-const char *rrdset_type_name(RRDSET_TYPE chart_type);
-
-
-// ----------------------------------------------------------------------------
// algorithms types
typedef enum __attribute__ ((__packed__)) rrd_algorithm {
@@ -279,7 +275,11 @@ void rrdlabels_destroy(DICTIONARY *labels_dict);
void rrdlabels_add(DICTIONARY *dict, const char *name, const char *value, RRDLABEL_SRC ls);
void rrdlabels_add_pair(DICTIONARY *dict, const char *string, RRDLABEL_SRC ls);
void rrdlabels_get_value_to_buffer_or_null(DICTIONARY *labels, BUFFER *wb, const char *key, const char *quote, const char *null);
-void rrdlabels_get_value_to_char_or_null(DICTIONARY *labels, char **value, const char *key);
+void rrdlabels_value_to_buffer_array_item_or_null(DICTIONARY *labels, BUFFER *wb, const char *key);
+void rrdlabels_get_value_strdup_or_null(DICTIONARY *labels, char **value, const char *key);
+void rrdlabels_get_value_strcpyz(DICTIONARY *labels, char *dst, size_t dst_len, const char *key);
+STRING *rrdlabels_get_value_string_dup(DICTIONARY *labels, const char *key);
+STRING *rrdlabels_get_value_to_buffer_or_unset(DICTIONARY *labels, BUFFER *wb, const char *key, const char *unset);
void rrdlabels_flush(DICTIONARY *labels_dict);
void rrdlabels_unmark_all(DICTIONARY *labels);
@@ -290,8 +290,10 @@ int rrdlabels_sorted_walkthrough_read(DICTIONARY *labels, int (*callback)(const
void rrdlabels_log_to_buffer(DICTIONARY *labels, BUFFER *wb);
bool rrdlabels_match_simple_pattern(DICTIONARY *labels, const char *simple_pattern_txt);
-bool rrdlabels_match_simple_pattern_parsed(DICTIONARY *labels, SIMPLE_PATTERN *pattern, char equal);
+
+bool rrdlabels_match_simple_pattern_parsed(DICTIONARY *labels, SIMPLE_PATTERN *pattern, char equal, size_t *searches);
int rrdlabels_to_buffer(DICTIONARY *labels, BUFFER *wb, const char *before_each, const char *equal, const char *quote, const char *between_them, bool (*filter_callback)(const char *name, const char *value, RRDLABEL_SRC ls, void *data), void *filter_data, void (*name_sanitizer)(char *dst, const char *src, size_t dst_size), void (*value_sanitizer)(char *dst, const char *src, size_t dst_size));
+void rrdlabels_to_buffer_json_members(DICTIONARY *labels, BUFFER *wb);
void rrdlabels_migrate_to_these(DICTIONARY *dst, DICTIONARY *src);
void rrdlabels_copy(DICTIONARY *dst, DICTIONARY *src);
@@ -307,19 +309,20 @@ bool exporting_labels_filter_callback(const char *name, const char *value, RRDLA
// ----------------------------------------------------------------------------
// engine-specific iterator state for dimension data collection
-typedef struct storage_collect_handle STORAGE_COLLECT_HANDLE;
+typedef struct storage_collect_handle {
+ STORAGE_ENGINE_BACKEND backend;
+} STORAGE_COLLECT_HANDLE;
// ----------------------------------------------------------------------------
// Storage tier data for every dimension
struct rrddim_tier {
STORAGE_POINT virtual_point;
- size_t tier_grouping;
+ STORAGE_ENGINE_BACKEND backend;
+ uint32_t tier_grouping;
time_t next_point_end_time_s;
STORAGE_METRIC_HANDLE *db_metric_handle; // the metric handle inside the database
STORAGE_COLLECT_HANDLE *db_collection_handle; // the data collection handle
- struct storage_engine_collect_ops *collect_ops;
- struct storage_engine_query_ops *query_ops;
};
void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s);
@@ -354,7 +357,7 @@ struct rrddim {
// ------------------------------------------------------------------------
// operational state members
- ml_dimension_t *ml_dimension; // machine learning data about this dimension
+ rrd_ml_dimension_t *ml_dimension; // machine learning data about this dimension
// ------------------------------------------------------------------------
// linking to siblings and parents
@@ -417,82 +420,215 @@ size_t rrddim_memory_file_header_size(void);
void rrddim_memory_file_save(RRDDIM *rd);
-// ----------------------------------------------------------------------------
+// ------------------------------------------------------------------------
+// DATA COLLECTION STORAGE OPS
-#define storage_point_unset(x) do { \
- (x).min = (x).max = (x).sum = NAN; \
- (x).count = 0; \
- (x).anomaly_count = 0; \
- (x).flags = SN_FLAG_NONE; \
- (x).start_time_s = 0; \
- (x).end_time_s = 0; \
- } while(0)
-
-#define storage_point_empty(x, start_s, end_s) do { \
- (x).min = (x).max = (x).sum = NAN; \
- (x).count = 1; \
- (x).anomaly_count = 0; \
- (x).flags = SN_FLAG_NONE; \
- (x).start_time_s = start_s; \
- (x).end_time_s = end_s; \
- } while(0)
-
-#define STORAGE_POINT_UNSET { .min = NAN, .max = NAN, .sum = NAN, .count = 0, .anomaly_count = 0, .flags = SN_FLAG_NONE, .start_time_s = 0, .end_time_s = 0 }
-
-#define storage_point_is_unset(x) (!(x).count)
-#define storage_point_is_gap(x) (!netdata_double_isnumber((x).sum))
+STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
+STORAGE_METRICS_GROUP *rrddim_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
+static inline STORAGE_METRICS_GROUP *storage_engine_metrics_group_get(STORAGE_ENGINE_BACKEND backend __maybe_unused, STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
-// ------------------------------------------------------------------------
-// function pointers that handle data collection
-struct storage_engine_collect_ops {
- // an initialization function to run before starting collection
- STORAGE_COLLECT_HANDLE *(*init)(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_metrics_group_get(db_instance, uuid);
+#endif
+ return rrddim_metrics_group_get(db_instance, uuid);
+}
+
+void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
+void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
+static inline void storage_engine_metrics_group_release(STORAGE_ENGINE_BACKEND backend __maybe_unused, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ rrdeng_metrics_group_release(db_instance, smg);
+ else
+#endif
+ rrddim_metrics_group_release(db_instance, smg);
+}
- // run this to store each metric into the database
- void (*store_metric)(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number, NETDATA_DOUBLE min_value,
- NETDATA_DOUBLE max_value, uint16_t count, uint16_t anomaly_count, SN_FLAGS flags);
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
+STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
+static inline STORAGE_COLLECT_HANDLE *storage_metric_store_init(STORAGE_ENGINE_BACKEND backend __maybe_unused, STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
- // run this to flush / reset the current data collection sequence
- void (*flush)(STORAGE_COLLECT_HANDLE *collection_handle);
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_store_metric_init(db_metric_handle, update_every, smg);
+#endif
+ return rrddim_collect_init(db_metric_handle, update_every, smg);
+}
- // a finalization function to run after collection is over
- // returns 1 if it's safe to delete the dimension
- int (*finalize)(STORAGE_COLLECT_HANDLE *collection_handle);
+void rrdeng_store_metric_next(
+ STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut,
+ NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value,
+ uint16_t count, uint16_t anomaly_count, SN_FLAGS flags);
- void (*change_collection_frequency)(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
+void rrddim_collect_store_metric(
+ STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut,
+ NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value,
+ uint16_t count, uint16_t anomaly_count, SN_FLAGS flags);
+
+static inline void storage_engine_store_metric(
+ STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut,
+ NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value,
+ uint16_t count, uint16_t anomaly_count, SN_FLAGS flags) {
+ internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_store_metric_next(collection_handle, point_in_time_ut,
+ n, min_value, max_value,
+ count, anomaly_count, flags);
+#endif
+ return rrddim_collect_store_metric(collection_handle, point_in_time_ut,
+ n, min_value, max_value,
+ count, anomaly_count, flags);
+}
+
+void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle);
+void rrddim_store_metric_flush(STORAGE_COLLECT_HANDLE *collection_handle);
+static inline void storage_engine_store_flush(STORAGE_COLLECT_HANDLE *collection_handle) {
+ if(unlikely(!collection_handle))
+ return;
+
+ internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ else
+#endif
+ rrddim_store_metric_flush(collection_handle);
+}
+
+int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle);
+int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *collection_handle);
+// a finalization function to run after collection is over
+// returns 1 if it's safe to delete the dimension
+static inline int storage_engine_store_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
+ internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_store_metric_finalize(collection_handle);
+#endif
+
+ return rrddim_collect_finalize(collection_handle);
+}
+
+void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
+void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
+static inline void storage_engine_store_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) {
+ internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ rrdeng_store_metric_change_collection_frequency(collection_handle, update_every);
+ else
+#endif
+ rrddim_store_metric_change_collection_frequency(collection_handle, update_every);
+}
- STORAGE_METRICS_GROUP *(*metrics_group_get)(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
- void (*metrics_group_release)(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *sa);
-};
// ----------------------------------------------------------------------------
+// STORAGE ENGINE QUERY OPS
+
+time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
+time_t rrddim_query_oldest_time_s(STORAGE_METRIC_HANDLE *db_metric_handle);
+static inline time_t storage_engine_oldest_time_s(STORAGE_ENGINE_BACKEND backend __maybe_unused, STORAGE_METRIC_HANDLE *db_metric_handle) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_metric_oldest_time(db_metric_handle);
+#endif
+ return rrddim_query_oldest_time_s(db_metric_handle);
+}
-// function pointers that handle database queries
-struct storage_engine_query_ops {
- // run this before starting a series of next_metric() database queries
- void (*init)(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle, time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority);
+time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
+time_t rrddim_query_latest_time_s(STORAGE_METRIC_HANDLE *db_metric_handle);
+static inline time_t storage_engine_latest_time_s(STORAGE_ENGINE_BACKEND backend __maybe_unused, STORAGE_METRIC_HANDLE *db_metric_handle) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
- // run this to load each metric number from the database
- STORAGE_POINT (*next_metric)(struct storage_engine_query_handle *handle);
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_metric_latest_time(db_metric_handle);
+#endif
+ return rrddim_query_latest_time_s(db_metric_handle);
+}
- // run this to test if the series of next_metric() database queries is finished
- int (*is_finished)(struct storage_engine_query_handle *handle);
+void rrdeng_load_metric_init(
+ STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrddim_handle,
+ time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority);
- // run this after finishing a series of load_metric() database queries
- void (*finalize)(struct storage_engine_query_handle *handle);
+void rrddim_query_init(
+ STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle,
+ time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority);
- // get the timestamp of the last entry of this metric
- time_t (*latest_time_s)(STORAGE_METRIC_HANDLE *db_metric_handle);
+static inline void storage_engine_query_init(
+ STORAGE_ENGINE_BACKEND backend __maybe_unused,
+ STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle,
+ time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
- // get the timestamp of the first entry of this metric
- time_t (*oldest_time_s)(STORAGE_METRIC_HANDLE *db_metric_handle);
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ rrdeng_load_metric_init(db_metric_handle, handle, start_time_s, end_time_s, priority);
+ else
+#endif
+ rrddim_query_init(db_metric_handle, handle, start_time_s, end_time_s, priority);
+}
- // adapt 'before' timestamp to the optimal for the query
- // can only move 'before' ahead (to the future)
- time_t (*align_to_optimal_before)(struct storage_engine_query_handle *handle);
-};
+STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle);
+STORAGE_POINT rrddim_query_next_metric(struct storage_engine_query_handle *handle);
+static inline STORAGE_POINT storage_engine_query_next_metric(struct storage_engine_query_handle *handle) {
+ internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend");
-typedef struct storage_engine STORAGE_ENGINE;
+#ifdef ENABLE_DBENGINE
+ if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_load_metric_next(handle);
+#endif
+ return rrddim_query_next_metric(handle);
+}
+
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle);
+int rrddim_query_is_finished(struct storage_engine_query_handle *handle);
+static inline int storage_engine_query_is_finished(struct storage_engine_query_handle *handle) {
+ internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_load_metric_is_finished(handle);
+#endif
+ return rrddim_query_is_finished(handle);
+}
+
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle);
+void rrddim_query_finalize(struct storage_engine_query_handle *handle);
+static inline void storage_engine_query_finalize(struct storage_engine_query_handle *handle) {
+ internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ rrdeng_load_metric_finalize(handle);
+ else
+#endif
+ rrddim_query_finalize(handle);
+}
+
+time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle);
+time_t rrddim_query_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle);
+static inline time_t storage_engine_align_to_optimal_before(struct storage_engine_query_handle *handle) {
+ internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_load_align_to_optimal_before(handle);
+#endif
+ return rrddim_query_align_to_optimal_before(handle);
+}
// ------------------------------------------------------------------------
// function pointers for all APIs provided by a storage engine
@@ -503,17 +639,14 @@ typedef struct storage_engine_api {
void (*metric_release)(STORAGE_METRIC_HANDLE *);
STORAGE_METRIC_HANDLE *(*metric_dup)(STORAGE_METRIC_HANDLE *);
bool (*metric_retention_by_uuid)(STORAGE_INSTANCE *db_instance, uuid_t *uuid, time_t *first_entry_s, time_t *last_entry_s);
-
- // operations
- struct storage_engine_collect_ops collect_ops;
- struct storage_engine_query_ops query_ops;
} STORAGE_ENGINE_API;
-struct storage_engine {
+typedef struct storage_engine {
+ STORAGE_ENGINE_BACKEND backend;
RRD_MEMORY_MODE id;
const char* name;
STORAGE_ENGINE_API api;
-};
+} STORAGE_ENGINE;
STORAGE_ENGINE* storage_engine_get(RRD_MEMORY_MODE mmode);
STORAGE_ENGINE* storage_engine_find(const char* name);
@@ -617,7 +750,7 @@ struct rrdset {
DICTIONARY *rrddimvar_root_index; // dimension variables
// we use this dictionary to manage their allocation
- ml_chart_t *ml_chart;
+ rrd_ml_chart_t *ml_chart;
// ------------------------------------------------------------------------
// operational state members
@@ -702,6 +835,13 @@ struct rrdset {
RRDCALC *base; // double linked list of RRDCALC related to this RRDSET
} alerts;
+ struct {
+ size_t pos;
+ size_t size;
+ size_t used;
+ RRDDIM_ACQUIRED **rda;
+ } pluginsd;
+
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
struct {
bool log_next_data_collection;
@@ -757,35 +897,41 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st_on_file, RRD_MEMORY_MODE m
// and may lead to missing information.
typedef enum __attribute__ ((__packed__)) rrdhost_flags {
+
+ // Careful not to overlap with rrdhost_options to avoid bugs if
+ // rrdhost_flags_xxx is used instead of rrdhost_option_xxx or vice-versa
// Orphan, Archived and Obsolete flags
- RRDHOST_FLAG_ORPHAN = (1 << 10), // this host is orphan (not receiving data)
- RRDHOST_FLAG_ARCHIVED = (1 << 11), // The host is archived, no collected charts yet
- RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS = (1 << 12), // the host has pending chart obsoletions
- RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS = (1 << 13), // the host has pending dimension obsoletions
+ RRDHOST_FLAG_ORPHAN = (1 << 8), // this host is orphan (not receiving data)
+ RRDHOST_FLAG_ARCHIVED = (1 << 9), // The host is archived, no collected charts yet
+ RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS = (1 << 10), // the host has pending chart obsoletions
+ RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS = (1 << 11), // the host has pending dimension obsoletions
// Streaming sender
- RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED = (1 << 14), // the host has initialized rrdpush structures
- RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN = (1 << 15), // When set, the sender thread is running
- RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED = (1 << 16), // When set, the host is connected to a parent
- RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS = (1 << 17), // when set, rrdset_done() should push metrics to parent
- RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS = (1 << 18), // when set, we have logged the status of metrics streaming
+ RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED = (1 << 12), // the host has initialized rrdpush structures
+ RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN = (1 << 13), // When set, the sender thread is running
+ RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED = (1 << 14), // When set, the host is connected to a parent
+ RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS = (1 << 15), // when set, rrdset_done() should push metrics to parent
+ RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS = (1 << 16), // when set, we have logged the status of metrics streaming
// Health
- RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 20), // contains charts and dims with uninitialized variables
- RRDHOST_FLAG_INITIALIZED_HEALTH = (1 << 21), // the host has initialized health structures
+ RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 17), // contains charts and dims with uninitialized variables
+ RRDHOST_FLAG_INITIALIZED_HEALTH = (1 << 18), // the host has initialized health structures
// Exporting
- RRDHOST_FLAG_EXPORTING_SEND = (1 << 22), // send it to external databases
- RRDHOST_FLAG_EXPORTING_DONT_SEND = (1 << 23), // don't send it to external databases
+ RRDHOST_FLAG_EXPORTING_SEND = (1 << 19), // send it to external databases
+ RRDHOST_FLAG_EXPORTING_DONT_SEND = (1 << 20), // don't send it to external databases
// ACLK
- RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 24), // when set, we should send ACLK stream context updates
+ RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 21), // when set, we should send ACLK stream context updates
+ RRDHOST_FLAG_ACLK_STREAM_ALERTS = (1 << 22), // set when the receiver part is disconnected
// Metadata
- RRDHOST_FLAG_METADATA_UPDATE = (1 << 25), // metadata needs to be stored in the database
- RRDHOST_FLAG_METADATA_LABELS = (1 << 26), // metadata needs to be stored in the database
- RRDHOST_FLAG_METADATA_INFO = (1 << 27), // metadata needs to be stored in the database
- RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database
+ RRDHOST_FLAG_METADATA_UPDATE = (1 << 23), // metadata needs to be stored in the database
+ RRDHOST_FLAG_METADATA_LABELS = (1 << 24), // metadata needs to be stored in the database
+ RRDHOST_FLAG_METADATA_INFO = (1 << 25), // metadata needs to be stored in the database
+ RRDHOST_FLAG_PENDING_CONTEXT_LOAD = (1 << 26), // metadata needs to be stored in the database
+ RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS = (1 << 27), // metadata needs to be stored in the database
+ RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database
RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 29), // set when the receiver part is disconnected
} RRDHOST_FLAGS;
@@ -954,6 +1100,8 @@ struct rrdhost_system_info {
int mc_version;
};
+struct rrdhost_system_info *rrdhost_labels_to_system_info(DICTIONARY *labels);
+
struct rrdhost {
char machine_guid[GUID_LEN + 1]; // the unique ID of this host
@@ -982,13 +1130,12 @@ struct rrdhost {
// the actual per tier is at .db[tier].mode
char *cache_dir; // the directory to save RRD cache files
- char *varlib_dir; // the directory to save health log
struct {
RRD_MEMORY_MODE mode; // the db mode for this tier
STORAGE_ENGINE *eng; // the storage engine API for this tier
STORAGE_INSTANCE *instance; // the db instance for this tier
- size_t tier_grouping; // tier 0 iterations aggregated on this tier
+ uint32_t tier_grouping; // tier 0 iterations aggregated on this tier
} db[RRD_STORAGE_TIERS];
struct rrdhost_system_info *system_info; // information collected from the host environment
@@ -1011,7 +1158,7 @@ struct rrdhost {
struct sender_state *sender;
netdata_thread_t rrdpush_sender_thread; // the sender thread
size_t rrdpush_sender_replicating_charts; // the number of charts currently being replicated to a parent
- void *dbsync_worker;
+ void *aclk_sync_host_config;
// ------------------------------------------------------------------------
// streaming of data from remote hosts - rrdpush receiver
@@ -1042,6 +1189,7 @@ struct rrdhost {
uint32_t health_last_processed_id; // the last processed health id from the log
uint32_t health_max_unique_id; // the max alarm log unique id given for the host
uint32_t health_max_alarm_id; // the max alarm id given for the host
+ size_t health_transitions; // the number of times an alert changed state
// ------------------------------------------------------------------------
// locks
@@ -1050,7 +1198,7 @@ struct rrdhost {
// ------------------------------------------------------------------------
// ML handle
- ml_host_t *ml_host;
+ rrd_ml_host_t *ml_host;
// ------------------------------------------------------------------------
// Support for host-level labels
@@ -1070,9 +1218,11 @@ struct rrdhost {
DICTIONARY *rrdvars; // the host's chart variables index
// this includes custom host variables
- RRDCONTEXTS *rrdctx_hub_queue;
- RRDCONTEXTS *rrdctx_post_processing_queue;
- RRDCONTEXTS *rrdctx;
+ struct {
+ DICTIONARY *contexts;
+ DICTIONARY *hub_queue;
+ DICTIONARY *pp_queue;
+ } rrdctx;
uuid_t host_uuid; // Global GUID for this host
uuid_t *node_id; // Cloud node_id
@@ -1110,6 +1260,10 @@ extern RRDHOST *localhost;
extern DICTIONARY *rrdhost_root_index;
size_t rrdhost_hosts_available(void);
+RRDHOST_ACQUIRED *rrdhost_find_and_acquire(const char *machine_guid);
+RRDHOST *rrdhost_acquired_to_rrdhost(RRDHOST_ACQUIRED *rha);
+void rrdhost_acquired_release(RRDHOST_ACQUIRED *rha);
+
// ----------------------------------------------------------------------------
#define rrdhost_foreach_read(var) \
@@ -1145,6 +1299,7 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unitt
RRDHOST *rrdhost_find_by_hostname(const char *hostname);
RRDHOST *rrdhost_find_by_guid(const char *guid);
+RRDHOST *find_host_by_node_id(char *node_id);
RRDHOST *rrdhost_find_or_create(
const char *hostname
@@ -1217,6 +1372,11 @@ void rrdset_update_heterogeneous_flag(RRDSET *st);
time_t rrdset_set_update_every_s(RRDSET *st, time_t update_every_s);
RRDSET *rrdset_find(RRDHOST *host, const char *id);
+
+RRDSET_ACQUIRED *rrdset_find_and_acquire(RRDHOST *host, const char *id);
+RRDSET *rrdset_acquired_to_rrdset(RRDSET_ACQUIRED *rsa);
+void rrdset_acquired_release(RRDSET_ACQUIRED *rsa);
+
#define rrdset_find_localhost(id) rrdset_find(localhost, id)
/* This will not return charts that are archived */
static inline RRDSET *rrdset_find_active_localhost(const char *id)
@@ -1339,13 +1499,13 @@ void rrdset_delete_files(RRDSET *st);
void rrdset_save(RRDSET *st);
void rrdset_free(RRDSET *st);
+void rrddim_free(RRDSET *st, RRDDIM *rd);
+
#ifdef NETDATA_RRD_INTERNALS
char *rrdhost_cache_dir_for_rrdset_alloc(RRDHOST *host, const char *id);
const char *rrdset_cache_dir(RRDSET *st);
-void rrddim_free(RRDSET *st, RRDDIM *rd);
-
void rrdset_reset(RRDSET *st);
void rrdset_delete_obsolete_dimensions(RRDSET *st);