diff options
Diffstat (limited to '')
-rw-r--r-- | streaming/rrdpush.c (renamed from src/rrdpush.c) | 157 |
1 files changed, 138 insertions, 19 deletions
diff --git a/src/rrdpush.c b/streaming/rrdpush.c index 8f71c6d4c..df1a2177f 100644 --- a/src/rrdpush.c +++ b/streaming/rrdpush.c @@ -1,4 +1,6 @@ -#include "common.h" +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "rrdpush.h" /* * rrdpush @@ -30,14 +32,46 @@ typedef enum { RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW } RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY; -int default_rrdpush_enabled = 0; +static struct config stream_config = { + .sections = NULL, + .mutex = NETDATA_MUTEX_INITIALIZER, + .index = { + .avl_tree = { + .root = NULL, + .compar = appconfig_section_compare + }, + .rwlock = AVL_LOCK_INITIALIZER + } +}; + +unsigned int default_rrdpush_enabled = 0; char *default_rrdpush_destination = NULL; char *default_rrdpush_api_key = NULL; +char *default_rrdpush_send_charts_matching = NULL; + +static void load_stream_conf() { + errno = 0; + char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf"); + if(!appconfig_load(&stream_config, filename, 0)) { + info("CONFIG: cannot load user config '%s'. Will try stock config.", filename); + freez(filename); + + filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf"); + if(!appconfig_load(&stream_config, filename, 0)) + info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename); + } + freez(filename); +} int rrdpush_init() { - default_rrdpush_enabled = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled); + // -------------------------------------------------------------------- + // load stream.conf + load_stream_conf(); + + default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled); default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", ""); default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", ""); + default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*"); rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time); if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { @@ -67,17 +101,44 @@ unsigned int remote_clock_resync_iterations = 60; #define rrdpush_buffer_lock(host) netdata_mutex_lock(&((host)->rrdpush_sender_buffer_mutex)) #define rrdpush_buffer_unlock(host) netdata_mutex_unlock(&((host)->rrdpush_sender_buffer_mutex)) +static inline int should_send_chart_matching(RRDSET *st) { + if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) { + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND); + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); + } + else if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) { + RRDHOST *host = st->rrdhost; + + if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) || + simple_pattern_matches(host->rrdpush_send_charts_matching, st->name)) { + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE); + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); + } + else { + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND); + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); + } + } + + return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND)); +} + // checks if the current chart definition has been sent static inline int need_to_send_chart_definition(RRDSET *st) { rrdset_check_rdlock(st); - if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_EXPOSED_UPSTREAM)))) + if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED)))) return 1; RRDDIM *rd; - rrddim_foreach_read(rd, st) - if(unlikely(!rd->exposed)) + rrddim_foreach_read(rd, st) { + if(unlikely(!rd->exposed)) { + #ifdef NETDATA_INTERNAL_CHECKS + info("host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", st->rrdhost->hostname, st->id, rd->id); + #endif return 1; + } + } return 0; } @@ -86,14 +147,27 @@ static inline int need_to_send_chart_definition(RRDSET *st) { static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { RRDHOST *host = st->rrdhost; - rrdset_flag_set(st, RRDSET_FLAG_EXPOSED_UPSTREAM); + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + + // properly set the name for the remote end to parse it + char *name = ""; + if(unlikely(strcmp(st->id, st->name))) { + // they differ + name = strchr(st->name, '.'); + if(name) + name++; + else + name = ""; + } + + // info("CHART '%s' '%s'", st->id, name); // send the chart buffer_sprintf( host->rrdpush_sender_buffer , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n" , st->id - , st->name + , name , st->title , st->units , st->family @@ -129,7 +203,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { // send the chart local custom variables RRDSETVAR *rs; for(rs = st->variables; rs ;rs = rs->next) { - if(unlikely(rs->options && RRDVAR_OPTION_ALLOCATED)) { + if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) { calculated_number *value = (calculated_number *) rs->value; buffer_sprintf( @@ -147,7 +221,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { // sends the current chart dimensions static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st) { RRDHOST *host = st->rrdhost; - buffer_sprintf(host->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->upstream_resync_time > st->last_collected_time.tv_sec)?st->usec_since_last_update:0); + buffer_sprintf(host->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0); RRDDIM *rd; rrddim_foreach_read(rd, st) { @@ -164,9 +238,12 @@ static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st) { static void rrdpush_sender_thread_spawn(RRDHOST *host); -void rrdset_push_chart_definition(RRDSET *st) { +void rrdset_push_chart_definition_now(RRDSET *st) { RRDHOST *host = st->rrdhost; + if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st))) + return; + rrdset_rdlock(st); rrdpush_buffer_lock(host); rrdpush_send_chart_definition_nolock(st); @@ -175,11 +252,11 @@ void rrdset_push_chart_definition(RRDSET *st) { } void rrdset_done_push(RRDSET *st) { - RRDHOST *host = st->rrdhost; - - if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) + if(unlikely(!should_send_chart_matching(st))) return; + RRDHOST *host = st->rrdhost; + rrdpush_buffer_lock(host); if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn)) @@ -239,7 +316,7 @@ static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr RRDVAR *rv = (RRDVAR *)rrdvar_ptr; RRDHOST *host = (RRDHOST *)host_ptr; - if(unlikely(rv->type == RRDVAR_TYPE_CALCULATED_ALLOCATED)) { + if(unlikely(rv->options & RRDVAR_OPTION_CUSTOM_HOST_VAR && rv->type == RRDVAR_TYPE_CALCULATED)) { rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv); // return 1, so that the traversal will return the number of variables sent @@ -252,6 +329,8 @@ static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host); + (void)ret; + debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); } @@ -262,7 +341,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { RRDSET *st; rrdset_foreach_read(st, host) { - rrdset_flag_clear(st, RRDSET_FLAG_EXPOSED_UPSTREAM); + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); st->upstream_resync_time = 0; @@ -475,6 +554,7 @@ void *rrdpush_sender_thread(void *ptr) { size_t reconnects_counter = 0; size_t sent_bytes = 0; size_t sent_bytes_on_this_connection = 0; + size_t send_attempts = 0; time_t last_sent_t = 0; @@ -494,6 +574,8 @@ void *rrdpush_sender_thread(void *ptr) { // if we don't have socket open, lets wait a bit if(unlikely(host->rrdpush_sender_socket == -1)) { + send_attempts = 0; + if(not_connected_loops == 0 && sent_bytes_on_this_connection > 0) { // fast re-connection on first disconnect sleep_usec(USEC_PER_MS * 500); // milliseconds @@ -509,6 +591,9 @@ void *rrdpush_sender_thread(void *ptr) { // reset the buffer, to properly send charts and metrics rrdpush_sender_thread_data_flush(host); + // send from the beginning + begin = 0; + // make sure the next reconnection will be immediate not_connected_loops = 0; @@ -530,7 +615,7 @@ void *rrdpush_sender_thread(void *ptr) { continue; } else if(unlikely(now_monotonic_sec() - last_sent_t > timeout)) { - error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, timeout, sent_bytes_on_this_connection); + error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", host->hostname, connected_to, timeout, sent_bytes_on_this_connection, send_attempts); rrdpush_sender_thread_close_socket(host); } @@ -544,6 +629,7 @@ void *rrdpush_sender_thread(void *ptr) { debug(D_STREAM, "STREAM: Requesting data output on streaming socket %d...", ofd->fd); ofd->events = POLLOUT; fdmax = 2; + send_attempts++; } else { debug(D_STREAM, "STREAM: Not requesting data output on streaming socket %d (nothing to send now)...", ofd->fd); @@ -743,6 +829,7 @@ static int rrdpush_receive(int fd int rrdpush_enabled = default_rrdpush_enabled; char *rrdpush_destination = default_rrdpush_destination; char *rrdpush_api_key = default_rrdpush_api_key; + char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; time_t alarms_delay = 60; RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY rrdpush_multiple_connections_strategy = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW; @@ -774,6 +861,9 @@ static int rrdpush_receive(int fd rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, key, "multiple connections", rrdpush_multiple_connections_strategy); rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, machine_guid, "multiple connections", rrdpush_multiple_connections_strategy); + rrdpush_send_charts_matching = appconfig_get(&stream_config, key, "default proxy send charts matching", rrdpush_send_charts_matching); + rrdpush_send_charts_matching = appconfig_get(&stream_config, machine_guid, "proxy send charts matching", rrdpush_send_charts_matching); + tags = appconfig_set_default(&stream_config, machine_guid, "host tags", (tags)?tags:""); if(tags && !*tags) tags = NULL; @@ -792,10 +882,11 @@ static int rrdpush_receive(int fd , update_every , history , mode - , (health_enabled != CONFIG_BOOLEAN_NO) - , (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) + , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO) + , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) , rrdpush_destination , rrdpush_api_key + , rrdpush_send_charts_matching ); if(!host) { @@ -1005,6 +1096,14 @@ int rrdpush_receiver_permission_denied(struct web_client *w) { return 401; } +int rrdpush_receiver_too_busy_now(struct web_client *w) { + // we always respond with the same message and error code + // to prevent an attacker from gaining info about the error + buffer_flush(w->response.data); + buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later."); + return 503; +} + int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) { (void)host; @@ -1110,6 +1209,26 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url } } + if(unlikely(web_client_streaming_rate_t > 0)) { + static netdata_mutex_t stream_rate_mutex = NETDATA_MUTEX_INITIALIZER; + static volatile time_t last_stream_accepted_t = 0; + + netdata_mutex_lock(&stream_rate_mutex); + time_t now = now_realtime_sec(); + + if(unlikely(last_stream_accepted_t == 0)) + last_stream_accepted_t = now; + + if(now - last_stream_accepted_t < web_client_streaming_rate_t) { + netdata_mutex_unlock(&stream_rate_mutex); + error("STREAM [receive from [%s]:%s]: too busy to accept new streaming request. Will be allowed in %ld secs.", w->client_ip, w->client_port, (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t))); + return rrdpush_receiver_too_busy_now(w); + } + + last_stream_accepted_t = now; + netdata_mutex_unlock(&stream_rate_mutex); + } + struct rrdpush_thread *rpt = callocz(1, sizeof(struct rrdpush_thread)); rpt->fd = w->ifd; rpt->key = strdupz(key); |