summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/rrdpush.c (renamed from src/rrdpush.c)157
1 files changed, 138 insertions, 19 deletions
diff --git a/src/rrdpush.c b/streaming/rrdpush.c
index 8f71c6d4c..df1a2177f 100644
--- a/src/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -1,4 +1,6 @@
-#include "common.h"
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
/*
* rrdpush
@@ -30,14 +32,46 @@ typedef enum {
RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
-int default_rrdpush_enabled = 0;
+static struct config stream_config = {
+ .sections = NULL,
+ .mutex = NETDATA_MUTEX_INITIALIZER,
+ .index = {
+ .avl_tree = {
+ .root = NULL,
+ .compar = appconfig_section_compare
+ },
+ .rwlock = AVL_LOCK_INITIALIZER
+ }
+};
+
+unsigned int default_rrdpush_enabled = 0;
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
+char *default_rrdpush_send_charts_matching = NULL;
+
+static void load_stream_conf() {
+ errno = 0;
+ char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
+ if(!appconfig_load(&stream_config, filename, 0)) {
+ info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
+ freez(filename);
+
+ filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
+ if(!appconfig_load(&stream_config, filename, 0))
+ info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
+ }
+ freez(filename);
+}
int rrdpush_init() {
- default_rrdpush_enabled = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
+ // --------------------------------------------------------------------
+ // load stream.conf
+ load_stream_conf();
+
+ default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
+ default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
@@ -67,17 +101,44 @@ unsigned int remote_clock_resync_iterations = 60;
#define rrdpush_buffer_lock(host) netdata_mutex_lock(&((host)->rrdpush_sender_buffer_mutex))
#define rrdpush_buffer_unlock(host) netdata_mutex_unlock(&((host)->rrdpush_sender_buffer_mutex))
+static inline int should_send_chart_matching(RRDSET *st) {
+ if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ }
+ else if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
+ RRDHOST *host = st->rrdhost;
+
+ if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) ||
+ simple_pattern_matches(host->rrdpush_send_charts_matching, st->name)) {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ }
+ else {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ }
+ }
+
+ return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND));
+}
+
// checks if the current chart definition has been sent
static inline int need_to_send_chart_definition(RRDSET *st) {
rrdset_check_rdlock(st);
- if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_EXPOSED_UPSTREAM))))
+ if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))))
return 1;
RRDDIM *rd;
- rrddim_foreach_read(rd, st)
- if(unlikely(!rd->exposed))
+ rrddim_foreach_read(rd, st) {
+ if(unlikely(!rd->exposed)) {
+ #ifdef NETDATA_INTERNAL_CHECKS
+ info("host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", st->rrdhost->hostname, st->id, rd->id);
+ #endif
return 1;
+ }
+ }
return 0;
}
@@ -86,14 +147,27 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
RRDHOST *host = st->rrdhost;
- rrdset_flag_set(st, RRDSET_FLAG_EXPOSED_UPSTREAM);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+
+ // properly set the name for the remote end to parse it
+ char *name = "";
+ if(unlikely(strcmp(st->id, st->name))) {
+ // they differ
+ name = strchr(st->name, '.');
+ if(name)
+ name++;
+ else
+ name = "";
+ }
+
+ // info("CHART '%s' '%s'", st->id, name);
// send the chart
buffer_sprintf(
host->rrdpush_sender_buffer
, "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
, st->id
- , st->name
+ , name
, st->title
, st->units
, st->family
@@ -129,7 +203,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
// send the chart local custom variables
RRDSETVAR *rs;
for(rs = st->variables; rs ;rs = rs->next) {
- if(unlikely(rs->options && RRDVAR_OPTION_ALLOCATED)) {
+ if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) {
calculated_number *value = (calculated_number *) rs->value;
buffer_sprintf(
@@ -147,7 +221,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
// sends the current chart dimensions
static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st) {
RRDHOST *host = st->rrdhost;
- buffer_sprintf(host->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->upstream_resync_time > st->last_collected_time.tv_sec)?st->usec_since_last_update:0);
+ buffer_sprintf(host->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
@@ -164,9 +238,12 @@ static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st) {
static void rrdpush_sender_thread_spawn(RRDHOST *host);
-void rrdset_push_chart_definition(RRDSET *st) {
+void rrdset_push_chart_definition_now(RRDSET *st) {
RRDHOST *host = st->rrdhost;
+ if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st)))
+ return;
+
rrdset_rdlock(st);
rrdpush_buffer_lock(host);
rrdpush_send_chart_definition_nolock(st);
@@ -175,11 +252,11 @@ void rrdset_push_chart_definition(RRDSET *st) {
}
void rrdset_done_push(RRDSET *st) {
- RRDHOST *host = st->rrdhost;
-
- if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
+ if(unlikely(!should_send_chart_matching(st)))
return;
+ RRDHOST *host = st->rrdhost;
+
rrdpush_buffer_lock(host);
if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
@@ -239,7 +316,7 @@ static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr
RRDVAR *rv = (RRDVAR *)rrdvar_ptr;
RRDHOST *host = (RRDHOST *)host_ptr;
- if(unlikely(rv->type == RRDVAR_TYPE_CALCULATED_ALLOCATED)) {
+ if(unlikely(rv->options & RRDVAR_OPTION_CUSTOM_HOST_VAR && rv->type == RRDVAR_TYPE_CALCULATED)) {
rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
// return 1, so that the traversal will return the number of variables sent
@@ -252,6 +329,8 @@ static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr
static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host);
+ (void)ret;
+
debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
}
@@ -262,7 +341,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
RRDSET *st;
rrdset_foreach_read(st, host) {
- rrdset_flag_clear(st, RRDSET_FLAG_EXPOSED_UPSTREAM);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
st->upstream_resync_time = 0;
@@ -475,6 +554,7 @@ void *rrdpush_sender_thread(void *ptr) {
size_t reconnects_counter = 0;
size_t sent_bytes = 0;
size_t sent_bytes_on_this_connection = 0;
+ size_t send_attempts = 0;
time_t last_sent_t = 0;
@@ -494,6 +574,8 @@ void *rrdpush_sender_thread(void *ptr) {
// if we don't have socket open, lets wait a bit
if(unlikely(host->rrdpush_sender_socket == -1)) {
+ send_attempts = 0;
+
if(not_connected_loops == 0 && sent_bytes_on_this_connection > 0) {
// fast re-connection on first disconnect
sleep_usec(USEC_PER_MS * 500); // milliseconds
@@ -509,6 +591,9 @@ void *rrdpush_sender_thread(void *ptr) {
// reset the buffer, to properly send charts and metrics
rrdpush_sender_thread_data_flush(host);
+ // send from the beginning
+ begin = 0;
+
// make sure the next reconnection will be immediate
not_connected_loops = 0;
@@ -530,7 +615,7 @@ void *rrdpush_sender_thread(void *ptr) {
continue;
}
else if(unlikely(now_monotonic_sec() - last_sent_t > timeout)) {
- error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, timeout, sent_bytes_on_this_connection);
+ error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", host->hostname, connected_to, timeout, sent_bytes_on_this_connection, send_attempts);
rrdpush_sender_thread_close_socket(host);
}
@@ -544,6 +629,7 @@ void *rrdpush_sender_thread(void *ptr) {
debug(D_STREAM, "STREAM: Requesting data output on streaming socket %d...", ofd->fd);
ofd->events = POLLOUT;
fdmax = 2;
+ send_attempts++;
}
else {
debug(D_STREAM, "STREAM: Not requesting data output on streaming socket %d (nothing to send now)...", ofd->fd);
@@ -743,6 +829,7 @@ static int rrdpush_receive(int fd
int rrdpush_enabled = default_rrdpush_enabled;
char *rrdpush_destination = default_rrdpush_destination;
char *rrdpush_api_key = default_rrdpush_api_key;
+ char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
time_t alarms_delay = 60;
RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY rrdpush_multiple_connections_strategy = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
@@ -774,6 +861,9 @@ static int rrdpush_receive(int fd
rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, key, "multiple connections", rrdpush_multiple_connections_strategy);
rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, machine_guid, "multiple connections", rrdpush_multiple_connections_strategy);
+ rrdpush_send_charts_matching = appconfig_get(&stream_config, key, "default proxy send charts matching", rrdpush_send_charts_matching);
+ rrdpush_send_charts_matching = appconfig_get(&stream_config, machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+
tags = appconfig_set_default(&stream_config, machine_guid, "host tags", (tags)?tags:"");
if(tags && !*tags) tags = NULL;
@@ -792,10 +882,11 @@ static int rrdpush_receive(int fd
, update_every
, history
, mode
- , (health_enabled != CONFIG_BOOLEAN_NO)
- , (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
+ , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
+ , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
, rrdpush_destination
, rrdpush_api_key
+ , rrdpush_send_charts_matching
);
if(!host) {
@@ -1005,6 +1096,14 @@ int rrdpush_receiver_permission_denied(struct web_client *w) {
return 401;
}
+int rrdpush_receiver_too_busy_now(struct web_client *w) {
+ // we always respond with the same message and error code
+ // to prevent an attacker from gaining info about the error
+ buffer_flush(w->response.data);
+ buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later.");
+ return 503;
+}
+
int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
(void)host;
@@ -1110,6 +1209,26 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
}
}
+ if(unlikely(web_client_streaming_rate_t > 0)) {
+ static netdata_mutex_t stream_rate_mutex = NETDATA_MUTEX_INITIALIZER;
+ static volatile time_t last_stream_accepted_t = 0;
+
+ netdata_mutex_lock(&stream_rate_mutex);
+ time_t now = now_realtime_sec();
+
+ if(unlikely(last_stream_accepted_t == 0))
+ last_stream_accepted_t = now;
+
+ if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
+ netdata_mutex_unlock(&stream_rate_mutex);
+ error("STREAM [receive from [%s]:%s]: too busy to accept new streaming request. Will be allowed in %ld secs.", w->client_ip, w->client_port, (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
+ return rrdpush_receiver_too_busy_now(w);
+ }
+
+ last_stream_accepted_t = now;
+ netdata_mutex_unlock(&stream_rate_mutex);
+ }
+
struct rrdpush_thread *rpt = callocz(1, sizeof(struct rrdpush_thread));
rpt->fd = w->ifd;
rpt->key = strdupz(key);