summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h224
1 files changed, 95 insertions, 129 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index c3c14233f..1459c881e 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -18,12 +18,14 @@
#define STREAM_OLD_VERSION_CLAIM 3
#define STREAM_OLD_VERSION_CLABELS 4
-#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
+#define STREAM_OLD_VERSION_LZ4 5
// ----------------------------------------------------------------------------
// capabilities negotiation
typedef enum {
+ STREAM_CAP_NONE = 0,
+
// do not use the first 3 bits
// they used to be versions 1, 2 and 3
// before we introduce capabilities
@@ -38,7 +40,7 @@ typedef enum {
STREAM_CAP_HLABELS = (1 << 7), // host labels supported
STREAM_CAP_CLAIM = (1 << 8), // claiming supported
STREAM_CAP_CLABELS = (1 << 9), // chart labels supported
- STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported
+ STREAM_CAP_LZ4 = (1 << 10), // lz4 compression supported
STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
STREAM_CAP_REPLICATION = (1 << 12), // replication supported
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
@@ -46,22 +48,47 @@ typedef enum {
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming
+ STREAM_CAP_SLOTS = (1 << 18), // the sender can appoint a unique slot for each chart
+ STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported
+ STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported
+ STREAM_CAP_BROTLI = (1 << 21), // BROTLI compression supported
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
// needed for negotiating errors between parent and child
} STREAM_CAPABILITIES;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
+#ifdef ENABLE_LZ4
+#define STREAM_CAP_LZ4_AVAILABLE STREAM_CAP_LZ4
+#else
+#define STREAM_CAP_LZ4_AVAILABLE 0
+#endif // ENABLE_LZ4
+
+#ifdef ENABLE_ZSTD
+#define STREAM_CAP_ZSTD_AVAILABLE STREAM_CAP_ZSTD
#else
-#define STREAM_HAS_COMPRESSION 0
-#endif // ENABLE_RRDPUSH_COMPRESSION
+#define STREAM_CAP_ZSTD_AVAILABLE 0
+#endif // ENABLE_ZSTD
+
+#ifdef ENABLE_BROTLI
+#define STREAM_CAP_BROTLI_AVAILABLE STREAM_CAP_BROTLI
+#else
+#define STREAM_CAP_BROTLI_AVAILABLE 0
+#endif // ENABLE_BROTLI
+
+#define STREAM_CAP_COMPRESSIONS_AVAILABLE (STREAM_CAP_LZ4_AVAILABLE|STREAM_CAP_ZSTD_AVAILABLE|STREAM_CAP_BROTLI_AVAILABLE|STREAM_CAP_GZIP)
+
+extern STREAM_CAPABILITIES globally_disabled_capabilities;
STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
+static inline bool stream_has_more_than_one_capability_of(STREAM_CAPABILITIES caps, STREAM_CAPABILITIES mask) {
+ STREAM_CAPABILITIES common = (STREAM_CAPABILITIES)(caps & mask);
+ return (common & (common - 1)) != 0 && common != 0;
+}
+
// ----------------------------------------------------------------------------
// stream handshake
@@ -79,6 +106,31 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
#define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later."
#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
+#define RRDPUSH_STATUS_CONNECTED "CONNECTED"
+#define RRDPUSH_STATUS_ALREADY_CONNECTED "ALREADY CONNECTED"
+#define RRDPUSH_STATUS_DISCONNECTED "DISCONNECTED"
+#define RRDPUSH_STATUS_RATE_LIMIT "RATE LIMIT TRY LATER"
+#define RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS "INITIALIZATION IN PROGRESS RETRY LATER"
+#define RRDPUSH_STATUS_INTERNAL_SERVER_ERROR "INTERNAL SERVER ERROR DROPPING CONNECTION"
+#define RRDPUSH_STATUS_DUPLICATE_RECEIVER "DUPLICATE RECEIVER DROPPING CONNECTION"
+#define RRDPUSH_STATUS_CANT_REPLY "CANT REPLY DROPPING CONNECTION"
+#define RRDPUSH_STATUS_NO_HOSTNAME "NO HOSTNAME PERMISSION DENIED"
+#define RRDPUSH_STATUS_NO_API_KEY "NO API KEY PERMISSION DENIED"
+#define RRDPUSH_STATUS_INVALID_API_KEY "INVALID API KEY PERMISSION DENIED"
+#define RRDPUSH_STATUS_NO_MACHINE_GUID "NO MACHINE GUID PERMISSION DENIED"
+#define RRDPUSH_STATUS_MACHINE_GUID_DISABLED "MACHINE GUID DISABLED PERMISSION DENIED"
+#define RRDPUSH_STATUS_INVALID_MACHINE_GUID "INVALID MACHINE GUID PERMISSION DENIED"
+#define RRDPUSH_STATUS_API_KEY_DISABLED "API KEY DISABLED PERMISSION DENIED"
+#define RRDPUSH_STATUS_NOT_ALLOWED_IP "NOT ALLOWED IP PERMISSION DENIED"
+#define RRDPUSH_STATUS_LOCALHOST "LOCALHOST PERMISSION DENIED"
+#define RRDPUSH_STATUS_PERMISSION_DENIED "PERMISSION DENIED"
+#define RRDPUSH_STATUS_BAD_HANDSHAKE "BAD HANDSHAKE"
+#define RRDPUSH_STATUS_TIMEOUT "TIMEOUT"
+#define RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION "CANT UPGRADE CONNECTION"
+#define RRDPUSH_STATUS_SSL_ERROR "SSL ERROR"
+#define RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE "INVALID SSL CERTIFICATE"
+#define RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION "CANT ESTABLISH SSL CONNECTION"
+
typedef enum {
STREAM_HANDSHAKE_OK_V3 = 3, // v3+
STREAM_HANDSHAKE_OK_V2 = 2, // v2
@@ -101,11 +153,16 @@ typedef enum {
STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15,
STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16,
STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17,
- STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18,
+ STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR = -18,
STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19,
STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20,
STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21,
STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22,
+ STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER = -23,
+ STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF = -24,
+ STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED = -25,
+ STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT = -26,
+ STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE = -27,
} STREAM_HANDSHAKE;
@@ -120,100 +177,7 @@ typedef struct {
char *kernel_version;
} stream_encoded_t;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-// signature MUST end with a newline
-#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
-#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
-#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4
-
-struct compressor_state {
- bool initialized;
- char *compression_result_buffer;
- size_t compression_result_buffer_size;
- struct {
- void *lz4_stream;
- char *input_ring_buffer;
- size_t input_ring_buffer_size;
- size_t input_ring_buffer_pos;
- } stream;
- size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
- void (*destroy)(struct compressor_state **state);
-};
-
-void rrdpush_compressor_reset(struct compressor_state *state);
-void rrdpush_compressor_destroy(struct compressor_state *state);
-size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out);
-
-struct decompressor_state {
- bool initialized;
- size_t signature_size;
- size_t total_compressed;
- size_t total_uncompressed;
- size_t packet_count;
- struct {
- void *lz4_stream;
- char *buffer;
- size_t size;
- size_t write_at;
- size_t read_at;
- } stream;
-};
-
-void rrdpush_decompressor_destroy(struct decompressor_state *state);
-void rrdpush_decompressor_reset(struct decompressor_state *state);
-size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
-
-static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) {
- if (unlikely(!data || !data_size))
- return 0;
-
- if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
- return 0;
-
- uint32_t sign = *(uint32_t *)data;
- if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
- return 0;
-
- size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
- return length;
-}
-
-static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
- if(unlikely(state->stream.read_at != state->stream.write_at))
- fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
-
- return rrdpush_decompress_decode_header(header, header_size);
-}
-
-static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
- if(unlikely(state->stream.read_at > state->stream.write_at))
- fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
-
- return state->stream.write_at - state->stream.read_at;
-}
-
-static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
- if (unlikely(!state || !size || !dst))
- return 0;
-
- size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
-
- if(unlikely(!remaining))
- return 0;
-
- size_t bytes_to_return = size;
- if(bytes_to_return > remaining)
- bytes_to_return = remaining;
-
- memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return);
- state->stream.read_at += bytes_to_return;
-
- if(unlikely(state->stream.read_at > state->stream.write_at))
- fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
-
- return bytes_to_return;
-}
-#endif
+#include "compression.h"
// Thread-local storage
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
@@ -223,6 +187,7 @@ typedef enum __attribute__((packed)) {
STREAM_TRAFFIC_TYPE_FUNCTIONS,
STREAM_TRAFFIC_TYPE_METADATA,
STREAM_TRAFFIC_TYPE_DATA,
+ STREAM_TRAFFIC_TYPE_DYNCFG,
// terminator
STREAM_TRAFFIC_TYPE_MAX,
@@ -230,7 +195,6 @@ typedef enum __attribute__((packed)) {
typedef enum __attribute__((packed)) {
SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
- SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
struct function_payload_state {
@@ -263,6 +227,7 @@ struct sender_state {
char read_buffer[PLUGINSD_LINE_MAX + 1];
ssize_t read_len;
STREAM_CAPABILITIES capabilities;
+ STREAM_CAPABILITIES disabled_capabilities;
size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
@@ -274,9 +239,12 @@ struct sender_state {
uint16_t hops;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
+ struct line_splitter line;
struct compressor_state compressor;
-#endif // ENABLE_RRDPUSH_COMPRESSION
+
+#ifdef NETDATA_LOG_STREAM_SENDER
+ FILE *stream_log_fp;
+#endif
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl; // structure used to encrypt the connection
@@ -306,6 +274,8 @@ struct sender_state {
usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
} atomic;
+
+ int parent_using_h2o;
};
#define sender_lock(sender) spinlock_lock(&(sender)->spinlock)
@@ -362,19 +332,6 @@ typedef struct stream_node_instance {
} STREAM_NODE_INSTANCE;
*/
-struct buffered_reader {
- ssize_t read_len;
- ssize_t pos;
- char read_buffer[PLUGINSD_LINE_MAX + 1];
-};
-
-bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst);
-static inline void buffered_reader_init(struct buffered_reader *reader) {
- reader->read_buffer[0] = '\0';
- reader->read_len = 0;
- reader->pos = 0;
-}
-
struct receiver_state {
RRDHOST *host;
pid_t tid;
@@ -421,6 +378,7 @@ struct receiver_state {
time_t rrdpush_replication_step;
char *rrdpush_destination; // DONT FREE - it is allocated in appconfig
unsigned int rrdpush_compression;
+ STREAM_CAPABILITIES compression_priorities[COMPRESSION_ALGORITHM_MAX];
} config;
#ifdef ENABLE_HTTPS
@@ -429,17 +387,24 @@ struct receiver_state {
time_t replication_first_time_t;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
struct decompressor_state decompressor;
-#endif // ENABLE_RRDPUSH_COMPRESSION
/*
struct {
uint32_t count;
STREAM_NODE_INSTANCE *array;
} instances;
*/
+
+#ifdef ENABLE_H2O
+ void *h2o_ctx;
+#endif
};
+#ifdef ENABLE_H2O
+#define is_h2o_rrdpush(x) ((x)->h2o_ctx != NULL)
+#define unless_h2o_rrdpush(x) if(!is_h2o_rrdpush(x))
+#endif
+
struct rrdpush_destinations {
STRING *destination;
bool ssl;
@@ -453,9 +418,7 @@ struct rrdpush_destinations {
};
extern unsigned int default_rrdpush_enabled;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
extern unsigned int default_rrdpush_compression_enabled;
-#endif // ENABLE_RRDPUSH_COMPRESSION
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
@@ -498,11 +461,10 @@ void rrdpush_send_dyncfg(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
-int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx);
void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
-void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
int connect_to_one_of_destinations(
RRDHOST *host,
int default_port,
@@ -514,18 +476,15 @@ int connect_to_one_of_destinations(
void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-struct compressor_state *create_compressor();
-#endif // ENABLE_RRDPUSH_COMPRESSION
-
void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
-void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
+void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority);
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender);
int32_t stream_capabilities_to_vn(uint32_t caps);
+void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps);
void receiver_state_free(struct receiver_state *rpt);
bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason);
@@ -781,6 +740,13 @@ void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char
void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name);
void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type);
-void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);//x
+void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);
+void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name);
+
+bool rrdpush_compression_initialize(struct sender_state *s);
+bool rrdpush_decompression_initialize(struct receiver_state *rpt);
+void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order);
+void rrdpush_select_receiver_compression_algorithm(struct receiver_state *rpt);
+void rrdpush_compression_deactivate(struct sender_state *s);
#endif //NETDATA_RRDPUSH_H