diff options
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r-- | streaming/rrdpush.h | 69 |
1 files changed, 62 insertions, 7 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 027ccd102..937ead6fa 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -10,10 +10,16 @@ #define CONNECTED_TO_SIZE 100 -#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 #define STREAM_VERSION_CLAIM 3 #define STREAM_VERSION_CLABELS 4 -#define VERSION_GAP_FILLING 5 +#define STREAM_VERSION_COMPRESSION 5 +#define VERSION_GAP_FILLING 6 + +#ifdef ENABLE_COMPRESSION +#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_COMPRESSION) +#else +#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_CLABELS) +#endif //ENABLE_COMPRESSION #define STREAMING_PROTOCOL_VERSION "1.1" #define START_STREAMING_PROMPT "Hit me baby, push them over..." @@ -35,6 +41,38 @@ typedef struct { char *kernel_version; } stream_encoded_t; +#ifdef ENABLE_COMPRESSION +struct compressor_state { + char *buffer; + size_t buffer_size; + struct compressor_data *data; // Compression API specific data + void (*reset)(struct compressor_state *state); + size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer); + void (*destroy)(struct compressor_state **state); +}; + +struct decompressor_state { + char *buffer; + size_t buffer_size; + size_t buffer_len; + size_t buffer_pos; + char *out_buffer; + size_t out_buffer_len; + size_t out_buffer_pos; + size_t total_compressed; + size_t total_uncompressed; + size_t packet_count; + struct decompressor_data *data; // Deompression API specific data + void (*reset)(struct decompressor_state *state); + size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size); + size_t (*put)(struct decompressor_state *state, const char *data, size_t size); + size_t (*decompress)(struct decompressor_state *state); + size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state); + size_t (*get)(struct decompressor_state *state, char *data, size_t size); + void (*destroy)(struct decompressor_state **state); +}; +#endif + // Thread-local storage // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. @@ -60,6 +98,10 @@ struct sender_state { char read_buffer[512]; int read_len; int32_t version; +#ifdef ENABLE_COMPRESSION + unsigned int rrdpush_compression; + struct compressor_state *compressor; +#endif }; struct receiver_state { @@ -75,9 +117,9 @@ struct receiver_state { char *abbrev_timezone; int32_t utc_offset; char *tags; - char *client_ip; // Duplicated in pluginsd - char *client_port; // Duplicated in pluginsd - char *program_name; // Duplicated in pluginsd + char *client_ip; // Duplicated in pluginsd + char *client_port; // Duplicated in pluginsd + char *program_name; // Duplicated in pluginsd char *program_version; struct rrdhost_system_info *system_info; int update_every; @@ -85,15 +127,22 @@ struct receiver_state { time_t last_msg_t; char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields int read_len; + unsigned int shutdown:1; // Tell the thread to exit + unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!) #ifdef ENABLE_HTTPS struct netdata_ssl ssl; #endif - unsigned int shutdown:1; // Tell the thread to exit - unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!) +#ifdef ENABLE_COMPRESSION + unsigned int rrdpush_compression; + struct decompressor_state *decompressor; +#endif }; extern unsigned int default_rrdpush_enabled; +#ifdef ENABLE_COMPRESSION +extern unsigned int default_compression_enabled; +#endif extern char *default_rrdpush_destination; extern char *default_rrdpush_api_key; extern char *default_rrdpush_send_charts_matching; @@ -116,4 +165,10 @@ extern void rrdpush_sender_thread_stop(RRDHOST *host); extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv); extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); +#ifdef ENABLE_COMPRESSION +struct compressor_state *create_compressor(); +struct decompressor_state *create_decompressor(); +size_t is_compressed_data(const char *data, size_t data_size); +#endif + #endif //NETDATA_RRDPUSH_H |