diff options
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r-- | streaming/rrdpush.h | 41 |
1 files changed, 36 insertions, 5 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index ff8958440..f97c8ddfb 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -72,6 +72,9 @@ STREAM_CAPABILITIES stream_our_capabilities(); #define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back" #define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server" #define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info." +#define START_STREAMING_ERROR_BUSY_TRY_LATER "The server is too busy now to accept this request. Try later." +#define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later." +#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later." typedef enum { STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION @@ -87,12 +90,27 @@ typedef enum { STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6, STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7, STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8, - STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9 + STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9, + STREAM_HANDSHAKE_BUSY_TRY_LATER = -10, + STREAM_HANDSHAKE_INTERNAL_ERROR = -11, + STREAM_HANDSHAKE_INITIALIZATION = -12, } STREAM_HANDSHAKE; // ---------------------------------------------------------------------------- +typedef enum __attribute__((packed)) { + STREAM_TRAFFIC_TYPE_REPLICATION, + STREAM_TRAFFIC_TYPE_FUNCTIONS, + STREAM_TRAFFIC_TYPE_METADATA, + STREAM_TRAFFIC_TYPE_DATA, + + // terminator + STREAM_TRAFFIC_TYPE_MAX, +} STREAM_TRAFFIC_TYPE; + +// ---------------------------------------------------------------------------- + typedef struct { char *os_name; char *os_id; @@ -148,6 +166,7 @@ struct sender_state { size_t sent_bytes_on_this_connection; size_t send_attempts; time_t last_traffic_seen_t; + time_t last_state_since_t; // the timestamp of the last state (online/offline) change size_t not_connected_loops; // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here. @@ -157,6 +176,8 @@ struct sender_state { int read_len; STREAM_CAPABILITIES capabilities; + size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX]; + int rrdpush_sender_pipe[2]; // collector to sender thread signaling int rrdpush_sender_socket; @@ -166,7 +187,7 @@ struct sender_state { struct compressor_state *compressor; #endif #ifdef ENABLE_HTTPS - struct netdata_ssl ssl; // structure used to encrypt the connection + NETDATA_SSL ssl; // structure used to encrypt the connection #endif struct { @@ -176,6 +197,8 @@ struct sender_state { struct { DICTIONARY *requests; // de-duplication of replication requests, per chart + time_t oldest_request_after_t; // the timestamp of the oldest replication request + time_t latest_completed_before_t; // the timestamp of the latest replication request struct { size_t pending_requests; // the currently outstanding replication requests @@ -221,6 +244,7 @@ struct sender_state { struct receiver_state { RRDHOST *host; + pid_t tid; netdata_thread_t thread; int fd; char *key; @@ -266,7 +290,7 @@ struct receiver_state { } config; #ifdef ENABLE_HTTPS - struct netdata_ssl ssl; + NETDATA_SSL ssl; #endif #ifdef ENABLE_COMPRESSION unsigned int rrdpush_compression; @@ -278,8 +302,10 @@ struct receiver_state { struct rrdpush_destinations { STRING *destination; + bool ssl; const char *last_error; + time_t last_attempt; time_t postpone_reconnection_until; STREAM_HANDSHAKE last_handshake; @@ -303,7 +329,7 @@ void rrdpush_destinations_init(RRDHOST *host); void rrdpush_destinations_free(RRDHOST *host); BUFFER *sender_start(struct sender_state *s); -void sender_commit(struct sender_state *s, BUFFER *wb); +void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type); int rrdpush_init(); bool rrdpush_receiver_needs_dbengine(); int configured_as_parent(); @@ -351,7 +377,9 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s); struct compressor_state *create_compressor(); struct decompressor_state *create_decompressor(); #endif - +void rrdpush_reset_destinations_postpone_time(RRDHOST *host); +const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error); +void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key); void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status); void log_receiver_capabilities(struct receiver_state *rpt); void log_sender_capabilities(struct sender_state *s); @@ -363,6 +391,9 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason); void sender_thread_buffer_free(void); +void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused); +void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused); + #include "replication.h" #endif //NETDATA_RRDPUSH_H |