diff options
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r-- | streaming/rrdpush.h | 33 |
1 files changed, 26 insertions, 7 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 94c1320e7..ff8958440 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -41,6 +41,8 @@ typedef enum { STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported STREAM_CAP_REPLICATION = (1 << 12), // replication supported STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data + STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values + STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit @@ -53,12 +55,9 @@ typedef enum { #define STREAM_HAS_COMPRESSION 0 #endif // ENABLE_COMPRESSION -#define STREAM_OUR_CAPABILITIES ( \ - STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \ - STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \ - STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY ) +STREAM_CAPABILITIES stream_our_capabilities(); -#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability))) +#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability)) // ---------------------------------------------------------------------------- // stream handshake @@ -187,12 +186,17 @@ struct sender_state { } replication; struct { + bool pending_data; size_t buffer_used_percentage; // the current utilization of the sending buffer usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC time_t last_buffer_recreate_s; // true when the sender buffer should be re-created } atomic; }; +#define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED) +#define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED) +#define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED) + #define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED) #define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED) @@ -303,7 +307,22 @@ void sender_commit(struct sender_state *s, BUFFER *wb); int rrdpush_init(); bool rrdpush_receiver_needs_dbengine(); int configured_as_parent(); -void rrdset_done_push(RRDSET *st); + +typedef struct rrdset_stream_buffer { + STREAM_CAPABILITIES capabilities; + bool v2; + bool begin_v2_added; + time_t wall_clock_time; + uint64_t rrdset_flags; // RRDSET_FLAGS + time_t last_point_end_time_s; + BUFFER *wb; +} RRDSET_STREAM_BUFFER; + +RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time); +void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st); +void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st); +void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags); + bool rrdset_push_chart_definition_now(RRDSET *st); void *rrdpush_sender_thread(void *ptr); void rrdpush_send_host_labels(RRDHOST *host); @@ -312,7 +331,7 @@ void rrdpush_claimed_id(RRDHOST *host); #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended -int rrdpush_receiver_thread_spawn(struct web_client *w, char *url); +int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string); void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait); void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva); |