summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/rrdpush.h493
1 files changed, 430 insertions, 63 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index f97c8ddfb..73bd438c9 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -43,19 +43,20 @@ typedef enum {
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
+ STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
// needed for negotiating errors between parent and child
} STREAM_CAPABILITIES;
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
#else
#define STREAM_HAS_COMPRESSION 0
-#endif // ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
-STREAM_CAPABILITIES stream_our_capabilities();
+STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
@@ -77,11 +78,10 @@ STREAM_CAPABILITIES stream_our_capabilities();
#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
typedef enum {
- STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
- STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
- STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
- STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
- STREAM_HANDSHAKE_OK_V1 = 1,
+ STREAM_HANDSHAKE_OK_V3 = 3, // v3+
+ STREAM_HANDSHAKE_OK_V2 = 2, // v2
+ STREAM_HANDSHAKE_OK_V1 = 1, // v1
+ STREAM_HANDSHAKE_NEVER = 0, // never tried to connect
STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
@@ -94,20 +94,19 @@ typedef enum {
STREAM_HANDSHAKE_BUSY_TRY_LATER = -10,
STREAM_HANDSHAKE_INTERNAL_ERROR = -11,
STREAM_HANDSHAKE_INITIALIZATION = -12,
-} STREAM_HANDSHAKE;
-
-
-// ----------------------------------------------------------------------------
+ STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP = -13,
+ STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER = -14,
+ STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15,
+ STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16,
+ STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17,
+ STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18,
+ STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19,
+ STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20,
+ STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21,
+ STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22,
-typedef enum __attribute__((packed)) {
- STREAM_TRAFFIC_TYPE_REPLICATION,
- STREAM_TRAFFIC_TYPE_FUNCTIONS,
- STREAM_TRAFFIC_TYPE_METADATA,
- STREAM_TRAFFIC_TYPE_DATA,
+} STREAM_HANDSHAKE;
- // terminator
- STREAM_TRAFFIC_TYPE_MAX,
-} STREAM_TRAFFIC_TYPE;
// ----------------------------------------------------------------------------
@@ -119,35 +118,115 @@ typedef struct {
char *kernel_version;
} stream_encoded_t;
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+// signature MUST end with a newline
+#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
+#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
+#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4
+
struct compressor_state {
+ bool initialized;
char *compression_result_buffer;
size_t compression_result_buffer_size;
- struct compressor_data *data; // Compression API specific data
- void (*reset)(struct compressor_state *state);
+ struct {
+ void *lz4_stream;
+ char *input_ring_buffer;
+ size_t input_ring_buffer_size;
+ size_t input_ring_buffer_pos;
+ } stream;
size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
void (*destroy)(struct compressor_state **state);
};
+void rrdpush_compressor_reset(struct compressor_state *state);
+void rrdpush_compressor_destroy(struct compressor_state *state);
+size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out);
+
struct decompressor_state {
+ bool initialized;
size_t signature_size;
size_t total_compressed;
size_t total_uncompressed;
size_t packet_count;
- struct decompressor_stream *stream; // Decompression API specific data
- void (*reset)(struct decompressor_state *state);
- size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
- size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
- size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
- size_t (*get)(struct decompressor_state *state, char *data, size_t size);
- void (*destroy)(struct decompressor_state **state);
+ struct {
+ void *lz4_stream;
+ char *buffer;
+ size_t size;
+ size_t write_at;
+ size_t read_at;
+ } stream;
};
+
+void rrdpush_decompressor_destroy(struct decompressor_state *state);
+void rrdpush_decompressor_reset(struct decompressor_state *state);
+size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
+
+static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) {
+ if (unlikely(!data || !data_size))
+ return 0;
+
+ if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
+ return 0;
+
+ uint32_t sign = *(uint32_t *)data;
+ if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
+ return 0;
+
+ size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
+ return length;
+}
+
+static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
+ if(unlikely(state->stream.read_at != state->stream.write_at))
+ fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
+
+ return rrdpush_decompress_decode_header(header, header_size);
+}
+
+static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
+ if(unlikely(state->stream.read_at > state->stream.write_at))
+ fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
+
+ return state->stream.write_at - state->stream.read_at;
+}
+
+static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
+ if (unlikely(!state || !size || !dst))
+ return 0;
+
+ size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
+
+ if(unlikely(!remaining))
+ return 0;
+
+ size_t bytes_to_return = size;
+ if(bytes_to_return > remaining)
+ bytes_to_return = remaining;
+
+ memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return);
+ state->stream.read_at += bytes_to_return;
+
+ if(unlikely(state->stream.read_at > state->stream.write_at))
+ fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
+
+ return bytes_to_return;
+}
#endif
// Thread-local storage
- // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
+// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
-typedef enum {
+typedef enum __attribute__((packed)) {
+ STREAM_TRAFFIC_TYPE_REPLICATION = 0,
+ STREAM_TRAFFIC_TYPE_FUNCTIONS,
+ STREAM_TRAFFIC_TYPE_METADATA,
+ STREAM_TRAFFIC_TYPE_DATA,
+
+ // terminator
+ STREAM_TRAFFIC_TYPE_MAX,
+} STREAM_TRAFFIC_TYPE;
+
+typedef enum __attribute__((packed)) {
SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
@@ -158,7 +237,7 @@ struct sender_state {
SENDER_FLAGS flags;
int timeout;
int default_port;
- usec_t reconnect_delay;
+ uint32_t reconnect_delay;
char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
size_t begin;
size_t reconnects_counter;
@@ -170,10 +249,10 @@ struct sender_state {
size_t not_connected_loops;
// Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
// the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
- netdata_mutex_t mutex;
+ SPINLOCK spinlock;
struct circular_buffer *buffer;
char read_buffer[PLUGINSD_LINE_MAX + 1];
- int read_len;
+ ssize_t read_len;
STREAM_CAPABILITIES capabilities;
size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
@@ -183,16 +262,17 @@ struct sender_state {
uint16_t hops;
-#ifdef ENABLE_COMPRESSION
- struct compressor_state *compressor;
-#endif
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ struct compressor_state compressor;
+#endif // ENABLE_RRDPUSH_COMPRESSION
+
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl; // structure used to encrypt the connection
#endif
struct {
bool shutdown;
- const char *reason;
+ STREAM_HANDSHAKE reason;
} exit;
struct {
@@ -216,6 +296,9 @@ struct sender_state {
} atomic;
};
+#define sender_lock(sender) spinlock_lock(&(sender)->spinlock)
+#define sender_unlock(sender) spinlock_unlock(&(sender)->spinlock)
+
#define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
#define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
#define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)
@@ -242,6 +325,44 @@ struct sender_state {
#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
+/*
+typedef enum {
+ STREAM_NODE_INSTANCE_FEATURE_CLOUD_ONLINE = (1 << 0),
+ STREAM_NODE_INSTANCE_FEATURE_VIRTUAL_HOST = (1 << 1),
+ STREAM_NODE_INSTANCE_FEATURE_HEALTH_ENABLED = (1 << 2),
+ STREAM_NODE_INSTANCE_FEATURE_ML_SELF = (1 << 3),
+ STREAM_NODE_INSTANCE_FEATURE_ML_RECEIVED = (1 << 4),
+ STREAM_NODE_INSTANCE_FEATURE_SSL = (1 << 5),
+} STREAM_NODE_INSTANCE_FEATURES;
+
+typedef struct stream_node_instance {
+ uuid_t uuid;
+ STRING *agent;
+ STREAM_NODE_INSTANCE_FEATURES features;
+ uint32_t hops;
+
+ // receiver information on that agent
+ int32_t capabilities;
+ uint32_t local_port;
+ uint32_t remote_port;
+ STRING *local_ip;
+ STRING *remote_ip;
+} STREAM_NODE_INSTANCE;
+*/
+
+struct buffered_reader {
+ ssize_t read_len;
+ ssize_t pos;
+ char read_buffer[PLUGINSD_LINE_MAX + 1];
+};
+
+char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size);
+static inline void buffered_reader_init(struct buffered_reader *reader) {
+ reader->read_buffer[0] = '\0';
+ reader->read_len = 0;
+ reader->pos = 0;
+}
+
struct receiver_state {
RRDHOST *host;
pid_t tid;
@@ -263,14 +384,14 @@ struct receiver_state {
struct rrdhost_system_info *system_info;
STREAM_CAPABILITIES capabilities;
time_t last_msg_t;
- char read_buffer[PLUGINSD_LINE_MAX + 1];
- int read_len;
+
+ struct buffered_reader reader;
uint16_t hops;
struct {
bool shutdown; // signal the streaming parser to exit
- const char *reason; // the reason of disconnection to log
+ STREAM_HANDSHAKE reason;
} exit;
struct {
@@ -279,6 +400,7 @@ struct receiver_state {
int update_every;
int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO
time_t alarms_delay;
+ uint32_t alarms_history;
int rrdpush_enabled;
char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig
char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig
@@ -292,31 +414,36 @@ struct receiver_state {
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl;
#endif
-#ifdef ENABLE_COMPRESSION
- unsigned int rrdpush_compression;
- struct decompressor_state *decompressor;
-#endif
time_t replication_first_time_t;
+
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ struct decompressor_state decompressor;
+#endif // ENABLE_RRDPUSH_COMPRESSION
+/*
+ struct {
+ uint32_t count;
+ STREAM_NODE_INSTANCE *array;
+ } instances;
+*/
};
struct rrdpush_destinations {
STRING *destination;
bool ssl;
-
- const char *last_error;
- time_t last_attempt;
+ uint32_t attempts;
+ time_t since;
time_t postpone_reconnection_until;
- STREAM_HANDSHAKE last_handshake;
+ STREAM_HANDSHAKE reason;
struct rrdpush_destinations *prev;
struct rrdpush_destinations *next;
};
extern unsigned int default_rrdpush_enabled;
-#ifdef ENABLE_COMPRESSION
-extern unsigned int default_compression_enabled;
-#endif
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+extern unsigned int default_rrdpush_compression_enabled;
+#endif // ENABLE_RRDPUSH_COMPRESSION
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
@@ -352,13 +479,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
bool rrdset_push_chart_definition_now(RRDSET *st);
void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
-void rrdpush_claimed_id(RRDHOST *host);
+void rrdpush_send_claimed_id(RRDHOST *host);
+void rrdpush_send_global_functions(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
-void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);
+void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
@@ -373,27 +501,266 @@ int connect_to_one_of_destinations(
void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
struct compressor_state *create_compressor();
-struct decompressor_state *create_decompressor();
-#endif
+#endif // ENABLE_RRDPUSH_COMPRESSION
+
void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
-STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender);
int32_t stream_capabilities_to_vn(uint32_t caps);
void receiver_state_free(struct receiver_state *rpt);
-bool stop_streaming_receiver(RRDHOST *host, const char *reason);
+bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason);
void sender_thread_buffer_free(void);
-void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
-void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
-
#include "replication.h"
+typedef enum __attribute__((packed)) {
+ RRDHOST_DB_STATUS_INITIALIZING = 0,
+ RRDHOST_DB_STATUS_QUERYABLE,
+} RRDHOST_DB_STATUS;
+
+static inline const char *rrdhost_db_status_to_string(RRDHOST_DB_STATUS status) {
+ switch(status) {
+ default:
+ case RRDHOST_DB_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_DB_STATUS_QUERYABLE:
+ return "online";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_DB_LIVENESS_STALE = 0,
+ RRDHOST_DB_LIVENESS_LIVE,
+} RRDHOST_DB_LIVENESS;
+
+static inline const char *rrdhost_db_liveness_to_string(RRDHOST_DB_LIVENESS status) {
+ switch(status) {
+ default:
+ case RRDHOST_DB_LIVENESS_STALE:
+ return "stale";
+
+ case RRDHOST_DB_LIVENESS_LIVE:
+ return "live";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_INGEST_STATUS_ARCHIVED = 0,
+ RRDHOST_INGEST_STATUS_INITIALIZING,
+ RRDHOST_INGEST_STATUS_REPLICATING,
+ RRDHOST_INGEST_STATUS_ONLINE,
+ RRDHOST_INGEST_STATUS_OFFLINE,
+} RRDHOST_INGEST_STATUS;
+
+static inline const char *rrdhost_ingest_status_to_string(RRDHOST_INGEST_STATUS status) {
+ switch(status) {
+ case RRDHOST_INGEST_STATUS_ARCHIVED:
+ return "archived";
+
+ case RRDHOST_INGEST_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_INGEST_STATUS_REPLICATING:
+ return "replicating";
+
+ case RRDHOST_INGEST_STATUS_ONLINE:
+ return "online";
+
+ default:
+ case RRDHOST_INGEST_STATUS_OFFLINE:
+ return "offline";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_INGEST_TYPE_LOCALHOST = 0,
+ RRDHOST_INGEST_TYPE_VIRTUAL,
+ RRDHOST_INGEST_TYPE_CHILD,
+ RRDHOST_INGEST_TYPE_ARCHIVED,
+} RRDHOST_INGEST_TYPE;
+
+static inline const char *rrdhost_ingest_type_to_string(RRDHOST_INGEST_TYPE type) {
+ switch(type) {
+ case RRDHOST_INGEST_TYPE_LOCALHOST:
+ return "localhost";
+
+ case RRDHOST_INGEST_TYPE_VIRTUAL:
+ return "virtual";
+
+ case RRDHOST_INGEST_TYPE_CHILD:
+ return "child";
+
+ default:
+ case RRDHOST_INGEST_TYPE_ARCHIVED:
+ return "archived";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_STREAM_STATUS_DISABLED = 0,
+ RRDHOST_STREAM_STATUS_REPLICATING,
+ RRDHOST_STREAM_STATUS_ONLINE,
+ RRDHOST_STREAM_STATUS_OFFLINE,
+} RRDHOST_STREAMING_STATUS;
+
+static inline const char *rrdhost_streaming_status_to_string(RRDHOST_STREAMING_STATUS status) {
+ switch(status) {
+ case RRDHOST_STREAM_STATUS_DISABLED:
+ return "disabled";
+
+ case RRDHOST_STREAM_STATUS_REPLICATING:
+ return "replicating";
+
+ case RRDHOST_STREAM_STATUS_ONLINE:
+ return "online";
+
+ default:
+ case RRDHOST_STREAM_STATUS_OFFLINE:
+ return "offline";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_ML_STATUS_DISABLED = 0,
+ RRDHOST_ML_STATUS_OFFLINE,
+ RRDHOST_ML_STATUS_RUNNING,
+} RRDHOST_ML_STATUS;
+
+static inline const char *rrdhost_ml_status_to_string(RRDHOST_ML_STATUS status) {
+ switch(status) {
+ case RRDHOST_ML_STATUS_RUNNING:
+ return "online";
+
+ case RRDHOST_ML_STATUS_OFFLINE:
+ return "offline";
+
+ default:
+ case RRDHOST_ML_STATUS_DISABLED:
+ return "disabled";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_ML_TYPE_DISABLED = 0,
+ RRDHOST_ML_TYPE_SELF,
+ RRDHOST_ML_TYPE_RECEIVED,
+} RRDHOST_ML_TYPE;
+
+static inline const char *rrdhost_ml_type_to_string(RRDHOST_ML_TYPE type) {
+ switch(type) {
+ case RRDHOST_ML_TYPE_SELF:
+ return "self";
+
+ case RRDHOST_ML_TYPE_RECEIVED:
+ return "received";
+
+ default:
+ case RRDHOST_ML_TYPE_DISABLED:
+ return "disabled";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_HEALTH_STATUS_DISABLED = 0,
+ RRDHOST_HEALTH_STATUS_INITIALIZING,
+ RRDHOST_HEALTH_STATUS_RUNNING,
+} RRDHOST_HEALTH_STATUS;
+
+static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS status) {
+ switch(status) {
+ default:
+ case RRDHOST_HEALTH_STATUS_DISABLED:
+ return "disabled";
+
+ case RRDHOST_HEALTH_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_HEALTH_STATUS_RUNNING:
+ return "online";
+ }
+}
+
+typedef struct rrdhost_status {
+ RRDHOST *host;
+ time_t now;
+
+ struct {
+ RRDHOST_DB_STATUS status;
+ RRDHOST_DB_LIVENESS liveness;
+ RRD_MEMORY_MODE mode;
+ time_t first_time_s;
+ time_t last_time_s;
+ size_t metrics;
+ size_t instances;
+ size_t contexts;
+ } db;
+
+ struct {
+ RRDHOST_ML_STATUS status;
+ RRDHOST_ML_TYPE type;
+ struct ml_metrics_statistics metrics;
+ } ml;
+
+ struct {
+ size_t hops;
+ RRDHOST_INGEST_TYPE type;
+ RRDHOST_INGEST_STATUS status;
+ SOCKET_PEERS peers;
+ bool ssl;
+ STREAM_CAPABILITIES capabilities;
+ uint32_t id;
+ time_t since;
+ STREAM_HANDSHAKE reason;
+
+ struct {
+ bool in_progress;
+ NETDATA_DOUBLE completion;
+ size_t instances;
+ } replication;
+ } ingest;
+
+ struct {
+ size_t hops;
+ RRDHOST_STREAMING_STATUS status;
+ SOCKET_PEERS peers;
+ bool ssl;
+ bool compression;
+ STREAM_CAPABILITIES capabilities;
+ uint32_t id;
+ time_t since;
+ STREAM_HANDSHAKE reason;
+
+ struct {
+ bool in_progress;
+ NETDATA_DOUBLE completion;
+ size_t instances;
+ } replication;
+
+ size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
+ } stream;
+
+ struct {
+ RRDHOST_HEALTH_STATUS status;
+ struct {
+ uint32_t undefined;
+ uint32_t uninitialized;
+ uint32_t clear;
+ uint32_t warning;
+ uint32_t critical;
+ } alerts;
+ } health;
+} RRDHOST_STATUS;
+
+void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
+bool rrdhost_state_cloud_emulation(RRDHOST *host);
+
#endif //NETDATA_RRDPUSH_H