summaryrefslogtreecommitdiffstats
path: root/src/streaming/rrdpush.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/streaming/rrdpush.h (renamed from streaming/rrdpush.h)61
1 files changed, 35 insertions, 26 deletions
diff --git a/streaming/rrdpush.h b/src/streaming/rrdpush.h
index 1459c881e..d55a07675 100644
--- a/streaming/rrdpush.h
+++ b/src/streaming/rrdpush.h
@@ -47,11 +47,13 @@ typedef enum {
STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
- STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming
+ // STREAM_CAP_DYNCFG = (1 << 17), // leave this unused for as long as possible
STREAM_CAP_SLOTS = (1 << 18), // the sender can appoint a unique slot for each chart
STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported
STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported
STREAM_CAP_BROTLI = (1 << 21), // BROTLI compression supported
+ STREAM_CAP_PROGRESS = (1 << 22), // Functions PROGRESS support
+ STREAM_CAP_DYNCFG = (1 << 23), // support for DYNCFG
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -197,16 +199,9 @@ typedef enum __attribute__((packed)) {
SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
} SENDER_FLAGS;
-struct function_payload_state {
- BUFFER *payload;
- char *txid;
- char *fn_name;
- char *timeout;
-};
-
struct sender_state {
RRDHOST *host;
- pid_t tid; // the thread id of the sender, from gettid()
+ pid_t tid; // the thread id of the sender, from gettid_cached()
SENDER_FLAGS flags;
int timeout;
int default_port;
@@ -234,9 +229,6 @@ struct sender_state {
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
int rrdpush_sender_socket;
- int receiving_function_payload;
- struct function_payload_state function_payload; // state when receiving function with payload
-
uint16_t hops;
struct line_splitter line;
@@ -275,6 +267,16 @@ struct sender_state {
time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
} atomic;
+ struct {
+ bool intercept_input;
+ const char *transaction;
+ const char *timeout_s;
+ const char *function;
+ const char *access;
+ const char *source;
+ BUFFER *payload;
+ } functions;
+
int parent_using_h2o;
};
@@ -335,7 +337,7 @@ typedef struct stream_node_instance {
struct receiver_state {
RRDHOST *host;
pid_t tid;
- netdata_thread_t thread;
+ ND_THREAD *thread;
int fd;
char *key;
char *hostname;
@@ -345,7 +347,6 @@ struct receiver_state {
char *timezone; // Unused?
char *abbrev_timezone;
int32_t utc_offset;
- char *tags;
char *client_ip; // Duplicated in pluginsd
char *client_port; // Duplicated in pluginsd
char *program_name; // Duplicated in pluginsd
@@ -456,10 +457,6 @@ void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
void rrdpush_send_claimed_id(RRDHOST *host);
void rrdpush_send_global_functions(RRDHOST *host);
-void rrdpush_send_dyncfg(RRDHOST *host);
-
-#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
-#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx);
void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);
@@ -661,11 +658,31 @@ static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS
}
}
+typedef enum __attribute__((packed)) {
+ RRDHOST_DYNCFG_STATUS_UNAVAILABLE = 0,
+ RRDHOST_DYNCFG_STATUS_AVAILABLE,
+} RRDHOST_DYNCFG_STATUS;
+
+static inline const char *rrdhost_dyncfg_status_to_string(RRDHOST_DYNCFG_STATUS status) {
+ switch(status) {
+ default:
+ case RRDHOST_DYNCFG_STATUS_UNAVAILABLE:
+ return "unavailable";
+
+ case RRDHOST_DYNCFG_STATUS_AVAILABLE:
+ return "online";
+ }
+}
+
typedef struct rrdhost_status {
RRDHOST *host;
time_t now;
struct {
+ RRDHOST_DYNCFG_STATUS status;
+ } dyncfg;
+
+ struct {
RRDHOST_DB_STATUS status;
RRDHOST_DB_LIVENESS liveness;
RRD_MEMORY_MODE mode;
@@ -735,14 +752,6 @@ typedef struct rrdhost_status {
void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
bool rrdhost_state_cloud_emulation(RRDHOST *host);
-void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job);
-void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name);
-
-void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name);
-void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type);
-void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);
-void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name);
-
bool rrdpush_compression_initialize(struct sender_state *s);
bool rrdpush_decompression_initialize(struct receiver_state *rpt);
void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order);