summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c152
-rw-r--r--streaming/replication.c11
-rw-r--r--streaming/rrdpush.c186
-rw-r--r--streaming/rrdpush.h41
-rw-r--r--streaming/sender.c391
5 files changed, 554 insertions, 227 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index ff7a95629..709f15bd5 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -31,10 +31,14 @@ void receiver_state_free(struct receiver_state *rpt) {
freez(rpt->program_version);
#ifdef ENABLE_HTTPS
- if(rpt->ssl.conn)
- SSL_free(rpt->ssl.conn);
+ netdata_ssl_close(&rpt->ssl);
#endif
+ if(rpt->fd != -1) {
+ internal_error(true, "closing socket...");
+ close(rpt->fd);
+ }
+
#ifdef ENABLE_COMPRESSION
if (rpt->decompressor)
rpt->decompressor->destroy(&rpt->decompressor);
@@ -100,13 +104,18 @@ static int read_stream(struct receiver_state *r, char* buffer, size_t size) {
return 0;
}
+ ssize_t bytes_read;
+
#ifdef ENABLE_HTTPS
- if (r->ssl.conn && r->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
- return (int)netdata_ssl_read(r->ssl.conn, buffer, size);
+ if (SSL_connection(&r->ssl))
+ bytes_read = netdata_ssl_read(&r->ssl, buffer, size);
+ else
+ bytes_read = read(r->fd, buffer, size);
+#else
+ bytes_read = read(r->fd, buffer, size);
#endif
- ssize_t bytes_read = read(r->fd, buffer, size);
- if(bytes_read == 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) {
+ if((bytes_read == 0 || bytes_read == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) {
error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
bytes_read = -3;
}
@@ -119,23 +128,6 @@ static int read_stream(struct receiver_state *r, char* buffer, size_t size) {
bytes_read = -2;
}
-// do {
-// bytes_read = (int) fread(buffer, 1, size, fp);
-// if (unlikely(bytes_read <= 0)) {
-// if(feof(fp)) {
-// internal_error(true, "%s(): fread() failed with EOF", __FUNCTION__);
-// bytes_read = -2;
-// }
-// else if(ferror(fp)) {
-// internal_error(true, "%s(): fread() failed with ERROR", __FUNCTION__);
-// bytes_read = -3;
-// }
-// else bytes_read = 0;
-// }
-// else
-// worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, bytes_read);
-// } while(bytes_read == 0);
-
return (int)bytes_read;
}
@@ -323,12 +315,6 @@ static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t b
return NULL;
}
-static void streaming_parser_thread_cleanup(void *ptr) {
- PARSER *parser = (PARSER *)ptr;
- rrd_collector_finished();
- parser_destroy(parser);
-}
-
bool plugin_is_enabled(struct plugind *cd);
static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) {
@@ -352,7 +338,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
// this keeps the parser with its current value
// so, parser needs to be allocated before pushing it
- netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser);
+ netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
@@ -437,6 +423,58 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) {
rrdhost_receiver_replicating_charts_zero(host);
}
+void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
+ size_t receiver_hops = host->system_info ? host->system_info->hops : (host == localhost) ? 0 : 1;
+
+ netdata_mutex_lock(&host->receiver_lock);
+
+ buffer_json_member_add_object(wb, key);
+ buffer_json_member_add_uint64(wb, "hops", receiver_hops);
+
+ bool online = host == localhost || !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+ buffer_json_member_add_boolean(wb, "online", online);
+
+ if(host->child_connect_time || host->child_disconnected_time) {
+ time_t since = MAX(host->child_connect_time, host->child_disconnected_time);
+ buffer_json_member_add_time_t(wb, "since", since);
+ buffer_json_member_add_time_t(wb, "age", now - since);
+ }
+
+ if(!online && host->rrdpush_last_receiver_exit_reason)
+ buffer_json_member_add_string(wb, "reason", host->rrdpush_last_receiver_exit_reason);
+
+ if(host != localhost && host->receiver) {
+ buffer_json_member_add_object(wb, "replication");
+ {
+ size_t instances = rrdhost_receiver_replicating_charts(host);
+ buffer_json_member_add_boolean(wb, "in_progress", instances);
+ buffer_json_member_add_double(wb, "completion", host->rrdpush_receiver_replication_percent);
+ buffer_json_member_add_uint64(wb, "instances", instances);
+ }
+ buffer_json_object_close(wb); // replication
+
+ buffer_json_member_add_object(wb, "source");
+ {
+
+ char buf[1024 + 1];
+ SOCKET_PEERS peers = socket_peers(host->receiver->fd);
+ bool ssl = SSL_connection(&host->receiver->ssl);
+
+ snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : "");
+ buffer_json_member_add_string(wb, "local", buf);
+
+ snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : "");
+ buffer_json_member_add_string(wb, "remote", buf);
+
+ stream_capabilities_to_json_array(wb, host->receiver->capabilities, "capabilities");
+ }
+ buffer_json_object_close(wb); // source
+ }
+ buffer_json_object_close(wb); // collection
+
+ netdata_mutex_unlock(&host->receiver_lock);
+}
+
static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
bool signal_rrdcontext = false;
bool set_this = false;
@@ -474,6 +512,8 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
aclk_queue_node_info(rpt->host, true);
+ rrdpush_reset_destinations_postpone_time(host);
+
set_this = true;
}
@@ -506,16 +546,17 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) {
signal_rrdcontext = true;
rrdpush_receiver_replication_reset(host);
- if (host->receiver == rpt)
- host->receiver = NULL;
-
rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
+ host->receiver = NULL;
+ host->rrdpush_last_receiver_exit_reason = rpt->exit.reason;
}
netdata_mutex_unlock(&host->receiver_lock);
if(signal_rrdcontext)
rrdcontext_host_child_disconnected(host);
+
+ rrdpush_reset_destinations_postpone_time(host);
}
}
@@ -549,7 +590,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) {
"thread %d takes too long to stop, giving up..."
, rrdhost_hostname(host)
, host->receiver->client_ip, host->receiver->client_port
- , gettid());
+ , host->receiver->tid);
else
ret = true;
@@ -558,6 +599,18 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) {
return ret;
}
+static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *rpt, const char *msg) {
+ (void) send_timeout(
+#ifdef ENABLE_HTTPS
+ &rpt->ssl,
+#endif
+ rpt->fd,
+ (char *)msg,
+ strlen(msg),
+ 0,
+ 5);
+}
+
void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) {
log_stream_connection(rpt->client_ip, rpt->client_port,
@@ -585,7 +638,7 @@ static void rrdhost_reset_destinations(RRDHOST *host) {
d->postpone_reconnection_until = 0;
}
-static int rrdpush_receive(struct receiver_state *rpt)
+static void rrdpush_receive(struct receiver_state *rpt)
{
rpt->config.mode = default_rrd_memory_mode;
rpt->config.history = default_rrd_history_entries;
@@ -689,14 +742,14 @@ static int rrdpush_receive(struct receiver_state *rpt)
if(!host) {
rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION");
- close(rpt->fd);
- return 1;
+ rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR);
+ goto cleanup;
}
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER");
- close(rpt->fd);
- return 1;
+ rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION);
+ goto cleanup;
}
// system_info has been consumed by the host structure
@@ -704,8 +757,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
if(!rrdhost_set_receiver(host, rpt)) {
rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION");
- close(rpt->fd);
- return 1;
+ rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING);
+ goto cleanup;
}
}
@@ -776,15 +829,16 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
- if(send_timeout(
+ ssize_t bytes_sent = send_timeout(
#ifdef ENABLE_HTTPS
&rpt->ssl,
#endif
- rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+ rpt->fd, initial_response, strlen(initial_response), 0, 60);
+ if(bytes_sent != (ssize_t)strlen(initial_response)) {
+ internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response));
rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION");
- close(rpt->fd);
- return 0;
+ goto cleanup;
}
}
@@ -850,9 +904,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_set_is_parent_label(--localhost->connected_children_count);
- // cleanup
- close(rpt->fd);
- return (int)count;
+cleanup:
+ ;
}
static void rrdpush_receiver_thread_cleanup(void *ptr) {
@@ -879,7 +932,8 @@ void *rrdpush_receiver_thread(void *ptr) {
worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
struct receiver_state *rpt = (struct receiver_state *)ptr;
- info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+ rpt->tid = gettid();
+ info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid);
rrdpush_receive(rpt);
diff --git a/streaming/replication.c b/streaming/replication.c
index a50913a1a..c6fafc357 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -274,6 +274,12 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
replication_queries.queries_finished += queries;
replication_queries.points_read += q->points_read;
replication_queries.points_generated += q->points_generated;
+
+ if(q->st && q->st->rrdhost->sender) {
+ struct sender_state *s = q->st->rrdhost->sender;
+ s->replication.latest_completed_before_t = q->query.before;
+ }
+
netdata_spinlock_unlock(&replication_queries.spinlock);
}
@@ -644,7 +650,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
buffer_fast_strcat(wb, "\n", 1);
worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_REPLICATION);
worker_is_busy(WORKER_JOB_CLEANUP);
if(enable_streaming) {
@@ -1466,6 +1472,9 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
.not_indexed_preprocessing = false,
};
+ if(!sender->replication.oldest_request_after_t || rq.after < sender->replication.oldest_request_after_t)
+ sender->replication.oldest_request_after_t = rq.after;
+
if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
replication_execute_request(&rq, false);
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 62b537f0c..c481871cc 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -49,7 +49,6 @@ bool default_rrdpush_enable_replication = true;
time_t default_rrdpush_seconds_to_replicate = 86400;
time_t default_rrdpush_replication_step = 600;
#ifdef ENABLE_HTTPS
-int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL;
char *netdata_ssl_ca_path = NULL;
char *netdata_ssl_ca_file = NULL;
#endif
@@ -137,24 +136,10 @@ int rrdpush_init() {
}
#ifdef ENABLE_HTTPS
- if (netdata_use_ssl_on_stream == NETDATA_SSL_OPTIONAL) {
- if (default_rrdpush_destination){
- char *test = strstr(default_rrdpush_destination,":SSL");
- if(test){
- *test = 0X00;
- netdata_use_ssl_on_stream = NETDATA_SSL_FORCE;
- }
- }
- }
+ netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate);
- bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO);
-
- if(invalid_certificate == CONFIG_BOOLEAN_YES){
- if(netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
- info("Netdata is configured to accept invalid SSL certificate.");
- netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
- }
- }
+ if(!netdata_ssl_validate_certificate_sender)
+ info("SSL: streaming senders will skip SSL certificates verification.");
netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
@@ -390,7 +375,7 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
BUFFER *wb = sender_start(host->sender);
rrdpush_send_chart_definition(wb, st);
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
return true;
@@ -458,7 +443,7 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
}
- sender_commit(st->rrdhost->sender, rsb->wb);
+ sender_commit(st->rrdhost->sender, rsb->wb, STREAM_TRAFFIC_TYPE_DATA);
*rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
}
@@ -498,7 +483,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(!exposed_upstream)) {
BUFFER *wb = sender_start(host->sender);
replication_in_progress = rrdpush_send_chart_definition(wb, st);
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
}
if(replication_in_progress)
@@ -529,7 +514,7 @@ void rrdpush_send_host_labels(RRDHOST *host) {
rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb);
buffer_sprintf(wb, "OVERWRITE %s\n", "labels");
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
}
@@ -548,7 +533,7 @@ void rrdpush_claimed_id(RRDHOST *host)
buffer_sprintf(wb, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
rrdhost_aclk_state_unlock(host);
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
}
@@ -579,6 +564,7 @@ int connect_to_one_of_destinations(
if (reconnects_counter)
*reconnects_counter += 1;
+ d->last_attempt = now;
sock = connect_to_this(string2str(d->destination), default_port, timeout);
if (sock != -1) {
@@ -610,6 +596,14 @@ bool destinations_init_add_one(char *entry, void *data) {
struct destinations_init_tmp *t = data;
struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
+ char *colon_ssl = strstr(entry, ":SSL");
+ if(colon_ssl) {
+ *colon_ssl = '\0';
+ d->ssl = true;
+ }
+ else
+ d->ssl = false;
+
d->destination = string_strdupz(entry);
__atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
@@ -712,7 +706,7 @@ int rrdpush_receiver_permission_denied(struct web_client *w) {
// we always respond with the same message and error code
// to prevent an attacker from gaining info about the error
buffer_flush(w->response.data);
- buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info.");
+ buffer_strcat(w->response.data, START_STREAMING_ERROR_NOT_PERMITTED);
return HTTP_RESP_UNAUTHORIZED;
}
@@ -720,10 +714,35 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) {
// we always respond with the same message and error code
// to prevent an attacker from gaining info about the error
buffer_flush(w->response.data);
- buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later.");
+ buffer_strcat(w->response.data, START_STREAMING_ERROR_BUSY_TRY_LATER);
return HTTP_RESP_SERVICE_UNAVAILABLE;
}
+static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struct receiver_state *rpt) {
+ rpt->fd = w->ifd;
+
+#ifdef ENABLE_HTTPS
+ rpt->ssl.conn = w->ssl.conn;
+ rpt->ssl.state = w->ssl.state;
+
+ w->ssl = NETDATA_SSL_UNSET_CONNECTION;
+#endif
+
+ WEB_CLIENT_IS_DEAD(w);
+
+ if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) {
+ web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET);
+ }
+ else {
+ if(w->ifd == w->ofd)
+ w->ifd = w->ofd = -1;
+ else
+ w->ifd = -1;
+ }
+
+ buffer_flush(w->response.data);
+}
+
void *rrdpush_receiver_thread(void *ptr);
int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) {
@@ -741,20 +760,16 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info));
rpt->system_info->hops = rpt->hops;
- rpt->fd = w->ifd;
+ rpt->fd = -1;
rpt->client_ip = strdupz(w->client_ip);
rpt->client_port = strdupz(w->client_port);
- rpt->config.update_every = default_rrd_update_every;
-
#ifdef ENABLE_HTTPS
- rpt->ssl.conn = w->ssl.conn;
- rpt->ssl.flags = w->ssl.flags;
-
- w->ssl.conn = NULL;
- w->ssl.flags = NETDATA_SSL_START;
+ rpt->ssl = NETDATA_SSL_UNSET_CONNECTION;
#endif
+ rpt->config.update_every = default_rrd_update_every;
+
// parse the parameters and fill rpt and rpt->system_info
while(decoded_query_string) {
@@ -1011,6 +1026,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
+ rrdpush_receiver_takeover_web_connection(w, rpt);
+
rrdpush_receive_log_status(
rpt,
"machine GUID is my own",
@@ -1032,9 +1049,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
);
}
- close(rpt->fd);
receiver_state_free(rpt);
- return web_client_socket_is_now_used_for_streaming(w);
+ return HTTP_RESP_OK;
}
if(unlikely(web_client_streaming_rate_t > 0)) {
@@ -1130,7 +1146,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
// Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
buffer_flush(w->response.data);
- buffer_strcat(w->response.data, "This GUID is already streaming to this server");
+ buffer_strcat(w->response.data, START_STREAMING_ERROR_ALREADY_STREAMING);
receiver_state_free(rpt);
return HTTP_RESP_CONFLICT;
}
@@ -1138,8 +1154,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
debug(D_SYSTEM, "starting STREAM receive thread.");
- char tag[FILENAME_MAX + 1];
- snprintfz(tag, FILENAME_MAX, THREAD_TAG_STREAM_RECEIVER "[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
+ rrdpush_receiver_takeover_web_connection(w, rpt);
+
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_RECEIVER "[%s]", rpt->hostname);
+ tag[NETDATA_THREAD_TAG_MAX] = '\0';
if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) {
rrdpush_receive_log_status(
@@ -1154,23 +1173,86 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
// prevent the caller from closing the streaming socket
- return web_client_socket_is_now_used_for_streaming(w);
+ return HTTP_RESP_OK;
+}
+
+void rrdpush_reset_destinations_postpone_time(RRDHOST *host) {
+ struct rrdpush_destinations *d;
+ for (d = host->destinations; d; d = d->next)
+ d->postpone_reconnection_until = 0;
}
+static struct {
+ STREAM_HANDSHAKE err;
+ const char *str;
+} handshake_errors[] = {
+ { STREAM_HANDSHAKE_OK_V5, "OK_V5" },
+ { STREAM_HANDSHAKE_OK_V4, "OK_V4" },
+ { STREAM_HANDSHAKE_OK_V3, "OK_V3" },
+ { STREAM_HANDSHAKE_OK_V2, "OK_V2" },
+ { STREAM_HANDSHAKE_OK_V1, "OK_V1" },
+ { STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, "BAD HANDSHAKE" },
+ { STREAM_HANDSHAKE_ERROR_LOCALHOST, "LOCALHOST" },
+ { STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, "ALREADY CONNECTED" },
+ { STREAM_HANDSHAKE_ERROR_DENIED, "DENIED" },
+ { STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT, "SEND TIMEOUT" },
+ { STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT, "RECEIVE TIMEOUT" },
+ { STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE, "INVALID CERTIFICATE" },
+ { STREAM_HANDSHAKE_ERROR_SSL_ERROR, "SSL ERROR" },
+ { STREAM_HANDSHAKE_ERROR_CANT_CONNECT, "CANT CONNECT" },
+ { STREAM_HANDSHAKE_BUSY_TRY_LATER, "BUSY TRY LATER" },
+ { STREAM_HANDSHAKE_INTERNAL_ERROR, "INTERNAL ERROR" },
+ { STREAM_HANDSHAKE_INITIALIZATION, "INITIALIZING" },
+ { 0, NULL },
+};
+
+const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error) {
+ for(size_t i = 0; handshake_errors[i].str ; i++) {
+ if(handshake_error == handshake_errors[i].err)
+ return handshake_errors[i].str;
+ }
+
+ return "";
+}
+
+static struct {
+ STREAM_CAPABILITIES cap;
+ const char *str;
+} capability_names[] = {
+ { STREAM_CAP_V1, "V1" },
+ { STREAM_CAP_V2, "V2" },
+ { STREAM_CAP_VN, "VN" },
+ { STREAM_CAP_VCAPS, "VCAPS" },
+ { STREAM_CAP_HLABELS, "HLABELS" },
+ { STREAM_CAP_CLAIM, "CLAIM" },
+ { STREAM_CAP_CLABELS, "CLABELS" },
+ { STREAM_CAP_COMPRESSION, "COMPRESSION" },
+ { STREAM_CAP_FUNCTIONS, "FUNCTIONS" },
+ { STREAM_CAP_REPLICATION, "REPLICATION" },
+ { STREAM_CAP_BINARY, "BINARY" },
+ { STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
+ { STREAM_CAP_IEEE754, "IEEE754" },
+ { 0 , NULL },
+};
+
static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
- if(caps & STREAM_CAP_V1) buffer_strcat(wb, "V1 ");
- if(caps & STREAM_CAP_V2) buffer_strcat(wb, "V2 ");
- if(caps & STREAM_CAP_VN) buffer_strcat(wb, "VN ");
- if(caps & STREAM_CAP_VCAPS) buffer_strcat(wb, "VCAPS ");
- if(caps & STREAM_CAP_HLABELS) buffer_strcat(wb, "HLABELS ");
- if(caps & STREAM_CAP_CLAIM) buffer_strcat(wb, "CLAIM ");
- if(caps & STREAM_CAP_CLABELS) buffer_strcat(wb, "CLABELS ");
- if(caps & STREAM_CAP_COMPRESSION) buffer_strcat(wb, "COMPRESSION ");
- if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS ");
- if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION ");
- if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY ");
- if(caps & STREAM_CAP_INTERPOLATED) buffer_strcat(wb, "INTERPOLATED ");
- if(caps & STREAM_CAP_IEEE754) buffer_strcat(wb, "IEEE754 ");
+ for(size_t i = 0; capability_names[i].str ; i++) {
+ if(caps & capability_names[i].cap) {
+ buffer_strcat(wb, capability_names[i].str);
+ buffer_strcat(wb, " ");
+ }
+ }
+}
+
+void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) {
+ buffer_json_member_add_array(wb, key);
+
+ for(size_t i = 0; capability_names[i].str ; i++) {
+ if(caps & capability_names[i].cap)
+ buffer_json_add_array_item_string(wb, capability_names[i].str);
+ }
+
+ buffer_json_array_close(wb);
}
void log_receiver_capabilities(struct receiver_state *rpt) {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index ff8958440..f97c8ddfb 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -72,6 +72,9 @@ STREAM_CAPABILITIES stream_our_capabilities();
#define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
#define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
#define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
+#define START_STREAMING_ERROR_BUSY_TRY_LATER "The server is too busy now to accept this request. Try later."
+#define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later."
+#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
typedef enum {
STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
@@ -87,12 +90,27 @@ typedef enum {
STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
- STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9
+ STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9,
+ STREAM_HANDSHAKE_BUSY_TRY_LATER = -10,
+ STREAM_HANDSHAKE_INTERNAL_ERROR = -11,
+ STREAM_HANDSHAKE_INITIALIZATION = -12,
} STREAM_HANDSHAKE;
// ----------------------------------------------------------------------------
+typedef enum __attribute__((packed)) {
+ STREAM_TRAFFIC_TYPE_REPLICATION,
+ STREAM_TRAFFIC_TYPE_FUNCTIONS,
+ STREAM_TRAFFIC_TYPE_METADATA,
+ STREAM_TRAFFIC_TYPE_DATA,
+
+ // terminator
+ STREAM_TRAFFIC_TYPE_MAX,
+} STREAM_TRAFFIC_TYPE;
+
+// ----------------------------------------------------------------------------
+
typedef struct {
char *os_name;
char *os_id;
@@ -148,6 +166,7 @@ struct sender_state {
size_t sent_bytes_on_this_connection;
size_t send_attempts;
time_t last_traffic_seen_t;
+ time_t last_state_since_t; // the timestamp of the last state (online/offline) change
size_t not_connected_loops;
// Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
// the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
@@ -157,6 +176,8 @@ struct sender_state {
int read_len;
STREAM_CAPABILITIES capabilities;
+ size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
+
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
int rrdpush_sender_socket;
@@ -166,7 +187,7 @@ struct sender_state {
struct compressor_state *compressor;
#endif
#ifdef ENABLE_HTTPS
- struct netdata_ssl ssl; // structure used to encrypt the connection
+ NETDATA_SSL ssl; // structure used to encrypt the connection
#endif
struct {
@@ -176,6 +197,8 @@ struct sender_state {
struct {
DICTIONARY *requests; // de-duplication of replication requests, per chart
+ time_t oldest_request_after_t; // the timestamp of the oldest replication request
+ time_t latest_completed_before_t; // the timestamp of the latest replication request
struct {
size_t pending_requests; // the currently outstanding replication requests
@@ -221,6 +244,7 @@ struct sender_state {
struct receiver_state {
RRDHOST *host;
+ pid_t tid;
netdata_thread_t thread;
int fd;
char *key;
@@ -266,7 +290,7 @@ struct receiver_state {
} config;
#ifdef ENABLE_HTTPS
- struct netdata_ssl ssl;
+ NETDATA_SSL ssl;
#endif
#ifdef ENABLE_COMPRESSION
unsigned int rrdpush_compression;
@@ -278,8 +302,10 @@ struct receiver_state {
struct rrdpush_destinations {
STRING *destination;
+ bool ssl;
const char *last_error;
+ time_t last_attempt;
time_t postpone_reconnection_until;
STREAM_HANDSHAKE last_handshake;
@@ -303,7 +329,7 @@ void rrdpush_destinations_init(RRDHOST *host);
void rrdpush_destinations_free(RRDHOST *host);
BUFFER *sender_start(struct sender_state *s);
-void sender_commit(struct sender_state *s, BUFFER *wb);
+void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type);
int rrdpush_init();
bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
@@ -351,7 +377,9 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
struct compressor_state *create_compressor();
struct decompressor_state *create_decompressor();
#endif
-
+void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
+const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
+void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
@@ -363,6 +391,9 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason);
void sender_thread_buffer_free(void);
+void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
+void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
+
#include "replication.h"
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index 179c2dc60..c74c9b407 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -29,7 +29,6 @@
#endif
extern struct config stream_config;
-extern int netdata_use_ssl_on_stream;
extern char *netdata_ssl_ca_path;
extern char *netdata_ssl_ca_file;
@@ -85,7 +84,7 @@ static inline void deactivate_compression(struct sender_state *s) {
#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
// Collector thread finishing a transmission
-void sender_commit(struct sender_state *s, BUFFER *wb) {
+void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) {
if(unlikely(wb != sender_thread_buffer))
fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer.");
@@ -164,6 +163,8 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
s->flags |= SENDER_FLAG_OVERFLOW;
+ else
+ s->sent_bytes_on_this_connection_per_type[type] += dst_len;
src = src + size_to_compress;
src_len -= size_to_compress;
@@ -171,9 +172,13 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
}
else if(cbuffer_add_unsafe(s->buffer, src, src_len))
s->flags |= SENDER_FLAG_OVERFLOW;
+ else
+ s->sent_bytes_on_this_connection_per_type[type] += src_len;
#else
if(cbuffer_add_unsafe(s->buffer, src, src_len))
s->flags |= SENDER_FLAG_OVERFLOW;
+ else
+ s->sent_bytes_on_this_connection_per_type[type] += src_len;
#endif
replication_recalculate_buffer_used_ratio_unsafe(s);
@@ -205,7 +210,7 @@ void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQU
if(rrdhost_can_send_definitions_to_parent(host)) {
BUFFER *wb = sender_start(host->sender);
rrdpush_sender_add_host_variable_to_buffer(wb, rva);
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
}
}
@@ -234,7 +239,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
};
int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
(void)ret;
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
@@ -320,6 +325,10 @@ static void rrdpush_sender_after_connect(RRDHOST *host) {
}
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
+#ifdef ENABLE_HTTPS
+ netdata_ssl_close(&host->sender->ssl);
+#endif
+
if(host->sender->rrdpush_sender_socket != -1) {
close(host->sender->rrdpush_sender_socket);
host->sender->rrdpush_sender_socket = -1;
@@ -335,11 +344,11 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
{
- se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):"";
- se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):"";
- se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):"";
- se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):"";
- se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):"";
+ se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz("");
+ se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz("");
+ se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz("");
+ se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):strdupz("");
+ se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz("");
}
void rrdpush_clean_encoded(stream_encoded_t *se)
@@ -423,6 +432,33 @@ struct {
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 1 * 60, // 1 minute
},
+ {
+ .response = START_STREAMING_ERROR_BUSY_TRY_LATER,
+ .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1,
+ .version = STREAM_HANDSHAKE_BUSY_TRY_LATER,
+ .dynamic = false,
+ .error = "remote server is currently busy, we should try later",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 2 * 60, // 2 minutes
+ },
+ {
+ .response = START_STREAMING_ERROR_INTERNAL_ERROR,
+ .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1,
+ .version = STREAM_HANDSHAKE_INTERNAL_ERROR,
+ .dynamic = false,
+ .error = "remote server is encountered an internal error, we should try later",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 5 * 60, // 5 minutes
+ },
+ {
+ .response = START_STREAMING_ERROR_INITIALIZATION,
+ .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1,
+ .version = STREAM_HANDSHAKE_INITIALIZATION,
+ .dynamic = false,
+ .error = "remote server is initializing, we should try later",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 2 * 60, // 2 minute
+ },
// terminator
{
@@ -480,6 +516,53 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
return false;
}
+static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
+#ifdef ENABLE_HTTPS
+ RRDHOST *host = s->host;
+ bool ssl_required = host->destination && host->destination->ssl;
+
+ netdata_ssl_close(&host->sender->ssl);
+
+ if(!ssl_required)
+ return true;
+
+ if (netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) {
+ if(!netdata_ssl_connect(&host->sender->ssl)) {
+ // couldn't connect
+
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->last_error = "SSL error";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
+ return false;
+ }
+
+ if (netdata_ssl_validate_certificate_sender &&
+ security_test_certificate(host->sender->ssl.conn)) {
+ // certificate is not valid
+
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+ error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->last_error = "invalid SSL certificate";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
+ return false;
+ }
+
+ return true;
+ }
+
+ // failed to establish connection
+ return false;
+
+#else
+ // SSL is not enabled
+ return true;
+#endif
+}
+
static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
struct timeval tv = {
@@ -507,35 +590,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
// info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
-#ifdef ENABLE_HTTPS
- if(netdata_ssl_client_ctx){
- host->sender->ssl.flags = NETDATA_SSL_START;
- if (!host->sender->ssl.conn){
- host->sender->ssl.conn = SSL_new(netdata_ssl_client_ctx);
- if(!host->sender->ssl.conn){
- error("Failed to allocate SSL structure.");
- host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- }
- }
- else{
- SSL_clear(host->sender->ssl.conn);
- }
-
- if (host->sender->ssl.conn)
- {
- if (SSL_set_fd(host->sender->ssl.conn, s->rrdpush_sender_socket) != 1) {
- error("Failed to set the socket to the SSL on socket fd %d.", s->rrdpush_sender_socket);
- host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- } else{
- host->sender->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
- }
- }
- }
- else {
- host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- }
-#endif
-
// reset our capabilities to default
s->capabilities = stream_our_capabilities();
@@ -651,43 +705,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
http[eol] = 0x00;
rrdpush_clean_encoded(&se);
-#ifdef ENABLE_HTTPS
- if (!host->sender->ssl.flags) {
- ERR_clear_error();
- SSL_set_connect_state(host->sender->ssl.conn);
- int err = SSL_connect(host->sender->ssl.conn);
- if (err != 1){
- err = SSL_get_error(host->sender->ssl.conn, err);
- error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->sender->ssl.conn,err),NULL));
- if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = "SSL error";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
- host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
- return false;
- }
- else {
- host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- }
- }
- else {
- if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
- if (netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
- if ( security_test_certificate(host->sender->ssl.conn)) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- error("Closing the stream connection, because the server SSL certificate is not valid.");
- rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = "invalid SSL certificate";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
- host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
- return false;
- }
- }
- }
- }
- }
-#endif
+ if(!rrdpush_sender_connect_ssl(s))
+ return false;
ssize_t bytes;
@@ -733,6 +752,12 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
return false;
}
+ if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
+
+ if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
+
http[bytes] = '\0';
debug(D_STREAM, "Response to sender from far end: %s", http);
if(!rrdpush_sender_validate_response(host, s, http, bytes))
@@ -749,12 +774,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
log_sender_capabilities(s);
- if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
-
- if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
-
debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
return true;
@@ -764,6 +783,10 @@ static bool attempt_to_connect(struct sender_state *state)
{
state->send_attempts = 0;
+ // reset the bytes we have sent for this session
+ state->sent_bytes_on_this_connection = 0;
+ memset(state->sent_bytes_on_this_connection_per_type, 0, sizeof(state->sent_bytes_on_this_connection_per_type));
+
if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
// reset the buffer, to properly send charts and metrics
rrdpush_sender_on_connect(state->host);
@@ -774,9 +797,6 @@ static bool attempt_to_connect(struct sender_state *state)
// make sure the next reconnection will be immediate
state->not_connected_loops = 0;
- // reset the bytes we have sent for this session
- state->sent_bytes_on_this_connection = 0;
-
// let the data collection threads know we are ready
rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
@@ -790,9 +810,6 @@ static bool attempt_to_connect(struct sender_state *state)
// increase the failed connections counter
state->not_connected_loops++;
- // reset the number of bytes sent
- state->sent_bytes_on_this_connection = 0;
-
// slow re-connection on repeating errors
usec_t now_ut = now_monotonic_usec();
usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay;
@@ -819,9 +836,8 @@ static ssize_t attempt_to_send(struct sender_state *s) {
debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
#ifdef ENABLE_HTTPS
- SSL *conn = s->ssl.conn ;
- if(conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
- ret = netdata_ssl_write(conn, chunk, outstanding);
+ if(SSL_connection(&s->ssl))
+ ret = netdata_ssl_write(&s->ssl, chunk, outstanding);
else
ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
#else
@@ -852,25 +868,17 @@ static ssize_t attempt_to_send(struct sender_state *s) {
}
static ssize_t attempt_read(struct sender_state *s) {
- ssize_t ret = 0;
+ ssize_t ret;
#ifdef ENABLE_HTTPS
- if (s->ssl.conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
- size_t desired = sizeof(s->read_buffer) - s->read_len - 1;
- ret = netdata_ssl_read(s->ssl.conn, s->read_buffer, desired);
- if (ret > 0 ) {
- s->read_len += (int)ret;
- return ret;
- }
-
- if (ret == -1) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- rrdpush_sender_thread_close_socket(s->host);
- }
- return ret;
- }
-#endif
+ if (SSL_connection(&s->ssl))
+ ret = netdata_ssl_read(&s->ssl, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1);
+ else
+ ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
+#else
ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
+#endif
+
if (ret > 0) {
s->read_len += ret;
return ret;
@@ -879,6 +887,12 @@ static ssize_t attempt_read(struct sender_state *s) {
if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
return ret;
+#ifdef ENABLE_HTTPS
+ if (SSL_connection(&s->ssl))
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+ else
+#endif
+
if (ret == 0 || errno == ECONNRESET) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
@@ -887,6 +901,7 @@ static ssize_t attempt_read(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
}
+
rrdpush_sender_thread_close_socket(s->host);
return ret;
@@ -915,7 +930,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
pluginsd_function_result_end_to_buffer(wb);
- sender_commit(s, wb);
+ sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
sender_thread_buffer_free();
internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
@@ -1083,6 +1098,119 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
}
}
+static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) {
+ size_t charts = rrdhost_sender_replicating_charts(host);
+ NETDATA_DOUBLE completion;
+ if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t)
+ completion = 100.0;
+ else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t)
+ completion = 0.0;
+ else {
+ time_t total = now - host->sender->replication.oldest_request_after_t;
+ time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t;
+ completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total;
+ }
+
+ *instances = charts;
+
+ return completion;
+}
+
+void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
+ bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+ buffer_json_member_add_object(wb, key);
+
+ if(host->sender)
+ buffer_json_member_add_uint64(wb, "hops", host->sender->hops);
+
+ buffer_json_member_add_boolean(wb, "online", online);
+
+ if(host->sender && host->sender->last_state_since_t) {
+ buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t);
+ buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t);
+ }
+
+ if(!online && host->sender && host->sender->exit.reason)
+ buffer_json_member_add_string(wb, "reason", host->sender->exit.reason);
+
+ buffer_json_member_add_object(wb, "replication");
+ {
+ size_t instances;
+ NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances);
+ buffer_json_member_add_boolean(wb, "in_progress", instances);
+ buffer_json_member_add_double(wb, "completion", completion);
+ buffer_json_member_add_uint64(wb, "instances", instances);
+ }
+ buffer_json_object_close(wb);
+
+ if(host->sender) {
+ netdata_mutex_lock(&host->sender->mutex);
+
+ buffer_json_member_add_object(wb, "destination");
+ {
+ char buf[1024 + 1];
+ if(online && host->sender->rrdpush_sender_socket != -1) {
+ SOCKET_PEERS peers = socket_peers(host->sender->rrdpush_sender_socket);
+ bool ssl = SSL_connection(&host->sender->ssl);
+
+ snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : "");
+ buffer_json_member_add_string(wb, "local", buf);
+
+ snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : "");
+ buffer_json_member_add_string(wb, "remote", buf);
+
+ stream_capabilities_to_json_array(wb, host->sender->capabilities, "capabilities");
+
+ buffer_json_member_add_object(wb, "traffic");
+ {
+ bool compression = false;
+#ifdef ENABLE_COMPRESSION
+ compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor);
+#endif
+ buffer_json_member_add_boolean(wb, "compression", compression);
+ buffer_json_member_add_uint64(wb, "data", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DATA]);
+ buffer_json_member_add_uint64(wb, "metadata", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]);
+ buffer_json_member_add_uint64(wb, "functions", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]);
+ buffer_json_member_add_uint64(wb, "replication", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]);
+ }
+ buffer_json_object_close(wb); // traffic
+ }
+
+ buffer_json_member_add_array(wb, "candidates");
+ struct rrdpush_destinations *d;
+ for (d = host->destinations; d; d = d->next) {
+ buffer_json_add_array_item_object(wb);
+ {
+
+ if (d->ssl) {
+ snprintfz(buf, 1024, "%s:SSL", string2str(d->destination));
+ buffer_json_member_add_string(wb, "destination", buf);
+ }
+ else
+ buffer_json_member_add_string(wb, "destination", string2str(d->destination));
+
+ buffer_json_member_add_time_t(wb, "last_check", d->last_attempt);
+ buffer_json_member_add_time_t(wb, "age", now - d->last_attempt);
+ buffer_json_member_add_string(wb, "last_error", d->last_error);
+ buffer_json_member_add_string(wb, "last_handshake",
+ stream_handshake_error_to_string(d->last_handshake));
+ buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until);
+ buffer_json_member_add_time_t(wb, "next_in",
+ (d->postpone_reconnection_until > now) ?
+ d->postpone_reconnection_until - now : 0);
+ }
+ buffer_json_object_close(wb); // each candidate
+ }
+ buffer_json_array_close(wb); // candidates
+ }
+ buffer_json_object_close(wb); // destination
+
+ netdata_mutex_unlock(&host->sender->mutex);
+ }
+
+ buffer_json_object_close(wb); // streaming
+}
+
static bool rrdhost_set_sender(RRDHOST *host) {
if(unlikely(!host->sender)) return false;
@@ -1092,10 +1220,14 @@ static bool rrdhost_set_sender(RRDHOST *host) {
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
host->sender->tid = gettid();
+ host->sender->last_state_since_t = now_realtime_sec();
+ host->sender->exit.reason = NULL;
ret = true;
}
netdata_mutex_unlock(&host->sender->mutex);
+ rrdpush_reset_destinations_postpone_time(host);
+
return ret;
}
@@ -1105,9 +1237,11 @@ static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
if(host->sender->tid == gettid()) {
host->sender->tid = 0;
host->sender->exit.shutdown = false;
- host->sender->exit.reason = NULL;
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ host->sender->last_state_since_t = now_realtime_sec();
}
+
+ rrdpush_reset_destinations_postpone_time(host);
}
static bool rrdhost_sender_should_exit(struct sender_state *s) {
@@ -1134,7 +1268,7 @@ static bool rrdhost_sender_should_exit(struct sender_state *s) {
if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) {
if(!s->exit.reason)
- s->exit.reason = "RECEIVER LEFT";
+ s->exit.reason = "RECEIVER LEFT (ORPHAN HOST)";
return true;
}
@@ -1162,6 +1296,32 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
freez(s);
}
+void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
+#ifdef ENABLE_HTTPS
+ static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
+ netdata_spinlock_lock(&sp);
+
+ if(netdata_ssl_streaming_sender_ctx || !host) {
+ netdata_spinlock_unlock(&sp);
+ return;
+ }
+
+ for(struct rrdpush_destinations *d = host->destinations; d ; d = d->next) {
+ if (d->ssl) {
+ // we need to initialize SSL
+
+ netdata_ssl_initialize_ctx(NETDATA_SSL_STREAMING_SENDER_CTX);
+ ssl_security_location_for_context(netdata_ssl_streaming_sender_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
+
+ // stop the loop
+ break;
+ }
+ }
+
+ netdata_spinlock_unlock(&sp);
+#endif
+}
+
void *rrdpush_sender_thread(void *ptr) {
worker_register("STREAMSND");
worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
@@ -1206,17 +1366,7 @@ void *rrdpush_sender_thread(void *ptr) {
return NULL;
}
-#ifdef ENABLE_HTTPS
- if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ) {
- static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
- netdata_spinlock_lock(&sp);
- if(!netdata_ssl_client_ctx) {
- security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING);
- ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
- }
- netdata_spinlock_unlock(&sp);
- }
-#endif
+ rrdpush_initialize_ssl_ctx(s->host);
info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());
@@ -1287,6 +1437,7 @@ void *rrdpush_sender_thread(void *ptr) {
now_s = s->last_traffic_seen_t = now_monotonic_sec();
rrdpush_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
+ s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);