summaryrefslogtreecommitdiffstats
path: root/src/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/streaming/receiver.c669
1 files changed, 552 insertions, 117 deletions
diff --git a/src/streaming/receiver.c b/src/streaming/receiver.c
index 0c0da212..619d308c 100644
--- a/src/streaming/receiver.c
+++ b/src/streaming/receiver.c
@@ -3,12 +3,13 @@
#include "rrdpush.h"
#include "web/server/h2o/http_server.h"
-extern struct config stream_config;
+// When a child disconnects this is the maximum we will wait
+// before we update the cloud that the child is offline
+#define MAX_CHILD_DISC_DELAY (30000)
+#define MAX_CHILD_DISC_TOLERANCE (125 / 100)
void receiver_state_free(struct receiver_state *rpt) {
-#ifdef ENABLE_HTTPS
netdata_ssl_close(&rpt->ssl);
-#endif
if(rpt->fd != -1) {
internal_error(true, "closing socket...");
@@ -36,7 +37,7 @@ void receiver_state_free(struct receiver_state *rpt) {
freez(rpt);
}
-#include "collectors/plugins.d/pluginsd_parser.h"
+#include "plugins.d/pluginsd_parser.h"
// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly
#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
@@ -71,9 +72,7 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz
errno_clear();
switch(wait_on_socket_or_cancel_with_timeout(
-#ifdef ENABLE_HTTPS
&r->ssl,
-#endif
r->fd, 0, POLLIN, NULL))
{
case 0: // data are waiting
@@ -93,14 +92,10 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz
return -2;
}
-#ifdef ENABLE_HTTPS
if (SSL_connection(&r->ssl))
bytes_read = netdata_ssl_read(&r->ssl, buffer, size);
else
bytes_read = read(r->fd, buffer, size);
-#else
- bytes_read = read(r->fd, buffer, size);
-#endif
} while(bytes_read < 0 && errno == EINTR && tries--);
@@ -325,7 +320,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
.capabilities = rpt->capabilities,
};
- parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
+ parser = parser_init(&user, fd, fd, PARSER_INPUT_SPLIT, ssl);
}
#ifdef ENABLE_H2O
@@ -336,10 +331,6 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
rrd_collector_started();
- // this keeps the parser with its current value
- // so, parser needs to be allocated before pushing it
- CLEANUP_FUNCTION_REGISTER(pluginsd_process_thread_cleanup) parser_ptr = parser;
-
bool compressed_connection = rrdpush_decompression_initialize(rpt);
buffered_reader_init(&rpt->reader);
@@ -365,6 +356,9 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
};
ND_LOG_STACK_PUSH(lgs);
+ __atomic_store_n(&rpt->parser, parser, __ATOMIC_RELAXED);
+ rrdpush_receiver_send_node_and_claim_id_to_child(rpt->host);
+
while(!receiver_should_stop(rpt)) {
if(!buffered_reader_next_line(&rpt->reader, buffer)) {
@@ -389,6 +383,13 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
buffer->len = 0;
buffer->buffer[0] = '\0';
}
+
+ // make sure send_to_plugin() will not write any data to the socket
+ spinlock_lock(&parser->writer.spinlock);
+ parser->fd_output = -1;
+ parser->ssl_output = NULL;
+ spinlock_unlock(&parser->writer.spinlock);
+
result = parser->user.data_collections_count;
return result;
}
@@ -407,7 +408,7 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
bool signal_rrdcontext = false;
bool set_this = false;
- netdata_mutex_lock(&host->receiver_lock);
+ spinlock_lock(&host->receiver_lock);
if (!host->receiver) {
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
@@ -433,7 +434,7 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
}
}
- host->health_log.health_log_history = rpt->config.alarms_history;
+ host->health_log.health_log_retention_s = rpt->config.alarms_history;
// this is a test
// if(rpt->hops <= host->sender->hops)
@@ -450,7 +451,7 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
set_this = true;
}
- netdata_mutex_unlock(&host->receiver_lock);
+ spinlock_unlock(&host->receiver_lock);
if(signal_rrdcontext)
rrdcontext_host_child_connected(host);
@@ -460,47 +461,56 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
static void rrdhost_clear_receiver(struct receiver_state *rpt) {
RRDHOST *host = rpt->host;
- if(host) {
- bool signal_rrdcontext = false;
- netdata_mutex_lock(&host->receiver_lock);
+ if(!host) return;
+ spinlock_lock(&host->receiver_lock);
+ {
// Make sure that we detach this thread and don't kill a freshly arriving receiver
- if(host->receiver == rpt) {
+
+ if (host->receiver == rpt) {
+ spinlock_unlock(&host->receiver_lock);
+ {
+ // run all these without having the receiver lock
+
+ stream_path_child_disconnected(host);
+ rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false);
+ rrdpush_receiver_replication_reset(host);
+ rrdcontext_host_child_disconnected(host);
+
+ if (rpt->config.health_enabled)
+ rrdcalc_child_disconnected(host);
+
+ rrdpush_reset_destinations_postpone_time(host);
+ }
+ spinlock_lock(&host->receiver_lock);
+
+ // now we have the lock again
+
__atomic_sub_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED);
rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
host->trigger_chart_obsoletion_check = 0;
host->child_connect_time = 0;
host->child_disconnected_time = now_realtime_sec();
-
host->health.health_enabled = 0;
- rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false);
-
- signal_rrdcontext = true;
- rrdpush_receiver_replication_reset(host);
-
+ host->rrdpush_last_receiver_exit_reason = rpt->exit.reason;
rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
host->receiver = NULL;
- host->rrdpush_last_receiver_exit_reason = rpt->exit.reason;
-
- if(rpt->config.health_enabled)
- rrdcalc_child_disconnected(host);
}
+ }
- netdata_mutex_unlock(&host->receiver_lock);
-
- if(signal_rrdcontext)
- rrdcontext_host_child_disconnected(host);
+ // this must be cleared with the receiver lock
+ pluginsd_process_cleanup(rpt->parser);
+ __atomic_store_n(&rpt->parser, NULL, __ATOMIC_RELAXED);
- rrdpush_reset_destinations_postpone_time(host);
- }
+ spinlock_unlock(&host->receiver_lock);
}
bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) {
bool ret = false;
- netdata_mutex_lock(&host->receiver_lock);
+ spinlock_lock(&host->receiver_lock);
if(host->receiver) {
if(!host->receiver->exit.shutdown) {
@@ -514,12 +524,12 @@ bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) {
int count = 2000;
while (host->receiver && count-- > 0) {
- netdata_mutex_unlock(&host->receiver_lock);
+ spinlock_unlock(&host->receiver_lock);
// let the lock for the receiver thread to exit
sleep_usec(1 * USEC_PER_MS);
- netdata_mutex_lock(&host->receiver_lock);
+ spinlock_lock(&host->receiver_lock);
}
if(host->receiver)
@@ -531,16 +541,14 @@ bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) {
else
ret = true;
- netdata_mutex_unlock(&host->receiver_lock);
+ spinlock_unlock(&host->receiver_lock);
return ret;
}
static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *rpt, const char *msg) {
(void) send_timeout(
-#ifdef ENABLE_HTTPS
&rpt->ssl,
-#endif
rpt->fd,
(char *)msg,
strlen(msg),
@@ -548,7 +556,7 @@ static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *r
5);
}
-void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) {
+static void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) {
// this function may be called BEFORE we spawn the receiver thread
// so, we need to add the fields again (it does not harm)
ND_LOG_STACK lgs[] = {
@@ -582,26 +590,26 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.health_enabled = health_plugin_enabled();
rpt->config.alarms_delay = 60;
- rpt->config.alarms_history = HEALTH_LOG_DEFAULT_HISTORY;
+ rpt->config.alarms_history = HEALTH_LOG_RETENTION_DEFAULT;
- rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled;
- rpt->config.rrdpush_destination = default_rrdpush_destination;
- rpt->config.rrdpush_api_key = default_rrdpush_api_key;
- rpt->config.rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
+ rpt->config.rrdpush_enabled = (int)stream_conf_send_enabled;
+ rpt->config.rrdpush_destination = stream_conf_send_destination;
+ rpt->config.rrdpush_api_key = stream_conf_send_api_key;
+ rpt->config.rrdpush_send_charts_matching = stream_conf_send_charts_matching;
- rpt->config.rrdpush_enable_replication = default_rrdpush_enable_replication;
- rpt->config.rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate;
- rpt->config.rrdpush_replication_step = default_rrdpush_replication_step;
+ rpt->config.rrdpush_enable_replication = stream_conf_replication_enabled;
+ rpt->config.rrdpush_seconds_to_replicate = stream_conf_replication_period;
+ rpt->config.rrdpush_replication_step = stream_conf_replication_step;
- rpt->config.update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->config.update_every);
+ rpt->config.update_every = (int)appconfig_get_duration_seconds(&stream_config, rpt->machine_guid, "update every", rpt->config.update_every);
if(rpt->config.update_every < 0) rpt->config.update_every = 1;
- rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", rpt->config.history);
- rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", rpt->config.history);
+ rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->key, "retention", rpt->config.history);
+ rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "retention", rpt->config.history);
if(rpt->config.history < 5) rpt->config.history = 5;
- rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(rpt->config.mode)));
- rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode)));
+ rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "db", rrd_memory_mode_name(rpt->config.mode)));
+ rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "db", rrd_memory_mode_name(rpt->config.mode)));
if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) {
netdata_log_error("STREAM '%s' [receive from %s:%s]: "
@@ -616,34 +624,34 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", rpt->config.health_enabled);
rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", rpt->config.health_enabled);
- rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay);
- rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay);
+ rpt->config.alarms_delay = appconfig_get_duration_seconds(&stream_config, rpt->key, "postpone alerts on connect", rpt->config.alarms_delay);
+ rpt->config.alarms_delay = appconfig_get_duration_seconds(&stream_config, rpt->machine_guid, "postpone alerts on connect", rpt->config.alarms_delay);
- rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->key, "default health log history", rpt->config.alarms_history);
- rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->machine_guid, "health log history", rpt->config.alarms_history);
+ rpt->config.alarms_history = appconfig_get_duration_seconds(&stream_config, rpt->key, "health log retention", rpt->config.alarms_history);
+ rpt->config.alarms_history = appconfig_get_duration_seconds(&stream_config, rpt->machine_guid, "health log retention", rpt->config.alarms_history);
- rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled);
+ rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "proxy enabled", rpt->config.rrdpush_enabled);
rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled);
- rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rpt->config.rrdpush_destination);
+ rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->key, "proxy destination", rpt->config.rrdpush_destination);
rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rpt->config.rrdpush_destination);
- rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rpt->config.rrdpush_api_key);
+ rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "proxy api key", rpt->config.rrdpush_api_key);
rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rpt->config.rrdpush_api_key);
- rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rpt->config.rrdpush_send_charts_matching);
+ rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "proxy send charts matching", rpt->config.rrdpush_send_charts_matching);
rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rpt->config.rrdpush_send_charts_matching);
rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rpt->config.rrdpush_enable_replication);
rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rpt->config.rrdpush_enable_replication);
- rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate);
- rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate);
+ rpt->config.rrdpush_seconds_to_replicate = appconfig_get_duration_seconds(&stream_config, rpt->key, "replication period", rpt->config.rrdpush_seconds_to_replicate);
+ rpt->config.rrdpush_seconds_to_replicate = appconfig_get_duration_seconds(&stream_config, rpt->machine_guid, "replication period", rpt->config.rrdpush_seconds_to_replicate);
- rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step);
- rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step);
+ rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "replication step", rpt->config.rrdpush_replication_step);
+ rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "replication step", rpt->config.rrdpush_replication_step);
- rpt->config.rrdpush_compression = default_rrdpush_compression_enabled;
+ rpt->config.rrdpush_compression = stream_conf_compression_enabled;
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression);
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression);
@@ -652,7 +660,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
is_ephemeral = appconfig_get_boolean(&stream_config, rpt->machine_guid, "is ephemeral node", is_ephemeral);
if(rpt->config.rrdpush_compression) {
- char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", RRDPUSH_COMPRESSION_ALGORITHMS_ORDER);
+ const char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", RRDPUSH_COMPRESSION_ALGORITHMS_ORDER);
order = appconfig_get(&stream_config, rpt->machine_guid, "compression algorithms order", order);
rrdpush_parse_compression_order(rpt, order);
}
@@ -730,11 +738,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
, rpt->host->rrd_history_entries
, rrd_memory_mode_name(rpt->host->rrd_memory_mode)
, (rpt->config.health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((rpt->config.health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
-#ifdef ENABLE_HTTPS
, (rpt->ssl.conn != NULL) ? " SSL," : ""
-#else
- , ""
-#endif
);
#endif // NETDATA_INTERNAL_CHECKS
@@ -784,9 +788,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
} else {
#endif
ssize_t bytes_sent = send_timeout(
-#ifdef ENABLE_HTTPS
&rpt->ssl,
-#endif
rpt->fd, initial_response, strlen(initial_response), 0, 60);
if(bytes_sent != (ssize_t)strlen(initial_response)) {
@@ -828,13 +830,9 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt, "connected and ready to receive data",
RRDPUSH_STATUS_CONNECTED, NDLP_INFO);
-#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
- if (netdata_cloud_enabled)
- aclk_host_state_update(rpt->host, 1, 1);
-#endif
-
+ schedule_node_state_update(rpt->host, 300);
rrdhost_set_is_parent_label();
if (is_ephemeral)
@@ -843,50 +841,28 @@ static void rrdpush_receive(struct receiver_state *rpt)
// let it reconnect to parent immediately
rrdpush_reset_destinations_postpone_time(rpt->host);
- size_t count = streaming_parser(rpt, &cd, rpt->fd,
-#ifdef ENABLE_HTTPS
- (rpt->ssl.conn) ? &rpt->ssl : NULL
-#else
- NULL
-#endif
- );
+ // receive data
+ size_t count = streaming_parser(rpt, &cd, rpt->fd, (rpt->ssl.conn) ? &rpt->ssl : NULL);
+ // the parser stopped
receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, false);
{
char msg[100 + 1];
snprintfz(msg, sizeof(msg) - 1, "disconnected (completed %zu updates)", count);
- rrdpush_receive_log_status(
- rpt, msg,
- RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING);
+ rrdpush_receive_log_status(rpt, msg, RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING);
}
-#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// a child disconnected
- if (netdata_cloud_enabled)
- aclk_host_state_update(rpt->host, 0, 1);
-#endif
+ STREAM_PATH tmp = rrdhost_stream_path_fetch(rpt->host);
+ uint64_t total_reboot = (tmp.start_time + tmp.shutdown_time);
+ schedule_node_state_update(rpt->host, MIN((total_reboot * MAX_CHILD_DISC_TOLERANCE), MAX_CHILD_DISC_DELAY));
cleanup:
;
}
-static void rrdpush_receiver_thread_cleanup(void *pptr) {
- struct receiver_state *rpt = CLEANUP_FUNCTION_GET_PTR(pptr);
- if(!rpt) return;
-
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
- "receive thread ended (task id %d)"
- , rpt->hostname ? rpt->hostname : "-"
- , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-", gettid_cached());
-
- worker_unregister();
- rrdhost_clear_receiver(rpt);
- receiver_state_free(rpt);
- rrdhost_set_is_parent_label();
-}
-
static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) {
struct receiver_state *rpt = ptr;
if(!rpt)
@@ -901,16 +877,11 @@ static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) {
if(!rpt)
return false;
-#ifdef ENABLE_HTTPS
buffer_strcat(wb, SSL_connection(&rpt->ssl) ? "https" : "http");
-#else
- buffer_strcat(wb, "http");
-#endif
return true;
}
void *rrdpush_receiver_thread(void *ptr) {
- CLEANUP_FUNCTION_REGISTER(rrdpush_receiver_thread_cleanup) cleanup_ptr = ptr;
worker_register("STREAMRCV");
worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ,
@@ -942,5 +913,469 @@ void *rrdpush_receiver_thread(void *ptr) {
, rpt->client_port);
rrdpush_receive(rpt);
+
+ netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
+ "receive thread ended (task id %d)"
+ , rpt->hostname ? rpt->hostname : "-"
+ , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-", gettid_cached());
+
+ worker_unregister();
+ rrdhost_clear_receiver(rpt);
+ rrdhost_set_is_parent_label();
+ receiver_state_free(rpt);
return NULL;
}
+
+int rrdpush_receiver_permission_denied(struct web_client *w) {
+ // we always respond with the same message and error code
+ // to prevent an attacker from gaining info about the error
+ buffer_flush(w->response.data);
+ buffer_strcat(w->response.data, START_STREAMING_ERROR_NOT_PERMITTED);
+ return HTTP_RESP_UNAUTHORIZED;
+}
+
+int rrdpush_receiver_too_busy_now(struct web_client *w) {
+ // we always respond with the same message and error code
+ // to prevent an attacker from gaining info about the error
+ buffer_flush(w->response.data);
+ buffer_strcat(w->response.data, START_STREAMING_ERROR_BUSY_TRY_LATER);
+ return HTTP_RESP_SERVICE_UNAVAILABLE;
+}
+
+static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struct receiver_state *rpt) {
+ rpt->fd = w->ifd;
+
+ rpt->ssl.conn = w->ssl.conn;
+ rpt->ssl.state = w->ssl.state;
+
+ w->ssl = NETDATA_SSL_UNSET_CONNECTION;
+
+ WEB_CLIENT_IS_DEAD(w);
+
+ if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) {
+ web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET);
+ }
+ else {
+ if(w->ifd == w->ofd)
+ w->ifd = w->ofd = -1;
+ else
+ w->ifd = -1;
+ }
+
+ buffer_flush(w->response.data);
+}
+
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx __maybe_unused) {
+
+ if(!service_running(ABILITY_STREAMING_CONNECTIONS))
+ return rrdpush_receiver_too_busy_now(w);
+
+ struct receiver_state *rpt = callocz(1, sizeof(*rpt));
+ rpt->connected_since_s = now_realtime_sec();
+ rpt->last_msg_t = now_monotonic_sec();
+ rpt->hops = 1;
+
+ rpt->capabilities = STREAM_CAP_INVALID;
+
+#ifdef ENABLE_H2O
+ rpt->h2o_ctx = h2o_ctx;
+#endif
+
+ __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
+ __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
+
+ rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info));
+ rpt->system_info->hops = rpt->hops;
+
+ rpt->fd = -1;
+ rpt->client_ip = strdupz(w->client_ip);
+ rpt->client_port = strdupz(w->client_port);
+
+ rpt->ssl = NETDATA_SSL_UNSET_CONNECTION;
+
+ rpt->config.update_every = default_rrd_update_every;
+
+ // parse the parameters and fill rpt and rpt->system_info
+
+ while(decoded_query_string) {
+ char *value = strsep_skip_consecutive_separators(&decoded_query_string, "&");
+ if(!value || !*value) continue;
+
+ char *name = strsep_skip_consecutive_separators(&value, "=");
+ if(!name || !*name) continue;
+ if(!value || !*value) continue;
+
+ if(!strcmp(name, "key") && !rpt->key)
+ rpt->key = strdupz(value);
+
+ else if(!strcmp(name, "hostname") && !rpt->hostname)
+ rpt->hostname = strdupz(value);
+
+ else if(!strcmp(name, "registry_hostname") && !rpt->registry_hostname)
+ rpt->registry_hostname = strdupz(value);
+
+ else if(!strcmp(name, "machine_guid") && !rpt->machine_guid)
+ rpt->machine_guid = strdupz(value);
+
+ else if(!strcmp(name, "update_every"))
+ rpt->config.update_every = (int)strtoul(value, NULL, 0);
+
+ else if(!strcmp(name, "os") && !rpt->os)
+ rpt->os = strdupz(value);
+
+ else if(!strcmp(name, "timezone") && !rpt->timezone)
+ rpt->timezone = strdupz(value);
+
+ else if(!strcmp(name, "abbrev_timezone") && !rpt->abbrev_timezone)
+ rpt->abbrev_timezone = strdupz(value);
+
+ else if(!strcmp(name, "utc_offset"))
+ rpt->utc_offset = (int32_t)strtol(value, NULL, 0);
+
+ else if(!strcmp(name, "hops"))
+ rpt->hops = rpt->system_info->hops = (uint16_t) strtoul(value, NULL, 0);
+
+ else if(!strcmp(name, "ml_capable"))
+ rpt->system_info->ml_capable = strtoul(value, NULL, 0);
+
+ else if(!strcmp(name, "ml_enabled"))
+ rpt->system_info->ml_enabled = strtoul(value, NULL, 0);
+
+ else if(!strcmp(name, "mc_version"))
+ rpt->system_info->mc_version = strtoul(value, NULL, 0);
+
+ else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID))
+ rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0), NULL, false);
+
+ else {
+ // An old Netdata child does not have a compatible streaming protocol, map to something sane.
+ if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
+ name = "NETDATA_HOST_OS_NAME";
+
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID"))
+ name = "NETDATA_HOST_OS_ID";
+
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE"))
+ name = "NETDATA_HOST_OS_ID_LIKE";
+
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION"))
+ name = "NETDATA_HOST_OS_VERSION";
+
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID"))
+ name = "NETDATA_HOST_OS_VERSION_ID";
+
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
+ name = "NETDATA_HOST_OS_DETECTION";
+
+ else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID))
+ rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false);
+
+ if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
+ nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: "
+ "request has parameter '%s' = '%s', which is not used."
+ , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-"
+ , rpt->client_ip, rpt->client_port
+ , name, value);
+ }
+ }
+ }
+
+ if (rpt->capabilities & STREAM_CAP_INVALID)
+ // no version is supplied, assume version 0;
+ rpt->capabilities = convert_stream_version_to_capabilities(0, NULL, false);
+
+ // find the program name and version
+ if(w->user_agent && w->user_agent[0]) {
+ char *t = strchr(w->user_agent, '/');
+ if(t && *t) {
+ *t = '\0';
+ t++;
+ }
+
+ rpt->program_name = strdupz(w->user_agent);
+ if(t && *t) rpt->program_version = strdupz(t);
+ }
+
+ // check if we should accept this connection
+
+ if(!rpt->key || !*rpt->key) {
+ rrdpush_receive_log_status(
+ rpt, "request without an API key, rejecting connection",
+ RRDPUSH_STATUS_NO_API_KEY, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(!rpt->hostname || !*rpt->hostname) {
+ rrdpush_receive_log_status(
+ rpt, "request without a hostname, rejecting connection",
+ RRDPUSH_STATUS_NO_HOSTNAME, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(!rpt->registry_hostname)
+ rpt->registry_hostname = strdupz(rpt->hostname);
+
+ if(!rpt->machine_guid || !*rpt->machine_guid) {
+ rrdpush_receive_log_status(
+ rpt, "request without a machine GUID, rejecting connection",
+ RRDPUSH_STATUS_NO_MACHINE_GUID, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ {
+ char buf[GUID_LEN + 1];
+
+ if (regenerate_guid(rpt->key, buf) == -1) {
+ rrdpush_receive_log_status(
+ rpt, "API key is not a valid UUID (use the command uuidgen to generate one)",
+ RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if (regenerate_guid(rpt->machine_guid, buf) == -1) {
+ rrdpush_receive_log_status(
+ rpt, "machine GUID is not a valid UUID",
+ RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+ }
+
+ const char *api_key_type = appconfig_get(&stream_config, rpt->key, "type", "api");
+ if(!api_key_type || !*api_key_type) api_key_type = "unknown";
+ if(strcmp(api_key_type, "api") != 0) {
+ rrdpush_receive_log_status(
+ rpt, "API key is a machine GUID",
+ RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) {
+ rrdpush_receive_log_status(
+ rpt, "API key is not enabled",
+ RRDPUSH_STATUS_API_KEY_DISABLED, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ {
+ SIMPLE_PATTERN *key_allow_from = simple_pattern_create(
+ appconfig_get(&stream_config, rpt->key, "allow from", "*"),
+ NULL, SIMPLE_PATTERN_EXACT, true);
+
+ if(key_allow_from) {
+ if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
+ simple_pattern_free(key_allow_from);
+
+ rrdpush_receive_log_status(
+ rpt, "API key is not allowed from this IP",
+ RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ simple_pattern_free(key_allow_from);
+ }
+ }
+
+ {
+ const char *machine_guid_type = appconfig_get(&stream_config, rpt->machine_guid, "type", "machine");
+ if (!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown";
+
+ if (strcmp(machine_guid_type, "machine") != 0) {
+ rrdpush_receive_log_status(
+ rpt, "machine GUID is an API key",
+ RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+ }
+
+ if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) {
+ rrdpush_receive_log_status(
+ rpt, "machine GUID is not enabled",
+ RRDPUSH_STATUS_MACHINE_GUID_DISABLED, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ {
+ SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(
+ appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"),
+ NULL, SIMPLE_PATTERN_EXACT, true);
+
+ if(machine_allow_from) {
+ if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
+ simple_pattern_free(machine_allow_from);
+
+ rrdpush_receive_log_status(
+ rpt, "machine GUID is not allowed from this IP",
+ RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ simple_pattern_free(machine_allow_from);
+ }
+ }
+
+ if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
+
+ rrdpush_receiver_takeover_web_connection(w, rpt);
+
+ rrdpush_receive_log_status(
+ rpt, "machine GUID is my own",
+ RRDPUSH_STATUS_LOCALHOST, NDLP_DEBUG);
+
+ char initial_response[HTTP_HEADER_SIZE + 1];
+ snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
+
+ if(send_timeout(
+ &rpt->ssl,
+ rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+
+ nd_log_daemon(NDLP_ERR, "STREAM '%s' [receive from [%s]:%s]: "
+ "failed to reply."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
+ }
+
+ receiver_state_free(rpt);
+ return HTTP_RESP_OK;
+ }
+
+ if(unlikely(web_client_streaming_rate_t > 0)) {
+ static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
+ static time_t last_stream_accepted_t = 0;
+
+ time_t now = now_realtime_sec();
+ spinlock_lock(&spinlock);
+
+ if(unlikely(last_stream_accepted_t == 0))
+ last_stream_accepted_t = now;
+
+ if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
+ spinlock_unlock(&spinlock);
+
+ char msg[100 + 1];
+ snprintfz(msg, sizeof(msg) - 1,
+ "rate limit, will accept new connection in %ld secs",
+ (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
+
+ rrdpush_receive_log_status(
+ rpt, msg,
+ RRDPUSH_STATUS_RATE_LIMIT, NDLP_NOTICE);
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_too_busy_now(w);
+ }
+
+ last_stream_accepted_t = now;
+ spinlock_unlock(&spinlock);
+ }
+
+ /*
+ * Quick path for rejecting multiple connections. The lock taken is fine-grained - it only protects the receiver
+ * pointer within the host (if a host exists). This protects against multiple concurrent web requests hitting
+ * separate threads within the web-server and landing here. The lock guards the thread-shutdown sequence that
+ * detaches the receiver from the host. If the host is being created (first time-access) then we also use the
+ * lock to prevent race-hazard (two threads try to create the host concurrently, one wins and the other does a
+ * lookup to the now-attached structure).
+ */
+
+ {
+ time_t age = 0;
+ bool receiver_stale = false;
+ bool receiver_working = false;
+
+ rrd_rdlock();
+ RRDHOST *host = rrdhost_find_by_guid(rpt->machine_guid);
+ if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
+ host = NULL;
+
+ if (host) {
+ spinlock_lock(&host->receiver_lock);
+ if (host->receiver) {
+ age = now_monotonic_sec() - host->receiver->last_msg_t;
+
+ if (age < 30)
+ receiver_working = true;
+ else
+ receiver_stale = true;
+ }
+ spinlock_unlock(&host->receiver_lock);
+ }
+ rrd_rdunlock();
+
+ if (receiver_stale && stop_streaming_receiver(host, STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER)) {
+ // we stopped the receiver
+ // we can proceed with this connection
+ receiver_stale = false;
+
+ nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: "
+ "stopped previous stale receiver to accept this one."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
+ }
+
+ if (receiver_working || receiver_stale) {
+ // another receiver is already connected
+ // try again later
+
+ char msg[200 + 1];
+ snprintfz(msg, sizeof(msg) - 1,
+ "multiple connections for same host, "
+ "old connection was last used %ld secs ago%s",
+ age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)");
+
+ rrdpush_receive_log_status(
+ rpt, msg,
+ RRDPUSH_STATUS_ALREADY_CONNECTED, NDLP_DEBUG);
+
+ // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
+ buffer_flush(w->response.data);
+ buffer_strcat(w->response.data, START_STREAMING_ERROR_ALREADY_STREAMING);
+ receiver_state_free(rpt);
+ return HTTP_RESP_CONFLICT;
+ }
+ }
+
+ rrdpush_receiver_takeover_web_connection(w, rpt);
+
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_RECEIVER "[%s]", rpt->hostname);
+ tag[NETDATA_THREAD_TAG_MAX] = '\0';
+
+ rpt->thread = nd_thread_create(tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt);
+ if(!rpt->thread) {
+ rrdpush_receive_log_status(
+ rpt, "can't create receiver thread",
+ RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR);
+
+ buffer_flush(w->response.data);
+ buffer_strcat(w->response.data, "Can't handle this request");
+ receiver_state_free(rpt);
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
+ }
+
+ // prevent the caller from closing the streaming socket
+ return HTTP_RESP_OK;
+}