path: root/src/streaming/sender.h
author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-11-25 14:45:37 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-11-25 14:48:03 +0000
commit     e55403ed71282d7bfd8b56df219de3c28a8af064 (patch)
tree       524889e5becb81643bf8741e3082955dca076f09 /src/streaming/sender.h
parent     Releasing debian version 1.47.5-1. (diff)
Merging upstream version 2.0.3+dfsg:
- does not include dygraphs anymore (Closes: #923993)
- does not include pako anymore (Closes: #1042533)
- does not include dashboard binaries anymore (Closes: #1045145)

Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/streaming/sender.h')
-rw-r--r--  src/streaming/sender.h | 169
1 file changed, 169 insertions(+), 0 deletions(-)
diff --git a/src/streaming/sender.h b/src/streaming/sender.h
new file mode 100644
index 000000000..94d104f5f
--- /dev/null
+++ b/src/streaming/sender.h
@@ -0,0 +1,169 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_SENDER_H
+#define NETDATA_SENDER_H
+
+#include "libnetdata/libnetdata.h"
+
+#define CONNECTED_TO_SIZE 100
+
+#define CBUFFER_INITIAL_SIZE (16 * 1024)
+#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
+
+typedef enum __attribute__((packed)) {
+ STREAM_TRAFFIC_TYPE_REPLICATION = 0,
+ STREAM_TRAFFIC_TYPE_FUNCTIONS,
+ STREAM_TRAFFIC_TYPE_METADATA,
+ STREAM_TRAFFIC_TYPE_DATA,
+ STREAM_TRAFFIC_TYPE_DYNCFG,
+
+ // terminator
+ STREAM_TRAFFIC_TYPE_MAX,
+} STREAM_TRAFFIC_TYPE;
+
+typedef enum __attribute__((packed)) {
+ SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has overflowed
+} SENDER_FLAGS;
+
+typedef struct {
+ char *os_name;
+ char *os_id;
+ char *os_version;
+ char *kernel_name;
+ char *kernel_version;
+} stream_encoded_t;
+
+#include "stream-handshake.h"
+#include "stream-capabilities.h"
+#include "stream-conf.h"
+#include "stream-compression/compression.h"
+
+#include "sender-destinations.h"
+
+typedef void (*rrdpush_defer_action_t)(struct sender_state *s, void *data);
+typedef void (*rrdpush_defer_cleanup_t)(struct sender_state *s, void *data);
+
+struct sender_state {
+ RRDHOST *host;
+ pid_t tid; // the thread id of the sender, from gettid_cached()
+ SENDER_FLAGS flags;
+ int timeout;
+ int default_port;
+ uint32_t reconnect_delay;
+ char connected_to[CONNECTED_TO_SIZE + 1]; // the host (or proxy) we actually connected to, passed back from socket.c - we do not know it in advance
+ size_t begin;
+ size_t reconnects_counter;
+ size_t sent_bytes;
+ size_t sent_bytes_on_this_connection;
+ size_t send_attempts;
+ time_t last_traffic_seen_t;
+ time_t last_state_since_t; // the timestamp of the last state (online/offline) change
+ size_t not_connected_loops;
+ // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
+ // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
+ SPINLOCK spinlock;
+ struct circular_buffer *buffer;
+ char read_buffer[PLUGINSD_LINE_MAX + 1];
+ ssize_t read_len;
+ STREAM_CAPABILITIES capabilities;
+ STREAM_CAPABILITIES disabled_capabilities;
+
+ size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
+
+ int rrdpush_sender_pipe[2]; // collector to sender thread signaling
+ int rrdpush_sender_socket;
+
+ uint16_t hops;
+
+ struct line_splitter line;
+ struct compressor_state compressor;
+
+#ifdef NETDATA_LOG_STREAM_SENDER
+ FILE *stream_log_fp;
+#endif
+
+ NETDATA_SSL ssl; // structure used to encrypt the connection
+
+ struct {
+ bool shutdown;
+ STREAM_HANDSHAKE reason;
+ } exit;
+
+ struct {
+ DICTIONARY *requests; // de-duplication of replication requests, per chart
+ time_t oldest_request_after_t; // the timestamp of the oldest replication request
+ time_t latest_completed_before_t; // the timestamp of the latest completed replication request
+
+ struct {
+ size_t pending_requests; // the currently outstanding replication requests
+ size_t charts_replicating; // the number of unique charts with pending replication requests (incremented on every request, decremented when the request is handled - it does not track completion of the chart's replication)
+ bool reached_max; // true when the sender buffer should not get more replication responses
+ } atomic;
+
+ } replication;
+
+ struct {
+ bool pending_data;
+ size_t buffer_used_percentage; // the current utilization of the sending buffer
+ usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
+ time_t last_buffer_recreate_s; // the last time the sender buffer was re-created, in seconds
+ } atomic;
+
+ struct {
+ const char *end_keyword;
+ BUFFER *payload;
+ rrdpush_defer_action_t action;
+ rrdpush_defer_cleanup_t cleanup;
+ void *action_data;
+ } defer;
+
+ bool parent_using_h2o;
+};
+
+#define sender_lock(sender) spinlock_lock(&(sender)->spinlock)
+#define sender_unlock(sender) spinlock_unlock(&(sender)->spinlock)
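+
+// Illustrative sketch (not part of the upstream header): per the comment inside
+// struct sender_state, this spinlock guards both access to the sender buffer and
+// the lazy creation of the sender thread, so a collector thread would typically
+// wrap any direct work on s->buffer like this:
+//
+//     sender_lock(s);
+//     /* ... inspect or modify s->buffer here ... */
+//     sender_unlock(s);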
+
+#define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
+#define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
+#define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)
+
+#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
+#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
+
+#define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
+#define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
+
+#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED)
+#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)
+
+#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
+#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)
+
+#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)
+
+#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
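+
+// Illustrative sketch (assumption, derived from the comments in struct sender_state):
+// a replication request bumps these counters when it is queued and drops them again
+// when it has been handled, e.g.:
+//
+//     rrdpush_sender_replicating_charts_plus_one(s);              // request queued for a chart
+//     rrdpush_sender_pending_replication_requests_plus_one(s);
+//     /* ... the sender executes the request ... */
+//     rrdpush_sender_replicating_charts_minus_one(s);             // request handled
+//     rrdpush_sender_pending_replication_requests_minus_one(s);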
+
+BUFFER *sender_start(struct sender_state *s);
+void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type);
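+
+// Illustrative sketch (not part of the upstream header): a collector formats its
+// message into the buffer returned by sender_start() and hands it to the sender
+// with sender_commit(), tagged with a STREAM_TRAFFIC_TYPE. buffer_sprintf() is
+// libnetdata's BUFFER API; the payload below is made up:
+//
+//     BUFFER *wb = sender_start(s);
+//     buffer_sprintf(wb, "BEGIN chart.id 1\n");    // hypothetical payload
+//     sender_commit(s, wb, STREAM_TRAFFIC_TYPE_DATA);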
+
+void *rrdpush_sender_thread(void *ptr);
+void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);
+
+void sender_thread_buffer_free(void);
+
+void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
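+
+// Illustrative sketch (assumption): a collector that has queued data could mark it
+// as pending on the collector-to-sender pipe and then wake the sender thread:
+//
+//     rrdpush_sender_pipe_set_pending_data(s);
+//     rrdpush_signal_sender_to_wake_up(s);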
+
+bool rrdpush_sender_connect(struct sender_state *s);
+void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t now_s, bool have_mutex, bool force);
+bool rrdhost_sender_should_exit(struct sender_state *s);
+void rrdpush_sender_thread_spawn(RRDHOST *host);
+
+#include "replication.h"
+
+#endif //NETDATA_SENDER_H