diff options
Diffstat (limited to '')
-rw-r--r-- | streaming/rrdpush.h | 66 |
1 files changed, 55 insertions, 11 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index a0c7e8de..94c1320e 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -3,12 +3,14 @@ #ifndef NETDATA_RRDPUSH_H #define NETDATA_RRDPUSH_H 1 -#include "database/rrd.h" #include "libnetdata/libnetdata.h" -#include "web/server/web_client.h" #include "daemon/common.h" +#include "web/server/web_client.h" +#include "database/rrd.h" #define CONNECTED_TO_SIZE 100 +#define CBUFFER_INITIAL_SIZE (16 * 1024) +#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2) // ---------------------------------------------------------------------------- // obsolete versions - do not use anymore @@ -22,6 +24,9 @@ typedef enum { // do not use the first 3 bits + // they used to be versions 1, 2 and 3 + // before we introduce capabilities + STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels) STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol) @@ -37,6 +42,7 @@ typedef enum { STREAM_CAP_REPLICATION = (1 << 12), // replication supported STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data + STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit // needed for negotiating errors between parent and child } STREAM_CAPABILITIES; @@ -125,8 +131,8 @@ struct decompressor_state { // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. typedef enum { - SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown - SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression + SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown + SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression } SENDER_FLAGS; struct sender_state { @@ -155,6 +161,8 @@ struct sender_state { int rrdpush_sender_pipe[2]; // collector to sender thread signaling int rrdpush_sender_socket; + uint16_t hops; + #ifdef ENABLE_COMPRESSION struct compressor_state *compressor; #endif @@ -163,6 +171,11 @@ struct sender_state { #endif struct { + bool shutdown; + const char *reason; + } exit; + + struct { DICTIONARY *requests; // de-duplication of replication requests, per chart struct { @@ -176,9 +189,13 @@ struct sender_state { struct { size_t buffer_used_percentage; // the current utilization of the sending buffer usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC + time_t last_buffer_recreate_s; // true when the sender buffer should be re-created } atomic; }; +#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED) +#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED) + #define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST) #define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST) @@ -216,13 +233,34 @@ struct receiver_state { char *program_name; // Duplicated in pluginsd char *program_version; struct rrdhost_system_info *system_info; - int update_every; STREAM_CAPABILITIES capabilities; time_t last_msg_t; char read_buffer[PLUGINSD_LINE_MAX + 1]; int read_len; - unsigned int shutdown:1; // Tell the thread to exit - unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!) + + uint16_t hops; + + struct { + bool shutdown; // signal the streaming parser to exit + const char *reason; // the reason of disconnection to log + } exit; + + struct { + RRD_MEMORY_MODE mode; + int history; + int update_every; + int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO + time_t alarms_delay; + int rrdpush_enabled; + char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig + char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig + bool rrdpush_enable_replication; + time_t rrdpush_seconds_to_replicate; + time_t rrdpush_replication_step; + char *rrdpush_destination; // DONT FREE - it is allocated in appconfig + unsigned int rrdpush_compression; + } config; + #ifdef ENABLE_HTTPS struct netdata_ssl ssl; #endif @@ -260,11 +298,8 @@ extern unsigned int remote_clock_resync_iterations; void rrdpush_destinations_init(RRDHOST *host); void rrdpush_destinations_free(RRDHOST *host); -void sender_init(RRDHOST *host); - BUFFER *sender_start(struct sender_state *s); void sender_commit(struct sender_state *s, BUFFER *wb); -void sender_cancel(struct sender_state *s); int rrdpush_init(); bool rrdpush_receiver_needs_dbengine(); int configured_as_parent(); @@ -274,8 +309,11 @@ void *rrdpush_sender_thread(void *ptr); void rrdpush_send_host_labels(RRDHOST *host); void rrdpush_claimed_id(RRDHOST *host); +#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended +#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended + int rrdpush_receiver_thread_spawn(struct web_client *w, char *url); -void rrdpush_sender_thread_stop(RRDHOST *host); +void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait); void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva); void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); @@ -295,11 +333,17 @@ struct compressor_state *create_compressor(); struct decompressor_state *create_decompressor(); #endif +void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status); void log_receiver_capabilities(struct receiver_state *rpt); void log_sender_capabilities(struct sender_state *s); STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version); int32_t stream_capabilities_to_vn(uint32_t caps); +void receiver_state_free(struct receiver_state *rpt); +bool stop_streaming_receiver(RRDHOST *host, const char *reason); + +void sender_thread_buffer_free(void); + #include "replication.h" #endif //NETDATA_RRDPUSH_H |