summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-10-17 09:30:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-10-17 09:30:20 +0000
commit386ccdd61e8256c8b21ee27ee2fc12438fc5ca98 (patch)
treec9fbcacdb01f029f46133a5ba7ecd610c2bcb041 /streaming/rrdpush.h
parentAdding upstream version 1.42.4. (diff)
downloadnetdata-386ccdd61e8256c8b21ee27ee2fc12438fc5ca98.tar.xz
netdata-386ccdd61e8256c8b21ee27ee2fc12438fc5ca98.zip
Adding upstream version 1.43.0.upstream/1.43.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h22
1 files changed, 21 insertions, 1 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 73bd438c9..c3c14233f 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -6,6 +6,7 @@
#include "libnetdata/libnetdata.h"
#include "daemon/common.h"
#include "web/server/web_client.h"
+#include "database/rrdfunctions.h"
#include "database/rrd.h"
#define CONNECTED_TO_SIZE 100
@@ -44,6 +45,7 @@ typedef enum {
STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
+ STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -231,6 +233,13 @@ typedef enum __attribute__((packed)) {
SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
+struct function_payload_state {
+ BUFFER *payload;
+ char *txid;
+ char *fn_name;
+ char *timeout;
+};
+
struct sender_state {
RRDHOST *host;
pid_t tid; // the thread id of the sender, from gettid()
@@ -260,6 +269,9 @@ struct sender_state {
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
int rrdpush_sender_socket;
+ int receiving_function_payload;
+ struct function_payload_state function_payload; // state when receiving function with payload
+
uint16_t hops;
#ifdef ENABLE_RRDPUSH_COMPRESSION
@@ -356,7 +368,7 @@ struct buffered_reader {
char read_buffer[PLUGINSD_LINE_MAX + 1];
};
-char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size);
+bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst);
static inline void buffered_reader_init(struct buffered_reader *reader) {
reader->read_buffer[0] = '\0';
reader->read_len = 0;
@@ -481,6 +493,7 @@ void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
void rrdpush_send_claimed_id(RRDHOST *host);
void rrdpush_send_global_functions(RRDHOST *host);
+void rrdpush_send_dyncfg(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
@@ -763,4 +776,11 @@ typedef struct rrdhost_status {
void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
bool rrdhost_state_cloud_emulation(RRDHOST *host);
+void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job);
+void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name);
+
+void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name);
+void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type);
+void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);//x
+
#endif //NETDATA_RRDPUSH_H