diff options
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r-- | streaming/rrdpush.h | 224 |
1 files changed, 95 insertions, 129 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index c3c14233f..1459c881e 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -18,12 +18,14 @@ #define STREAM_OLD_VERSION_CLAIM 3 #define STREAM_OLD_VERSION_CLABELS 4 -#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production +#define STREAM_OLD_VERSION_LZ4 5 // ---------------------------------------------------------------------------- // capabilities negotiation typedef enum { + STREAM_CAP_NONE = 0, + // do not use the first 3 bits // they used to be versions 1, 2 and 3 // before we introduce capabilities @@ -38,7 +40,7 @@ typedef enum { STREAM_CAP_HLABELS = (1 << 7), // host labels supported STREAM_CAP_CLAIM = (1 << 8), // claiming supported STREAM_CAP_CLABELS = (1 << 9), // chart labels supported - STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported + STREAM_CAP_LZ4 = (1 << 10), // lz4 compression supported STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported STREAM_CAP_REPLICATION = (1 << 12), // replication supported STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data @@ -46,22 +48,47 @@ typedef enum { STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming + STREAM_CAP_SLOTS = (1 << 18), // the sender can appoint a unique slot for each chart + STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported + STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported + STREAM_CAP_BROTLI = (1 << 21), // BROTLI compression supported STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit // needed for negotiating errors between parent and child } STREAM_CAPABILITIES; -#ifdef ENABLE_RRDPUSH_COMPRESSION -#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION +#ifdef ENABLE_LZ4 +#define STREAM_CAP_LZ4_AVAILABLE STREAM_CAP_LZ4 +#else +#define STREAM_CAP_LZ4_AVAILABLE 0 +#endif // ENABLE_LZ4 + +#ifdef ENABLE_ZSTD +#define STREAM_CAP_ZSTD_AVAILABLE STREAM_CAP_ZSTD #else -#define STREAM_HAS_COMPRESSION 0 -#endif // ENABLE_RRDPUSH_COMPRESSION +#define STREAM_CAP_ZSTD_AVAILABLE 0 +#endif // ENABLE_ZSTD + +#ifdef ENABLE_BROTLI +#define STREAM_CAP_BROTLI_AVAILABLE STREAM_CAP_BROTLI +#else +#define STREAM_CAP_BROTLI_AVAILABLE 0 +#endif // ENABLE_BROTLI + +#define STREAM_CAP_COMPRESSIONS_AVAILABLE (STREAM_CAP_LZ4_AVAILABLE|STREAM_CAP_ZSTD_AVAILABLE|STREAM_CAP_BROTLI_AVAILABLE|STREAM_CAP_GZIP) + +extern STREAM_CAPABILITIES globally_disabled_capabilities; STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender); #define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability)) +static inline bool stream_has_more_than_one_capability_of(STREAM_CAPABILITIES caps, STREAM_CAPABILITIES mask) { + STREAM_CAPABILITIES common = (STREAM_CAPABILITIES)(caps & mask); + return (common & (common - 1)) != 0 && common != 0; +} + // ---------------------------------------------------------------------------- // stream handshake @@ -79,6 +106,31 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender); #define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later." #define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later." +#define RRDPUSH_STATUS_CONNECTED "CONNECTED" +#define RRDPUSH_STATUS_ALREADY_CONNECTED "ALREADY CONNECTED" +#define RRDPUSH_STATUS_DISCONNECTED "DISCONNECTED" +#define RRDPUSH_STATUS_RATE_LIMIT "RATE LIMIT TRY LATER" +#define RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS "INITIALIZATION IN PROGRESS RETRY LATER" +#define RRDPUSH_STATUS_INTERNAL_SERVER_ERROR "INTERNAL SERVER ERROR DROPPING CONNECTION" +#define RRDPUSH_STATUS_DUPLICATE_RECEIVER "DUPLICATE RECEIVER DROPPING CONNECTION" +#define RRDPUSH_STATUS_CANT_REPLY "CANT REPLY DROPPING CONNECTION" +#define RRDPUSH_STATUS_NO_HOSTNAME "NO HOSTNAME PERMISSION DENIED" +#define RRDPUSH_STATUS_NO_API_KEY "NO API KEY PERMISSION DENIED" +#define RRDPUSH_STATUS_INVALID_API_KEY "INVALID API KEY PERMISSION DENIED" +#define RRDPUSH_STATUS_NO_MACHINE_GUID "NO MACHINE GUID PERMISSION DENIED" +#define RRDPUSH_STATUS_MACHINE_GUID_DISABLED "MACHINE GUID DISABLED PERMISSION DENIED" +#define RRDPUSH_STATUS_INVALID_MACHINE_GUID "INVALID MACHINE GUID PERMISSION DENIED" +#define RRDPUSH_STATUS_API_KEY_DISABLED "API KEY DISABLED PERMISSION DENIED" +#define RRDPUSH_STATUS_NOT_ALLOWED_IP "NOT ALLOWED IP PERMISSION DENIED" +#define RRDPUSH_STATUS_LOCALHOST "LOCALHOST PERMISSION DENIED" +#define RRDPUSH_STATUS_PERMISSION_DENIED "PERMISSION DENIED" +#define RRDPUSH_STATUS_BAD_HANDSHAKE "BAD HANDSHAKE" +#define RRDPUSH_STATUS_TIMEOUT "TIMEOUT" +#define RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION "CANT UPGRADE CONNECTION" +#define RRDPUSH_STATUS_SSL_ERROR "SSL ERROR" +#define RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE "INVALID SSL CERTIFICATE" +#define RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION "CANT ESTABLISH SSL CONNECTION" + typedef enum { STREAM_HANDSHAKE_OK_V3 = 3, // v3+ STREAM_HANDSHAKE_OK_V2 = 2, // v2 @@ -101,11 +153,16 @@ typedef enum { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15, STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17, - STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18, + STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR = -18, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20, STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21, STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22, + STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER = -23, + STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF = -24, + STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED = -25, + STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT = -26, + STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE = -27, } STREAM_HANDSHAKE; @@ -120,100 +177,7 @@ typedef struct { char *kernel_version; } stream_encoded_t; -#ifdef ENABLE_RRDPUSH_COMPRESSION -// signature MUST end with a newline -#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) -#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) -#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4 - -struct compressor_state { - bool initialized; - char *compression_result_buffer; - size_t compression_result_buffer_size; - struct { - void *lz4_stream; - char *input_ring_buffer; - size_t input_ring_buffer_size; - size_t input_ring_buffer_pos; - } stream; - size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer); - void (*destroy)(struct compressor_state **state); -}; - -void rrdpush_compressor_reset(struct compressor_state *state); -void rrdpush_compressor_destroy(struct compressor_state *state); -size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out); - -struct decompressor_state { - bool initialized; - size_t signature_size; - size_t total_compressed; - size_t total_uncompressed; - size_t packet_count; - struct { - void *lz4_stream; - char *buffer; - size_t size; - size_t write_at; - size_t read_at; - } stream; -}; - -void rrdpush_decompressor_destroy(struct decompressor_state *state); -void rrdpush_decompressor_reset(struct decompressor_state *state); -size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); - -static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) { - if (unlikely(!data || !data_size)) - return 0; - - if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE)) - return 0; - - uint32_t sign = *(uint32_t *)data; - if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE)) - return 0; - - size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7)); - return length; -} - -static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) { - if(unlikely(state->stream.read_at != state->stream.write_at)) - fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); - - return rrdpush_decompress_decode_header(header, header_size); -} - -static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) { - if(unlikely(state->stream.read_at > state->stream.write_at)) - fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); - - return state->stream.write_at - state->stream.read_at; -} - -static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) { - if (unlikely(!state || !size || !dst)) - return 0; - - size_t remaining = rrdpush_decompressed_bytes_in_buffer(state); - - if(unlikely(!remaining)) - return 0; - - size_t bytes_to_return = size; - if(bytes_to_return > remaining) - bytes_to_return = remaining; - - memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return); - state->stream.read_at += bytes_to_return; - - if(unlikely(state->stream.read_at > state->stream.write_at)) - fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); - - return bytes_to_return; -} -#endif +#include "compression.h" // Thread-local storage // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. @@ -223,6 +187,7 @@ typedef enum __attribute__((packed)) { STREAM_TRAFFIC_TYPE_FUNCTIONS, STREAM_TRAFFIC_TYPE_METADATA, STREAM_TRAFFIC_TYPE_DATA, + STREAM_TRAFFIC_TYPE_DYNCFG, // terminator STREAM_TRAFFIC_TYPE_MAX, @@ -230,7 +195,6 @@ typedef enum __attribute__((packed)) { typedef enum __attribute__((packed)) { SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown - SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression } SENDER_FLAGS; struct function_payload_state { @@ -263,6 +227,7 @@ struct sender_state { char read_buffer[PLUGINSD_LINE_MAX + 1]; ssize_t read_len; STREAM_CAPABILITIES capabilities; + STREAM_CAPABILITIES disabled_capabilities; size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX]; @@ -274,9 +239,12 @@ struct sender_state { uint16_t hops; -#ifdef ENABLE_RRDPUSH_COMPRESSION + struct line_splitter line; struct compressor_state compressor; -#endif // ENABLE_RRDPUSH_COMPRESSION + +#ifdef NETDATA_LOG_STREAM_SENDER + FILE *stream_log_fp; +#endif #ifdef ENABLE_HTTPS NETDATA_SSL ssl; // structure used to encrypt the connection @@ -306,6 +274,8 @@ struct sender_state { usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC time_t last_buffer_recreate_s; // true when the sender buffer should be re-created } atomic; + + int parent_using_h2o; }; #define sender_lock(sender) spinlock_lock(&(sender)->spinlock) @@ -362,19 +332,6 @@ typedef struct stream_node_instance { } STREAM_NODE_INSTANCE; */ -struct buffered_reader { - ssize_t read_len; - ssize_t pos; - char read_buffer[PLUGINSD_LINE_MAX + 1]; -}; - -bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst); -static inline void buffered_reader_init(struct buffered_reader *reader) { - reader->read_buffer[0] = '\0'; - reader->read_len = 0; - reader->pos = 0; -} - struct receiver_state { RRDHOST *host; pid_t tid; @@ -421,6 +378,7 @@ struct receiver_state { time_t rrdpush_replication_step; char *rrdpush_destination; // DONT FREE - it is allocated in appconfig unsigned int rrdpush_compression; + STREAM_CAPABILITIES compression_priorities[COMPRESSION_ALGORITHM_MAX]; } config; #ifdef ENABLE_HTTPS @@ -429,17 +387,24 @@ struct receiver_state { time_t replication_first_time_t; -#ifdef ENABLE_RRDPUSH_COMPRESSION struct decompressor_state decompressor; -#endif // ENABLE_RRDPUSH_COMPRESSION /* struct { uint32_t count; STREAM_NODE_INSTANCE *array; } instances; */ + +#ifdef ENABLE_H2O + void *h2o_ctx; +#endif }; +#ifdef ENABLE_H2O +#define is_h2o_rrdpush(x) ((x)->h2o_ctx != NULL) +#define unless_h2o_rrdpush(x) if(!is_h2o_rrdpush(x)) +#endif + struct rrdpush_destinations { STRING *destination; bool ssl; @@ -453,9 +418,7 @@ struct rrdpush_destinations { }; extern unsigned int default_rrdpush_enabled; -#ifdef ENABLE_RRDPUSH_COMPRESSION extern unsigned int default_rrdpush_compression_enabled; -#endif // ENABLE_RRDPUSH_COMPRESSION extern char *default_rrdpush_destination; extern char *default_rrdpush_api_key; extern char *default_rrdpush_send_charts_matching; @@ -498,11 +461,10 @@ void rrdpush_send_dyncfg(RRDHOST *host); #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended -int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string); +int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx); void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait); void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva); -void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); int connect_to_one_of_destinations( RRDHOST *host, int default_port, @@ -514,18 +476,15 @@ int connect_to_one_of_destinations( void rrdpush_signal_sender_to_wake_up(struct sender_state *s); -#ifdef ENABLE_RRDPUSH_COMPRESSION -struct compressor_state *create_compressor(); -#endif // ENABLE_RRDPUSH_COMPRESSION - void rrdpush_reset_destinations_postpone_time(RRDHOST *host); const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error); void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key); -void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status); +void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority); void log_receiver_capabilities(struct receiver_state *rpt); void log_sender_capabilities(struct sender_state *s); STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender); int32_t stream_capabilities_to_vn(uint32_t caps); +void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps); void receiver_state_free(struct receiver_state *rpt); bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason); @@ -781,6 +740,13 @@ void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name); void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type); -void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);//x +void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags); +void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name); + +bool rrdpush_compression_initialize(struct sender_state *s); +bool rrdpush_decompression_initialize(struct receiver_state *rpt); +void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order); +void rrdpush_select_receiver_compression_algorithm(struct receiver_state *rpt); +void rrdpush_compression_deactivate(struct sender_state *s); #endif //NETDATA_RRDPUSH_H |