summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h66
1 files changed, 55 insertions, 11 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index a0c7e8de2..94c1320e7 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -3,12 +3,14 @@
#ifndef NETDATA_RRDPUSH_H
#define NETDATA_RRDPUSH_H 1
-#include "database/rrd.h"
#include "libnetdata/libnetdata.h"
-#include "web/server/web_client.h"
#include "daemon/common.h"
+#include "web/server/web_client.h"
+#include "database/rrd.h"
#define CONNECTED_TO_SIZE 100
+#define CBUFFER_INITIAL_SIZE (16 * 1024)
+#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
// ----------------------------------------------------------------------------
// obsolete versions - do not use anymore
@@ -22,6 +24,9 @@
typedef enum {
// do not use the first 3 bits
+ // they used to be versions 1, 2 and 3
+ // before we introduce capabilities
+
STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol
STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels)
STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol)
@@ -37,6 +42,7 @@ typedef enum {
STREAM_CAP_REPLICATION = (1 << 12), // replication supported
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
+ STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
// needed for negotiating errors between parent and child
} STREAM_CAPABILITIES;
@@ -125,8 +131,8 @@ struct decompressor_state {
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
typedef enum {
- SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
- SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
+ SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
+ SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
struct sender_state {
@@ -155,6 +161,8 @@ struct sender_state {
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
int rrdpush_sender_socket;
+ uint16_t hops;
+
#ifdef ENABLE_COMPRESSION
struct compressor_state *compressor;
#endif
@@ -163,6 +171,11 @@ struct sender_state {
#endif
struct {
+ bool shutdown;
+ const char *reason;
+ } exit;
+
+ struct {
DICTIONARY *requests; // de-duplication of replication requests, per chart
struct {
@@ -176,9 +189,13 @@ struct sender_state {
struct {
size_t buffer_used_percentage; // the current utilization of the sending buffer
usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
+ time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
} atomic;
};
+#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
+#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
+
#define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
#define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
@@ -216,13 +233,34 @@ struct receiver_state {
char *program_name; // Duplicated in pluginsd
char *program_version;
struct rrdhost_system_info *system_info;
- int update_every;
STREAM_CAPABILITIES capabilities;
time_t last_msg_t;
char read_buffer[PLUGINSD_LINE_MAX + 1];
int read_len;
- unsigned int shutdown:1; // Tell the thread to exit
- unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
+
+ uint16_t hops;
+
+ struct {
+ bool shutdown; // signal the streaming parser to exit
+ const char *reason; // the reason of disconnection to log
+ } exit;
+
+ struct {
+ RRD_MEMORY_MODE mode;
+ int history;
+ int update_every;
+ int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO
+ time_t alarms_delay;
+ int rrdpush_enabled;
+ char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig
+ char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig
+ bool rrdpush_enable_replication;
+ time_t rrdpush_seconds_to_replicate;
+ time_t rrdpush_replication_step;
+ char *rrdpush_destination; // DONT FREE - it is allocated in appconfig
+ unsigned int rrdpush_compression;
+ } config;
+
#ifdef ENABLE_HTTPS
struct netdata_ssl ssl;
#endif
@@ -260,11 +298,8 @@ extern unsigned int remote_clock_resync_iterations;
void rrdpush_destinations_init(RRDHOST *host);
void rrdpush_destinations_free(RRDHOST *host);
-void sender_init(RRDHOST *host);
-
BUFFER *sender_start(struct sender_state *s);
void sender_commit(struct sender_state *s, BUFFER *wb);
-void sender_cancel(struct sender_state *s);
int rrdpush_init();
bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
@@ -274,8 +309,11 @@ void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
void rrdpush_claimed_id(RRDHOST *host);
+#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
+#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
+
int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
-void rrdpush_sender_thread_stop(RRDHOST *host);
+void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
@@ -295,11 +333,17 @@ struct compressor_state *create_compressor();
struct decompressor_state *create_decompressor();
#endif
+void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
int32_t stream_capabilities_to_vn(uint32_t caps);
+void receiver_state_free(struct receiver_state *rpt);
+bool stop_streaming_receiver(RRDHOST *host, const char *reason);
+
+void sender_thread_buffer_free(void);
+
#include "replication.h"
#endif //NETDATA_RRDPUSH_H