summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
commita836a244a3d2bdd4da1ee2641e3e957850668cea (patch)
treecb87c75b3677fab7144f868435243f864048a1e6 /streaming/rrdpush.h
parentAdding upstream version 1.38.1. (diff)
downloadnetdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz
netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h33
1 files changed, 26 insertions, 7 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 94c1320e..ff895844 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -41,6 +41,8 @@ typedef enum {
STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
STREAM_CAP_REPLICATION = (1 << 12), // replication supported
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
+ STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
+ STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -53,12 +55,9 @@ typedef enum {
#define STREAM_HAS_COMPRESSION 0
#endif // ENABLE_COMPRESSION
-#define STREAM_OUR_CAPABILITIES ( \
- STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \
- STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \
- STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY )
+STREAM_CAPABILITIES stream_our_capabilities();
-#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))
+#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
// ----------------------------------------------------------------------------
// stream handshake
@@ -187,12 +186,17 @@ struct sender_state {
} replication;
struct {
+ bool pending_data;
size_t buffer_used_percentage; // the current utilization of the sending buffer
usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
} atomic;
};
+#define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
+#define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
+#define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)
+
#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
@@ -303,7 +307,22 @@ void sender_commit(struct sender_state *s, BUFFER *wb);
int rrdpush_init();
bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
-void rrdset_done_push(RRDSET *st);
+
+typedef struct rrdset_stream_buffer {
+ STREAM_CAPABILITIES capabilities;
+ bool v2;
+ bool begin_v2_added;
+ time_t wall_clock_time;
+ uint64_t rrdset_flags; // RRDSET_FLAGS
+ time_t last_point_end_time_s;
+ BUFFER *wb;
+} RRDSET_STREAM_BUFFER;
+
+RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time);
+void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
+void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
+void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);
+
bool rrdset_push_chart_definition_now(RRDSET *st);
void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
@@ -312,7 +331,7 @@ void rrdpush_claimed_id(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
-int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);