summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h41
1 files changed, 36 insertions, 5 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index ff895844..f97c8ddf 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -72,6 +72,9 @@ STREAM_CAPABILITIES stream_our_capabilities();
#define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
#define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
#define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
+#define START_STREAMING_ERROR_BUSY_TRY_LATER "The server is too busy now to accept this request. Try later."
+#define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later."
+#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
typedef enum {
STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
@@ -87,12 +90,27 @@ typedef enum {
STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
- STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9
+ STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9,
+ STREAM_HANDSHAKE_BUSY_TRY_LATER = -10,
+ STREAM_HANDSHAKE_INTERNAL_ERROR = -11,
+ STREAM_HANDSHAKE_INITIALIZATION = -12,
} STREAM_HANDSHAKE;
// ----------------------------------------------------------------------------
+typedef enum __attribute__((packed)) {
+ STREAM_TRAFFIC_TYPE_REPLICATION,
+ STREAM_TRAFFIC_TYPE_FUNCTIONS,
+ STREAM_TRAFFIC_TYPE_METADATA,
+ STREAM_TRAFFIC_TYPE_DATA,
+
+ // terminator
+ STREAM_TRAFFIC_TYPE_MAX,
+} STREAM_TRAFFIC_TYPE;
+
+// ----------------------------------------------------------------------------
+
typedef struct {
char *os_name;
char *os_id;
@@ -148,6 +166,7 @@ struct sender_state {
size_t sent_bytes_on_this_connection;
size_t send_attempts;
time_t last_traffic_seen_t;
+ time_t last_state_since_t; // the timestamp of the last state (online/offline) change
size_t not_connected_loops;
// Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
// the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
@@ -157,6 +176,8 @@ struct sender_state {
int read_len;
STREAM_CAPABILITIES capabilities;
+ size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
+
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
int rrdpush_sender_socket;
@@ -166,7 +187,7 @@ struct sender_state {
struct compressor_state *compressor;
#endif
#ifdef ENABLE_HTTPS
- struct netdata_ssl ssl; // structure used to encrypt the connection
+ NETDATA_SSL ssl; // structure used to encrypt the connection
#endif
struct {
@@ -176,6 +197,8 @@ struct sender_state {
struct {
DICTIONARY *requests; // de-duplication of replication requests, per chart
+ time_t oldest_request_after_t; // the timestamp of the oldest replication request
+ time_t latest_completed_before_t; // the timestamp of the latest replication request
struct {
size_t pending_requests; // the currently outstanding replication requests
@@ -221,6 +244,7 @@ struct sender_state {
struct receiver_state {
RRDHOST *host;
+ pid_t tid;
netdata_thread_t thread;
int fd;
char *key;
@@ -266,7 +290,7 @@ struct receiver_state {
} config;
#ifdef ENABLE_HTTPS
- struct netdata_ssl ssl;
+ NETDATA_SSL ssl;
#endif
#ifdef ENABLE_COMPRESSION
unsigned int rrdpush_compression;
@@ -278,8 +302,10 @@ struct receiver_state {
struct rrdpush_destinations {
STRING *destination;
+ bool ssl;
const char *last_error;
+ time_t last_attempt;
time_t postpone_reconnection_until;
STREAM_HANDSHAKE last_handshake;
@@ -303,7 +329,7 @@ void rrdpush_destinations_init(RRDHOST *host);
void rrdpush_destinations_free(RRDHOST *host);
BUFFER *sender_start(struct sender_state *s);
-void sender_commit(struct sender_state *s, BUFFER *wb);
+void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type);
int rrdpush_init();
bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
@@ -351,7 +377,9 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
struct compressor_state *create_compressor();
struct decompressor_state *create_decompressor();
#endif
-
+void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
+const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
+void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
@@ -363,6 +391,9 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason);
void sender_thread_buffer_free(void);
+void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
+void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
+
#include "replication.h"
#endif //NETDATA_RRDPUSH_H