diff options
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r-- | streaming/rrdpush.h | 232 |
1 files changed, 171 insertions, 61 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 1eb39cc6c..c5f7618c1 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -10,32 +10,83 @@ #define CONNECTED_TO_SIZE 100 -#define STREAM_VERSION_CLAIM 3 -#define STREAM_VERSION_CLABELS 4 -#define STREAM_VERSION_COMPRESSION 5 -#define VERSION_GAP_FILLING 6 +// ---------------------------------------------------------------------------- +// obsolete versions - do not use anymore + +#define STREAM_OLD_VERSION_CLAIM 3 +#define STREAM_OLD_VERSION_CLABELS 4 +#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production + +// ---------------------------------------------------------------------------- +// capabilities negotiation + +typedef enum { + // do not use the first 3 bits + STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol + STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels) + STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol) + // v3 = claiming supported + // v4 = chart labels supported + // v5 = lz4 compression supported + STREAM_CAP_VCAPS = (1 << 6), // capabilities negotiation supported + STREAM_CAP_HLABELS = (1 << 7), // host labels supported + STREAM_CAP_CLAIM = (1 << 8), // claiming supported + STREAM_CAP_CLABELS = (1 << 9), // chart labels supported + STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported + STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported + STREAM_CAP_REPLICATION = (1 << 12), // replication supported + STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data + + // this must be signed int, so don't use the last bit + // needed for negotiating errors between parent and child +} STREAM_CAPABILITIES; #ifdef ENABLE_COMPRESSION -#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_COMPRESSION) +#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION #else -#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_CLABELS) -#endif //ENABLE_COMPRESSION +#define STREAM_HAS_COMPRESSION 0 +#endif // ENABLE_COMPRESSION + +#define STREAM_OUR_CAPABILITIES ( \ + STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \ + STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \ + STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY ) + +#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability))) + +// ---------------------------------------------------------------------------- +// stream handshake + +#define HTTP_HEADER_SIZE 8192 #define STREAMING_PROTOCOL_VERSION "1.1" -#define START_STREAMING_PROMPT "Hit me baby, push them over..." -#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..." +#define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..." +#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..." #define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version=" #define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back" #define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server" #define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info." -#define HTTP_HEADER_SIZE 8192 - typedef enum { - RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW, - RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW -} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY; + STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION + STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS + STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM + STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS + STREAM_HANDSHAKE_OK_V1 = 1, + STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1, + STREAM_HANDSHAKE_ERROR_LOCALHOST = -2, + STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3, + STREAM_HANDSHAKE_ERROR_DENIED = -4, + STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5, + STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6, + STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7, + STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8, + STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9 +} STREAM_HANDSHAKE; + + +// ---------------------------------------------------------------------------- typedef struct { char *os_name; @@ -47,8 +98,8 @@ typedef struct { #ifdef ENABLE_COMPRESSION struct compressor_state { - char *buffer; - size_t buffer_size; + char *compression_result_buffer; + size_t compression_result_buffer_size; struct compressor_data *data; // Compression API specific data void (*reset)(struct compressor_state *state); size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer); @@ -56,21 +107,14 @@ struct compressor_state { }; struct decompressor_state { - char *buffer; - size_t buffer_size; - size_t buffer_len; - size_t buffer_pos; - char *out_buffer; - size_t out_buffer_len; - size_t out_buffer_pos; + size_t signature_size; size_t total_compressed; size_t total_uncompressed; size_t packet_count; - struct decompressor_data *data; // Decompression API specific data + struct decompressor_stream *stream; // Decompression API specific data void (*reset)(struct decompressor_state *state); size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size); - size_t (*put)(struct decompressor_state *state, const char *data, size_t size); - size_t (*decompress)(struct decompressor_state *state); + size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state); size_t (*get)(struct decompressor_state *state, char *data, size_t size); void (*destroy)(struct decompressor_state **state); @@ -80,11 +124,17 @@ struct decompressor_state { // Thread-local storage // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. +typedef enum { + SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown + SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression +} SENDER_FLAGS; + struct sender_state { RRDHOST *host; - pid_t task_id; - unsigned int overflow:1; - int timeout, default_port; + pid_t tid; // the thread id of the sender, from gettid() + SENDER_FLAGS flags; + int timeout; + int default_port; usec_t reconnect_delay; char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c size_t begin; @@ -92,22 +142,62 @@ struct sender_state { size_t sent_bytes; size_t sent_bytes_on_this_connection; size_t send_attempts; - time_t last_sent_t; + time_t last_traffic_seen_t; size_t not_connected_loops; // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here. netdata_mutex_t mutex; struct circular_buffer *buffer; - BUFFER *build; - char read_buffer[512]; + char read_buffer[PLUGINSD_LINE_MAX + 1]; int read_len; - int32_t version; + STREAM_CAPABILITIES capabilities; + + int rrdpush_sender_pipe[2]; // collector to sender thread signaling + int rrdpush_sender_socket; + #ifdef ENABLE_COMPRESSION - unsigned int rrdpush_compression; struct compressor_state *compressor; #endif +#ifdef ENABLE_HTTPS + struct netdata_ssl ssl; // structure used to encrypt the connection +#endif + + struct { + DICTIONARY *requests; // de-duplication of replication requests, per chart + + struct { + size_t pending_requests; // the currently outstanding replication requests + size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart) + } atomic; + + struct { + bool reached_max; // used to avoid resetting the replication thread too frequently + } unsafe; // protected by sender mutex + + } replication; + + struct { + size_t buffer_used_percentage; // the current utilization of the sending buffer + usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC + } atomic; }; +#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED); +#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED) + +#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED); +#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED) + +#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED) + +#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED) + struct receiver_state { RRDHOST *host; netdata_thread_t thread; @@ -127,9 +217,9 @@ struct receiver_state { char *program_version; struct rrdhost_system_info *system_info; int update_every; - uint32_t stream_version; + STREAM_CAPABILITIES capabilities; time_t last_msg_t; - char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields + char read_buffer[PLUGINSD_LINE_MAX + 1]; int read_len; unsigned int shutdown:1; // Tell the thread to exit unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!) @@ -140,14 +230,18 @@ struct receiver_state { unsigned int rrdpush_compression; struct decompressor_state *decompressor; #endif + + time_t replication_first_time_t; }; struct rrdpush_destinations { - char destination[CONNECTED_TO_SIZE + 1]; - int disabled_no_proper_reply; - int disabled_because_of_localhost; - time_t disabled_already_streaming; - int disabled_because_of_denied_access; + STRING *destination; + + const char *last_error; + time_t postpone_reconnection_until; + STREAM_HANDSHAKE last_handshake; + + struct rrdpush_destinations *prev; struct rrdpush_destinations *next; }; @@ -158,27 +252,35 @@ extern unsigned int default_compression_enabled; extern char *default_rrdpush_destination; extern char *default_rrdpush_api_key; extern char *default_rrdpush_send_charts_matching; +extern bool default_rrdpush_enable_replication; +extern time_t default_rrdpush_seconds_to_replicate; +extern time_t default_rrdpush_replication_step; extern unsigned int remote_clock_resync_iterations; -extern void sender_init(RRDHOST *parent); -extern struct rrdpush_destinations *destinations_init(const char *destinations); -void sender_start(struct sender_state *s); -void sender_commit(struct sender_state *s); -extern int rrdpush_init(); -extern int configured_as_parent(); -extern void rrdset_done_push(RRDSET *st); -extern void rrdset_push_chart_definition_now(RRDSET *st); -extern void *rrdpush_sender_thread(void *ptr); -extern void rrdpush_send_labels(RRDHOST *host); -extern void rrdpush_claimed_id(RRDHOST *host); - -extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url); -extern void rrdpush_sender_thread_stop(RRDHOST *host); - -extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv); -extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); -extern int connect_to_one_of_destinations( - struct rrdpush_destinations *destinations, +void rrdpush_destinations_init(RRDHOST *host); +void rrdpush_destinations_free(RRDHOST *host); + +void sender_init(RRDHOST *host); + +BUFFER *sender_start(struct sender_state *s); +void sender_commit(struct sender_state *s, BUFFER *wb); +void sender_cancel(struct sender_state *s); +int rrdpush_init(); +bool rrdpush_receiver_needs_dbengine(); +int configured_as_parent(); +void rrdset_done_push(RRDSET *st); +bool rrdset_push_chart_definition_now(RRDSET *st); +void *rrdpush_sender_thread(void *ptr); +void rrdpush_send_host_labels(RRDHOST *host); +void rrdpush_claimed_id(RRDHOST *host); + +int rrdpush_receiver_thread_spawn(struct web_client *w, char *url); +void rrdpush_sender_thread_stop(RRDHOST *host); + +void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva); +void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); +int connect_to_one_of_destinations( + RRDHOST *host, int default_port, struct timeval *timeout, size_t *reconnects_counter, @@ -186,10 +288,18 @@ extern int connect_to_one_of_destinations( size_t connected_to_size, struct rrdpush_destinations **destination); +void rrdpush_signal_sender_to_wake_up(struct sender_state *s); + #ifdef ENABLE_COMPRESSION struct compressor_state *create_compressor(); struct decompressor_state *create_decompressor(); -size_t is_compressed_data(const char *data, size_t data_size); #endif +void log_receiver_capabilities(struct receiver_state *rpt); +void log_sender_capabilities(struct sender_state *s); +STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version); +int32_t stream_capabilities_to_vn(uint32_t caps); + +#include "replication.h" + #endif //NETDATA_RRDPUSH_H |