From 517a443636daa1e8085cb4e5325524a54e8a8fd7 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Tue, 17 Oct 2023 11:30:23 +0200 Subject: Merging upstream version 1.43.0. Signed-off-by: Daniel Baumann --- streaming/rrdpush.h | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) (limited to 'streaming/rrdpush.h') diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 73bd438c9..c3c14233f 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -6,6 +6,7 @@ #include "libnetdata/libnetdata.h" #include "daemon/common.h" #include "web/server/web_client.h" +#include "database/rrdfunctions.h" #include "database/rrd.h" #define CONNECTED_TO_SIZE 100 @@ -44,6 +45,7 @@ typedef enum { STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit + STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit @@ -231,6 +233,13 @@ typedef enum __attribute__((packed)) { SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression } SENDER_FLAGS; +struct function_payload_state { + BUFFER *payload; + char *txid; + char *fn_name; + char *timeout; +}; + struct sender_state { RRDHOST *host; pid_t tid; // the thread id of the sender, from gettid() @@ -260,6 +269,9 @@ struct sender_state { int rrdpush_sender_pipe[2]; // collector to sender thread signaling int rrdpush_sender_socket; + int receiving_function_payload; + struct function_payload_state function_payload; // state when receiving function with payload + uint16_t hops; #ifdef ENABLE_RRDPUSH_COMPRESSION @@ -356,7 +368,7 @@ struct buffered_reader { char read_buffer[PLUGINSD_LINE_MAX + 1]; }; -char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size); +bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst); static inline void buffered_reader_init(struct buffered_reader *reader) { reader->read_buffer[0] = '\0'; reader->read_len = 0; @@ -481,6 +493,7 @@ void *rrdpush_sender_thread(void *ptr); void rrdpush_send_host_labels(RRDHOST *host); void rrdpush_send_claimed_id(RRDHOST *host); void rrdpush_send_global_functions(RRDHOST *host); +void rrdpush_send_dyncfg(RRDHOST *host); #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended @@ -763,4 +776,11 @@ typedef struct rrdhost_status { void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s); bool rrdhost_state_cloud_emulation(RRDHOST *host); +void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job); +void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name); + +void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name); +void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type); +void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);//x + #endif //NETDATA_RRDPUSH_H -- cgit v1.2.3