diff options
Diffstat (limited to '')
-rw-r--r-- | src/streaming/rrdpush.h (renamed from streaming/rrdpush.h) | 57 |
1 files changed, 33 insertions, 24 deletions
diff --git a/streaming/rrdpush.h b/src/streaming/rrdpush.h index 1459c881e..c73877134 100644 --- a/streaming/rrdpush.h +++ b/src/streaming/rrdpush.h @@ -47,11 +47,13 @@ typedef enum { STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit - STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming + // STREAM_CAP_DYNCFG = (1 << 17), // leave this unused for as long as possible STREAM_CAP_SLOTS = (1 << 18), // the sender can appoint a unique slot for each chart STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported STREAM_CAP_BROTLI = (1 << 21), // BROTLI compression supported + STREAM_CAP_PROGRESS = (1 << 22), // Functions PROGRESS support + STREAM_CAP_DYNCFG = (1 << 23), // support for DYNCFG STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit @@ -197,13 +199,6 @@ typedef enum __attribute__((packed)) { SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown } SENDER_FLAGS; -struct function_payload_state { - BUFFER *payload; - char *txid; - char *fn_name; - char *timeout; -}; - struct sender_state { RRDHOST *host; pid_t tid; // the thread id of the sender, from gettid() @@ -234,9 +229,6 @@ struct sender_state { int rrdpush_sender_pipe[2]; // collector to sender thread signaling int rrdpush_sender_socket; - int receiving_function_payload; - struct function_payload_state function_payload; // state when receiving function with payload - uint16_t hops; struct line_splitter line; @@ -275,6 +267,16 @@ struct sender_state { time_t last_buffer_recreate_s; // true when the sender buffer should be re-created } atomic; + struct { + bool intercept_input; + const char *transaction; + const char *timeout_s; + const char *function; + const char *access; + const char *source; + BUFFER *payload; + } functions; + int parent_using_h2o; }; @@ -345,7 +347,6 @@ struct receiver_state { char *timezone; // Unused? char *abbrev_timezone; int32_t utc_offset; - char *tags; char *client_ip; // Duplicated in pluginsd char *client_port; // Duplicated in pluginsd char *program_name; // Duplicated in pluginsd @@ -456,10 +457,6 @@ void *rrdpush_sender_thread(void *ptr); void rrdpush_send_host_labels(RRDHOST *host); void rrdpush_send_claimed_id(RRDHOST *host); void rrdpush_send_global_functions(RRDHOST *host); -void rrdpush_send_dyncfg(RRDHOST *host); - -#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended -#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx); void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait); @@ -661,11 +658,31 @@ static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS } } +typedef enum __attribute__((packed)) { + RRDHOST_DYNCFG_STATUS_UNAVAILABLE = 0, + RRDHOST_DYNCFG_STATUS_AVAILABLE, +} RRDHOST_DYNCFG_STATUS; + +static inline const char *rrdhost_dyncfg_status_to_string(RRDHOST_DYNCFG_STATUS status) { + switch(status) { + default: + case RRDHOST_DYNCFG_STATUS_UNAVAILABLE: + return "unavailable"; + + case RRDHOST_DYNCFG_STATUS_AVAILABLE: + return "online"; + } +} + typedef struct rrdhost_status { RRDHOST *host; time_t now; struct { + RRDHOST_DYNCFG_STATUS status; + } dyncfg; + + struct { RRDHOST_DB_STATUS status; RRDHOST_DB_LIVENESS liveness; RRD_MEMORY_MODE mode; @@ -735,14 +752,6 @@ typedef struct rrdhost_status { void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s); bool rrdhost_state_cloud_emulation(RRDHOST *host); -void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job); -void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name); - -void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name); -void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type); -void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags); -void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name); - bool rrdpush_compression_initialize(struct sender_state *s); bool rrdpush_decompression_initialize(struct receiver_state *rpt); void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order); |