diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-06-14 19:20:36 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-06-14 19:20:36 +0000 |
commit | dd24e74edfbafc09eaeb2dde0fda7eb3e1e86d0b (patch) | |
tree | 1e52f4dac2622ab377c7649f218fb49003b4cbb9 /streaming | |
parent | Releasing debian version 1.39.1-2. (diff) | |
download | netdata-dd24e74edfbafc09eaeb2dde0fda7eb3e1e86d0b.tar.xz netdata-dd24e74edfbafc09eaeb2dde0fda7eb3e1e86d0b.zip |
Merging upstream version 1.40.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 152 | ||||
-rw-r--r-- | streaming/replication.c | 11 | ||||
-rw-r--r-- | streaming/rrdpush.c | 186 | ||||
-rw-r--r-- | streaming/rrdpush.h | 41 | ||||
-rw-r--r-- | streaming/sender.c | 391 |
5 files changed, 554 insertions, 227 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index ff7a95629..709f15bd5 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -31,10 +31,14 @@ void receiver_state_free(struct receiver_state *rpt) { freez(rpt->program_version); #ifdef ENABLE_HTTPS - if(rpt->ssl.conn) - SSL_free(rpt->ssl.conn); + netdata_ssl_close(&rpt->ssl); #endif + if(rpt->fd != -1) { + internal_error(true, "closing socket..."); + close(rpt->fd); + } + #ifdef ENABLE_COMPRESSION if (rpt->decompressor) rpt->decompressor->destroy(&rpt->decompressor); @@ -100,13 +104,18 @@ static int read_stream(struct receiver_state *r, char* buffer, size_t size) { return 0; } + ssize_t bytes_read; + #ifdef ENABLE_HTTPS - if (r->ssl.conn && r->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) - return (int)netdata_ssl_read(r->ssl.conn, buffer, size); + if (SSL_connection(&r->ssl)) + bytes_read = netdata_ssl_read(&r->ssl, buffer, size); + else + bytes_read = read(r->fd, buffer, size); +#else + bytes_read = read(r->fd, buffer, size); #endif - ssize_t bytes_read = read(r->fd, buffer, size); - if(bytes_read == 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) { + if((bytes_read == 0 || bytes_read == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) { error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__); bytes_read = -3; } @@ -119,23 +128,6 @@ static int read_stream(struct receiver_state *r, char* buffer, size_t size) { bytes_read = -2; } -// do { -// bytes_read = (int) fread(buffer, 1, size, fp); -// if (unlikely(bytes_read <= 0)) { -// if(feof(fp)) { -// internal_error(true, "%s(): fread() failed with EOF", __FUNCTION__); -// bytes_read = -2; -// } -// else if(ferror(fp)) { -// internal_error(true, "%s(): fread() failed with ERROR", __FUNCTION__); -// bytes_read = -3; -// } -// else bytes_read = 0; -// } -// else -// worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, bytes_read); -// } while(bytes_read == 0); - return (int)bytes_read; } @@ -323,12 +315,6 @@ static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t b return NULL; } -static void streaming_parser_thread_cleanup(void *ptr) { - PARSER *parser = (PARSER *)ptr; - rrd_collector_finished(); - parser_destroy(parser); -} - bool plugin_is_enabled(struct plugind *cd); static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { @@ -352,7 +338,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // this keeps the parser with its current value // so, parser needs to be allocated before pushing it - netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser); + netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); @@ -437,6 +423,58 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) { rrdhost_receiver_replicating_charts_zero(host); } +void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) { + size_t receiver_hops = host->system_info ? host->system_info->hops : (host == localhost) ? 0 : 1; + + netdata_mutex_lock(&host->receiver_lock); + + buffer_json_member_add_object(wb, key); + buffer_json_member_add_uint64(wb, "hops", receiver_hops); + + bool online = host == localhost || !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + buffer_json_member_add_boolean(wb, "online", online); + + if(host->child_connect_time || host->child_disconnected_time) { + time_t since = MAX(host->child_connect_time, host->child_disconnected_time); + buffer_json_member_add_time_t(wb, "since", since); + buffer_json_member_add_time_t(wb, "age", now - since); + } + + if(!online && host->rrdpush_last_receiver_exit_reason) + buffer_json_member_add_string(wb, "reason", host->rrdpush_last_receiver_exit_reason); + + if(host != localhost && host->receiver) { + buffer_json_member_add_object(wb, "replication"); + { + size_t instances = rrdhost_receiver_replicating_charts(host); + buffer_json_member_add_boolean(wb, "in_progress", instances); + buffer_json_member_add_double(wb, "completion", host->rrdpush_receiver_replication_percent); + buffer_json_member_add_uint64(wb, "instances", instances); + } + buffer_json_object_close(wb); // replication + + buffer_json_member_add_object(wb, "source"); + { + + char buf[1024 + 1]; + SOCKET_PEERS peers = socket_peers(host->receiver->fd); + bool ssl = SSL_connection(&host->receiver->ssl); + + snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : ""); + buffer_json_member_add_string(wb, "local", buf); + + snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : ""); + buffer_json_member_add_string(wb, "remote", buf); + + stream_capabilities_to_json_array(wb, host->receiver->capabilities, "capabilities"); + } + buffer_json_object_close(wb); // source + } + buffer_json_object_close(wb); // collection + + netdata_mutex_unlock(&host->receiver_lock); +} + static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { bool signal_rrdcontext = false; bool set_this = false; @@ -474,6 +512,8 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); aclk_queue_node_info(rpt->host, true); + rrdpush_reset_destinations_postpone_time(host); + set_this = true; } @@ -506,16 +546,17 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { signal_rrdcontext = true; rrdpush_receiver_replication_reset(host); - if (host->receiver == rpt) - host->receiver = NULL; - rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN); + host->receiver = NULL; + host->rrdpush_last_receiver_exit_reason = rpt->exit.reason; } netdata_mutex_unlock(&host->receiver_lock); if(signal_rrdcontext) rrdcontext_host_child_disconnected(host); + + rrdpush_reset_destinations_postpone_time(host); } } @@ -549,7 +590,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) { "thread %d takes too long to stop, giving up..." , rrdhost_hostname(host) , host->receiver->client_ip, host->receiver->client_port - , gettid()); + , host->receiver->tid); else ret = true; @@ -558,6 +599,18 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) { return ret; } +static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *rpt, const char *msg) { + (void) send_timeout( +#ifdef ENABLE_HTTPS + &rpt->ssl, +#endif + rpt->fd, + (char *)msg, + strlen(msg), + 0, + 5); +} + void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) { log_stream_connection(rpt->client_ip, rpt->client_port, @@ -585,7 +638,7 @@ static void rrdhost_reset_destinations(RRDHOST *host) { d->postpone_reconnection_until = 0; } -static int rrdpush_receive(struct receiver_state *rpt) +static void rrdpush_receive(struct receiver_state *rpt) { rpt->config.mode = default_rrd_memory_mode; rpt->config.history = default_rrd_history_entries; @@ -689,14 +742,14 @@ static int rrdpush_receive(struct receiver_state *rpt) if(!host) { rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION"); - close(rpt->fd); - return 1; + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR); + goto cleanup; } if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER"); - close(rpt->fd); - return 1; + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION); + goto cleanup; } // system_info has been consumed by the host structure @@ -704,8 +757,8 @@ static int rrdpush_receive(struct receiver_state *rpt) if(!rrdhost_set_receiver(host, rpt)) { rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION"); - close(rpt->fd); - return 1; + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING); + goto cleanup; } } @@ -776,15 +829,16 @@ static int rrdpush_receive(struct receiver_state *rpt) } debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); - if(send_timeout( + ssize_t bytes_sent = send_timeout( #ifdef ENABLE_HTTPS &rpt->ssl, #endif - rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { + rpt->fd, initial_response, strlen(initial_response), 0, 60); + if(bytes_sent != (ssize_t)strlen(initial_response)) { + internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response)); rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION"); - close(rpt->fd); - return 0; + goto cleanup; } } @@ -850,9 +904,8 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_set_is_parent_label(--localhost->connected_children_count); - // cleanup - close(rpt->fd); - return (int)count; +cleanup: + ; } static void rrdpush_receiver_thread_cleanup(void *ptr) { @@ -879,7 +932,8 @@ void *rrdpush_receiver_thread(void *ptr) { worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); struct receiver_state *rpt = (struct receiver_state *)ptr; - info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); + rpt->tid = gettid(); + info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid); rrdpush_receive(rpt); diff --git a/streaming/replication.c b/streaming/replication.c index a50913a1a..c6fafc357 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -274,6 +274,12 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q, replication_queries.queries_finished += queries; replication_queries.points_read += q->points_read; replication_queries.points_generated += q->points_generated; + + if(q->st && q->st->rrdhost->sender) { + struct sender_state *s = q->st->rrdhost->sender; + s->replication.latest_completed_before_t = q->query.before; + } + netdata_spinlock_unlock(&replication_queries.spinlock); } @@ -644,7 +650,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size buffer_fast_strcat(wb, "\n", 1); worker_is_busy(WORKER_JOB_BUFFER_COMMIT); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_REPLICATION); worker_is_busy(WORKER_JOB_CLEANUP); if(enable_streaming) { @@ -1466,6 +1472,9 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, .not_indexed_preprocessing = false, }; + if(!sender->replication.oldest_request_after_t || rq.after < sender->replication.oldest_request_after_t) + sender->replication.oldest_request_after_t = rq.after; + if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) replication_execute_request(&rq, false); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 62b537f0c..c481871cc 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -49,7 +49,6 @@ bool default_rrdpush_enable_replication = true; time_t default_rrdpush_seconds_to_replicate = 86400; time_t default_rrdpush_replication_step = 600; #ifdef ENABLE_HTTPS -int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL; char *netdata_ssl_ca_path = NULL; char *netdata_ssl_ca_file = NULL; #endif @@ -137,24 +136,10 @@ int rrdpush_init() { } #ifdef ENABLE_HTTPS - if (netdata_use_ssl_on_stream == NETDATA_SSL_OPTIONAL) { - if (default_rrdpush_destination){ - char *test = strstr(default_rrdpush_destination,":SSL"); - if(test){ - *test = 0X00; - netdata_use_ssl_on_stream = NETDATA_SSL_FORCE; - } - } - } + netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate); - bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO); - - if(invalid_certificate == CONFIG_BOOLEAN_YES){ - if(netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE){ - info("Netdata is configured to accept invalid SSL certificate."); - netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE; - } - } + if(!netdata_ssl_validate_certificate_sender) + info("SSL: streaming senders will skip SSL certificates verification."); netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL); netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL); @@ -390,7 +375,7 @@ bool rrdset_push_chart_definition_now(RRDSET *st) { BUFFER *wb = sender_start(host->sender); rrdpush_send_chart_definition(wb, st); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); return true; @@ -458,7 +443,7 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); } - sender_commit(st->rrdhost->sender, rsb->wb); + sender_commit(st->rrdhost->sender, rsb->wb, STREAM_TRAFFIC_TYPE_DATA); *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; } @@ -498,7 +483,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(!exposed_upstream)) { BUFFER *wb = sender_start(host->sender); replication_in_progress = rrdpush_send_chart_definition(wb, st); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); } if(replication_in_progress) @@ -529,7 +514,7 @@ void rrdpush_send_host_labels(RRDHOST *host) { rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb); buffer_sprintf(wb, "OVERWRITE %s\n", "labels"); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); } @@ -548,7 +533,7 @@ void rrdpush_claimed_id(RRDHOST *host) buffer_sprintf(wb, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") ); rrdhost_aclk_state_unlock(host); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); } @@ -579,6 +564,7 @@ int connect_to_one_of_destinations( if (reconnects_counter) *reconnects_counter += 1; + d->last_attempt = now; sock = connect_to_this(string2str(d->destination), default_port, timeout); if (sock != -1) { @@ -610,6 +596,14 @@ bool destinations_init_add_one(char *entry, void *data) { struct destinations_init_tmp *t = data; struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations)); + char *colon_ssl = strstr(entry, ":SSL"); + if(colon_ssl) { + *colon_ssl = '\0'; + d->ssl = true; + } + else + d->ssl = false; + d->destination = string_strdupz(entry); __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED); @@ -712,7 +706,7 @@ int rrdpush_receiver_permission_denied(struct web_client *w) { // we always respond with the same message and error code // to prevent an attacker from gaining info about the error buffer_flush(w->response.data); - buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info."); + buffer_strcat(w->response.data, START_STREAMING_ERROR_NOT_PERMITTED); return HTTP_RESP_UNAUTHORIZED; } @@ -720,10 +714,35 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) { // we always respond with the same message and error code // to prevent an attacker from gaining info about the error buffer_flush(w->response.data); - buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later."); + buffer_strcat(w->response.data, START_STREAMING_ERROR_BUSY_TRY_LATER); return HTTP_RESP_SERVICE_UNAVAILABLE; } +static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struct receiver_state *rpt) { + rpt->fd = w->ifd; + +#ifdef ENABLE_HTTPS + rpt->ssl.conn = w->ssl.conn; + rpt->ssl.state = w->ssl.state; + + w->ssl = NETDATA_SSL_UNSET_CONNECTION; +#endif + + WEB_CLIENT_IS_DEAD(w); + + if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) { + web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET); + } + else { + if(w->ifd == w->ofd) + w->ifd = w->ofd = -1; + else + w->ifd = -1; + } + + buffer_flush(w->response.data); +} + void *rrdpush_receiver_thread(void *ptr); int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) { @@ -741,20 +760,16 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info)); rpt->system_info->hops = rpt->hops; - rpt->fd = w->ifd; + rpt->fd = -1; rpt->client_ip = strdupz(w->client_ip); rpt->client_port = strdupz(w->client_port); - rpt->config.update_every = default_rrd_update_every; - #ifdef ENABLE_HTTPS - rpt->ssl.conn = w->ssl.conn; - rpt->ssl.flags = w->ssl.flags; - - w->ssl.conn = NULL; - w->ssl.flags = NETDATA_SSL_START; + rpt->ssl = NETDATA_SSL_UNSET_CONNECTION; #endif + rpt->config.update_every = default_rrd_update_every; + // parse the parameters and fill rpt and rpt->system_info while(decoded_query_string) { @@ -1011,6 +1026,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { + rrdpush_receiver_takeover_web_connection(w, rpt); + rrdpush_receive_log_status( rpt, "machine GUID is my own", @@ -1032,9 +1049,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri ); } - close(rpt->fd); receiver_state_free(rpt); - return web_client_socket_is_now_used_for_streaming(w); + return HTTP_RESP_OK; } if(unlikely(web_client_streaming_rate_t > 0)) { @@ -1130,7 +1146,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up buffer_flush(w->response.data); - buffer_strcat(w->response.data, "This GUID is already streaming to this server"); + buffer_strcat(w->response.data, START_STREAMING_ERROR_ALREADY_STREAMING); receiver_state_free(rpt); return HTTP_RESP_CONFLICT; } @@ -1138,8 +1154,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri debug(D_SYSTEM, "starting STREAM receive thread."); - char tag[FILENAME_MAX + 1]; - snprintfz(tag, FILENAME_MAX, THREAD_TAG_STREAM_RECEIVER "[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port); + rrdpush_receiver_takeover_web_connection(w, rpt); + + char tag[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_RECEIVER "[%s]", rpt->hostname); + tag[NETDATA_THREAD_TAG_MAX] = '\0'; if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) { rrdpush_receive_log_status( @@ -1154,23 +1173,86 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } // prevent the caller from closing the streaming socket - return web_client_socket_is_now_used_for_streaming(w); + return HTTP_RESP_OK; +} + +void rrdpush_reset_destinations_postpone_time(RRDHOST *host) { + struct rrdpush_destinations *d; + for (d = host->destinations; d; d = d->next) + d->postpone_reconnection_until = 0; } +static struct { + STREAM_HANDSHAKE err; + const char *str; +} handshake_errors[] = { + { STREAM_HANDSHAKE_OK_V5, "OK_V5" }, + { STREAM_HANDSHAKE_OK_V4, "OK_V4" }, + { STREAM_HANDSHAKE_OK_V3, "OK_V3" }, + { STREAM_HANDSHAKE_OK_V2, "OK_V2" }, + { STREAM_HANDSHAKE_OK_V1, "OK_V1" }, + { STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, "BAD HANDSHAKE" }, + { STREAM_HANDSHAKE_ERROR_LOCALHOST, "LOCALHOST" }, + { STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, "ALREADY CONNECTED" }, + { STREAM_HANDSHAKE_ERROR_DENIED, "DENIED" }, + { STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT, "SEND TIMEOUT" }, + { STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT, "RECEIVE TIMEOUT" }, + { STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE, "INVALID CERTIFICATE" }, + { STREAM_HANDSHAKE_ERROR_SSL_ERROR, "SSL ERROR" }, + { STREAM_HANDSHAKE_ERROR_CANT_CONNECT, "CANT CONNECT" }, + { STREAM_HANDSHAKE_BUSY_TRY_LATER, "BUSY TRY LATER" }, + { STREAM_HANDSHAKE_INTERNAL_ERROR, "INTERNAL ERROR" }, + { STREAM_HANDSHAKE_INITIALIZATION, "INITIALIZING" }, + { 0, NULL }, +}; + +const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error) { + for(size_t i = 0; handshake_errors[i].str ; i++) { + if(handshake_error == handshake_errors[i].err) + return handshake_errors[i].str; + } + + return ""; +} + +static struct { + STREAM_CAPABILITIES cap; + const char *str; +} capability_names[] = { + { STREAM_CAP_V1, "V1" }, + { STREAM_CAP_V2, "V2" }, + { STREAM_CAP_VN, "VN" }, + { STREAM_CAP_VCAPS, "VCAPS" }, + { STREAM_CAP_HLABELS, "HLABELS" }, + { STREAM_CAP_CLAIM, "CLAIM" }, + { STREAM_CAP_CLABELS, "CLABELS" }, + { STREAM_CAP_COMPRESSION, "COMPRESSION" }, + { STREAM_CAP_FUNCTIONS, "FUNCTIONS" }, + { STREAM_CAP_REPLICATION, "REPLICATION" }, + { STREAM_CAP_BINARY, "BINARY" }, + { STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, + { STREAM_CAP_IEEE754, "IEEE754" }, + { 0 , NULL }, +}; + static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { - if(caps & STREAM_CAP_V1) buffer_strcat(wb, "V1 "); - if(caps & STREAM_CAP_V2) buffer_strcat(wb, "V2 "); - if(caps & STREAM_CAP_VN) buffer_strcat(wb, "VN "); - if(caps & STREAM_CAP_VCAPS) buffer_strcat(wb, "VCAPS "); - if(caps & STREAM_CAP_HLABELS) buffer_strcat(wb, "HLABELS "); - if(caps & STREAM_CAP_CLAIM) buffer_strcat(wb, "CLAIM "); - if(caps & STREAM_CAP_CLABELS) buffer_strcat(wb, "CLABELS "); - if(caps & STREAM_CAP_COMPRESSION) buffer_strcat(wb, "COMPRESSION "); - if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS "); - if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION "); - if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY "); - if(caps & STREAM_CAP_INTERPOLATED) buffer_strcat(wb, "INTERPOLATED "); - if(caps & STREAM_CAP_IEEE754) buffer_strcat(wb, "IEEE754 "); + for(size_t i = 0; capability_names[i].str ; i++) { + if(caps & capability_names[i].cap) { + buffer_strcat(wb, capability_names[i].str); + buffer_strcat(wb, " "); + } + } +} + +void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) { + buffer_json_member_add_array(wb, key); + + for(size_t i = 0; capability_names[i].str ; i++) { + if(caps & capability_names[i].cap) + buffer_json_add_array_item_string(wb, capability_names[i].str); + } + + buffer_json_array_close(wb); } void log_receiver_capabilities(struct receiver_state *rpt) { diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index ff8958440..f97c8ddfb 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -72,6 +72,9 @@ STREAM_CAPABILITIES stream_our_capabilities(); #define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back" #define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server" #define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info." +#define START_STREAMING_ERROR_BUSY_TRY_LATER "The server is too busy now to accept this request. Try later." +#define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later." +#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later." typedef enum { STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION @@ -87,12 +90,27 @@ typedef enum { STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6, STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7, STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8, - STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9 + STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9, + STREAM_HANDSHAKE_BUSY_TRY_LATER = -10, + STREAM_HANDSHAKE_INTERNAL_ERROR = -11, + STREAM_HANDSHAKE_INITIALIZATION = -12, } STREAM_HANDSHAKE; // ---------------------------------------------------------------------------- +typedef enum __attribute__((packed)) { + STREAM_TRAFFIC_TYPE_REPLICATION, + STREAM_TRAFFIC_TYPE_FUNCTIONS, + STREAM_TRAFFIC_TYPE_METADATA, + STREAM_TRAFFIC_TYPE_DATA, + + // terminator + STREAM_TRAFFIC_TYPE_MAX, +} STREAM_TRAFFIC_TYPE; + +// ---------------------------------------------------------------------------- + typedef struct { char *os_name; char *os_id; @@ -148,6 +166,7 @@ struct sender_state { size_t sent_bytes_on_this_connection; size_t send_attempts; time_t last_traffic_seen_t; + time_t last_state_since_t; // the timestamp of the last state (online/offline) change size_t not_connected_loops; // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here. @@ -157,6 +176,8 @@ struct sender_state { int read_len; STREAM_CAPABILITIES capabilities; + size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX]; + int rrdpush_sender_pipe[2]; // collector to sender thread signaling int rrdpush_sender_socket; @@ -166,7 +187,7 @@ struct sender_state { struct compressor_state *compressor; #endif #ifdef ENABLE_HTTPS - struct netdata_ssl ssl; // structure used to encrypt the connection + NETDATA_SSL ssl; // structure used to encrypt the connection #endif struct { @@ -176,6 +197,8 @@ struct sender_state { struct { DICTIONARY *requests; // de-duplication of replication requests, per chart + time_t oldest_request_after_t; // the timestamp of the oldest replication request + time_t latest_completed_before_t; // the timestamp of the latest replication request struct { size_t pending_requests; // the currently outstanding replication requests @@ -221,6 +244,7 @@ struct sender_state { struct receiver_state { RRDHOST *host; + pid_t tid; netdata_thread_t thread; int fd; char *key; @@ -266,7 +290,7 @@ struct receiver_state { } config; #ifdef ENABLE_HTTPS - struct netdata_ssl ssl; + NETDATA_SSL ssl; #endif #ifdef ENABLE_COMPRESSION unsigned int rrdpush_compression; @@ -278,8 +302,10 @@ struct receiver_state { struct rrdpush_destinations { STRING *destination; + bool ssl; const char *last_error; + time_t last_attempt; time_t postpone_reconnection_until; STREAM_HANDSHAKE last_handshake; @@ -303,7 +329,7 @@ void rrdpush_destinations_init(RRDHOST *host); void rrdpush_destinations_free(RRDHOST *host); BUFFER *sender_start(struct sender_state *s); -void sender_commit(struct sender_state *s, BUFFER *wb); +void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type); int rrdpush_init(); bool rrdpush_receiver_needs_dbengine(); int configured_as_parent(); @@ -351,7 +377,9 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s); struct compressor_state *create_compressor(); struct decompressor_state *create_decompressor(); #endif - +void rrdpush_reset_destinations_postpone_time(RRDHOST *host); +const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error); +void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key); void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status); void log_receiver_capabilities(struct receiver_state *rpt); void log_sender_capabilities(struct sender_state *s); @@ -363,6 +391,9 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason); void sender_thread_buffer_free(void); +void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused); +void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused); + #include "replication.h" #endif //NETDATA_RRDPUSH_H diff --git a/streaming/sender.c b/streaming/sender.c index 179c2dc60..c74c9b407 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -29,7 +29,6 @@ #endif extern struct config stream_config; -extern int netdata_use_ssl_on_stream; extern char *netdata_ssl_ca_path; extern char *netdata_ssl_ca_file; @@ -85,7 +84,7 @@ static inline void deactivate_compression(struct sender_state *s) { #define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3 // Collector thread finishing a transmission -void sender_commit(struct sender_state *s, BUFFER *wb) { +void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) { if(unlikely(wb != sender_thread_buffer)) fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer."); @@ -164,6 +163,8 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { if(cbuffer_add_unsafe(s->buffer, dst, dst_len)) s->flags |= SENDER_FLAG_OVERFLOW; + else + s->sent_bytes_on_this_connection_per_type[type] += dst_len; src = src + size_to_compress; src_len -= size_to_compress; @@ -171,9 +172,13 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { } else if(cbuffer_add_unsafe(s->buffer, src, src_len)) s->flags |= SENDER_FLAG_OVERFLOW; + else + s->sent_bytes_on_this_connection_per_type[type] += src_len; #else if(cbuffer_add_unsafe(s->buffer, src, src_len)) s->flags |= SENDER_FLAG_OVERFLOW; + else + s->sent_bytes_on_this_connection_per_type[type] += src_len; #endif replication_recalculate_buffer_used_ratio_unsafe(s); @@ -205,7 +210,7 @@ void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQU if(rrdhost_can_send_definitions_to_parent(host)) { BUFFER *wb = sender_start(host->sender); rrdpush_sender_add_host_variable_to_buffer(wb, rva); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); } } @@ -234,7 +239,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { }; int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp); (void)ret; - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); @@ -320,6 +325,10 @@ static void rrdpush_sender_after_connect(RRDHOST *host) { } static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { +#ifdef ENABLE_HTTPS + netdata_ssl_close(&host->sender->ssl); +#endif + if(host->sender->rrdpush_sender_socket != -1) { close(host->sender->rrdpush_sender_socket); host->sender->rrdpush_sender_socket = -1; @@ -335,11 +344,11 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) { - se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):""; - se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):""; - se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):""; - se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):""; - se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):""; + se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz(""); + se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz(""); + se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz(""); + se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):strdupz(""); + se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz(""); } void rrdpush_clean_encoded(stream_encoded_t *se) @@ -423,6 +432,33 @@ struct { .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 1 * 60, // 1 minute }, + { + .response = START_STREAMING_ERROR_BUSY_TRY_LATER, + .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1, + .version = STREAM_HANDSHAKE_BUSY_TRY_LATER, + .dynamic = false, + .error = "remote server is currently busy, we should try later", + .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, + .postpone_reconnect_seconds = 2 * 60, // 2 minutes + }, + { + .response = START_STREAMING_ERROR_INTERNAL_ERROR, + .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1, + .version = STREAM_HANDSHAKE_INTERNAL_ERROR, + .dynamic = false, + .error = "remote server is encountered an internal error, we should try later", + .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, + .postpone_reconnect_seconds = 5 * 60, // 5 minutes + }, + { + .response = START_STREAMING_ERROR_INITIALIZATION, + .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1, + .version = STREAM_HANDSHAKE_INITIALIZATION, + .dynamic = false, + .error = "remote server is initializing, we should try later", + .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, + .postpone_reconnect_seconds = 2 * 60, // 2 minute + }, // terminator { @@ -480,6 +516,53 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender return false; } +static bool rrdpush_sender_connect_ssl(struct sender_state *s) { +#ifdef ENABLE_HTTPS + RRDHOST *host = s->host; + bool ssl_required = host->destination && host->destination->ssl; + + netdata_ssl_close(&host->sender->ssl); + + if(!ssl_required) + return true; + + if (netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) { + if(!netdata_ssl_connect(&host->sender->ssl)) { + // couldn't connect + + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); + rrdpush_sender_thread_close_socket(host); + host->destination->last_error = "SSL error"; + host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR; + host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; + return false; + } + + if (netdata_ssl_validate_certificate_sender && + security_test_certificate(host->sender->ssl.conn)) { + // certificate is not valid + + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); + error("SSL: closing the stream connection, because the server SSL certificate is not valid."); + rrdpush_sender_thread_close_socket(host); + host->destination->last_error = "invalid SSL certificate"; + host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE; + host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; + return false; + } + + return true; + } + + // failed to establish connection + return false; + +#else + // SSL is not enabled + return true; +#endif +} + static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) { struct timeval tv = { @@ -507,35 +590,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p // info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); -#ifdef ENABLE_HTTPS - if(netdata_ssl_client_ctx){ - host->sender->ssl.flags = NETDATA_SSL_START; - if (!host->sender->ssl.conn){ - host->sender->ssl.conn = SSL_new(netdata_ssl_client_ctx); - if(!host->sender->ssl.conn){ - error("Failed to allocate SSL structure."); - host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } - } - else{ - SSL_clear(host->sender->ssl.conn); - } - - if (host->sender->ssl.conn) - { - if (SSL_set_fd(host->sender->ssl.conn, s->rrdpush_sender_socket) != 1) { - error("Failed to set the socket to the SSL on socket fd %d.", s->rrdpush_sender_socket); - host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } else{ - host->sender->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE; - } - } - } - else { - host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } -#endif - // reset our capabilities to default s->capabilities = stream_our_capabilities(); @@ -651,43 +705,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p http[eol] = 0x00; rrdpush_clean_encoded(&se); -#ifdef ENABLE_HTTPS - if (!host->sender->ssl.flags) { - ERR_clear_error(); - SSL_set_connect_state(host->sender->ssl.conn); - int err = SSL_connect(host->sender->ssl.conn); - if (err != 1){ - err = SSL_get_error(host->sender->ssl.conn, err); - error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->sender->ssl.conn,err),NULL)); - if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - rrdpush_sender_thread_close_socket(host); - host->destination->last_error = "SSL error"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR; - host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; - return false; - } - else { - host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } - } - else { - if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) { - if (netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE) { - if ( security_test_certificate(host->sender->ssl.conn)) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - error("Closing the stream connection, because the server SSL certificate is not valid."); - rrdpush_sender_thread_close_socket(host); - host->destination->last_error = "invalid SSL certificate"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE; - host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; - return false; - } - } - } - } - } -#endif + if(!rrdpush_sender_connect_ssl(s)) + return false; ssize_t bytes; @@ -733,6 +752,12 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p return false; } + if(sock_setnonblock(s->rrdpush_sender_socket) < 0) + error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); + + if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) + error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); + http[bytes] = '\0'; debug(D_STREAM, "Response to sender from far end: %s", http); if(!rrdpush_sender_validate_response(host, s, http, bytes)) @@ -749,12 +774,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p log_sender_capabilities(s); - if(sock_setnonblock(s->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); - - if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); - debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket); return true; @@ -764,6 +783,10 @@ static bool attempt_to_connect(struct sender_state *state) { state->send_attempts = 0; + // reset the bytes we have sent for this session + state->sent_bytes_on_this_connection = 0; + memset(state->sent_bytes_on_this_connection_per_type, 0, sizeof(state->sent_bytes_on_this_connection_per_type)); + if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) { // reset the buffer, to properly send charts and metrics rrdpush_sender_on_connect(state->host); @@ -774,9 +797,6 @@ static bool attempt_to_connect(struct sender_state *state) // make sure the next reconnection will be immediate state->not_connected_loops = 0; - // reset the bytes we have sent for this session - state->sent_bytes_on_this_connection = 0; - // let the data collection threads know we are ready rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); @@ -790,9 +810,6 @@ static bool attempt_to_connect(struct sender_state *state) // increase the failed connections counter state->not_connected_loops++; - // reset the number of bytes sent - state->sent_bytes_on_this_connection = 0; - // slow re-connection on repeating errors usec_t now_ut = now_monotonic_usec(); usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay; @@ -819,9 +836,8 @@ static ssize_t attempt_to_send(struct sender_state *s) { debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); #ifdef ENABLE_HTTPS - SSL *conn = s->ssl.conn ; - if(conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) - ret = netdata_ssl_write(conn, chunk, outstanding); + if(SSL_connection(&s->ssl)) + ret = netdata_ssl_write(&s->ssl, chunk, outstanding); else ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT); #else @@ -852,25 +868,17 @@ static ssize_t attempt_to_send(struct sender_state *s) { } static ssize_t attempt_read(struct sender_state *s) { - ssize_t ret = 0; + ssize_t ret; #ifdef ENABLE_HTTPS - if (s->ssl.conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { - size_t desired = sizeof(s->read_buffer) - s->read_len - 1; - ret = netdata_ssl_read(s->ssl.conn, s->read_buffer, desired); - if (ret > 0 ) { - s->read_len += (int)ret; - return ret; - } - - if (ret == -1) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - rrdpush_sender_thread_close_socket(s->host); - } - return ret; - } -#endif + if (SSL_connection(&s->ssl)) + ret = netdata_ssl_read(&s->ssl, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1); + else + ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT); +#else ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT); +#endif + if (ret > 0) { s->read_len += ret; return ret; @@ -879,6 +887,12 @@ static ssize_t attempt_read(struct sender_state *s) { if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) return ret; +#ifdef ENABLE_HTTPS + if (SSL_connection(&s->ssl)) + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); + else +#endif + if (ret == 0 || errno == ECONNRESET) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED); error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to); @@ -887,6 +901,7 @@ static ssize_t attempt_read(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR); error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret); } + rrdpush_sender_thread_close_socket(s->host); return ret; @@ -915,7 +930,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb)); pluginsd_function_result_end_to_buffer(wb); - sender_commit(s, wb); + sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); sender_thread_buffer_free(); internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).", @@ -1083,6 +1098,119 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) { } } +static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) { + size_t charts = rrdhost_sender_replicating_charts(host); + NETDATA_DOUBLE completion; + if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t) + completion = 100.0; + else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t) + completion = 0.0; + else { + time_t total = now - host->sender->replication.oldest_request_after_t; + time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t; + completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total; + } + + *instances = charts; + + return completion; +} + +void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) { + bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + buffer_json_member_add_object(wb, key); + + if(host->sender) + buffer_json_member_add_uint64(wb, "hops", host->sender->hops); + + buffer_json_member_add_boolean(wb, "online", online); + + if(host->sender && host->sender->last_state_since_t) { + buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t); + buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t); + } + + if(!online && host->sender && host->sender->exit.reason) + buffer_json_member_add_string(wb, "reason", host->sender->exit.reason); + + buffer_json_member_add_object(wb, "replication"); + { + size_t instances; + NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances); + buffer_json_member_add_boolean(wb, "in_progress", instances); + buffer_json_member_add_double(wb, "completion", completion); + buffer_json_member_add_uint64(wb, "instances", instances); + } + buffer_json_object_close(wb); + + if(host->sender) { + netdata_mutex_lock(&host->sender->mutex); + + buffer_json_member_add_object(wb, "destination"); + { + char buf[1024 + 1]; + if(online && host->sender->rrdpush_sender_socket != -1) { + SOCKET_PEERS peers = socket_peers(host->sender->rrdpush_sender_socket); + bool ssl = SSL_connection(&host->sender->ssl); + + snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : ""); + buffer_json_member_add_string(wb, "local", buf); + + snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : ""); + buffer_json_member_add_string(wb, "remote", buf); + + stream_capabilities_to_json_array(wb, host->sender->capabilities, "capabilities"); + + buffer_json_member_add_object(wb, "traffic"); + { + bool compression = false; +#ifdef ENABLE_COMPRESSION + compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor); +#endif + buffer_json_member_add_boolean(wb, "compression", compression); + buffer_json_member_add_uint64(wb, "data", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DATA]); + buffer_json_member_add_uint64(wb, "metadata", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]); + buffer_json_member_add_uint64(wb, "functions", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]); + buffer_json_member_add_uint64(wb, "replication", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]); + } + buffer_json_object_close(wb); // traffic + } + + buffer_json_member_add_array(wb, "candidates"); + struct rrdpush_destinations *d; + for (d = host->destinations; d; d = d->next) { + buffer_json_add_array_item_object(wb); + { + + if (d->ssl) { + snprintfz(buf, 1024, "%s:SSL", string2str(d->destination)); + buffer_json_member_add_string(wb, "destination", buf); + } + else + buffer_json_member_add_string(wb, "destination", string2str(d->destination)); + + buffer_json_member_add_time_t(wb, "last_check", d->last_attempt); + buffer_json_member_add_time_t(wb, "age", now - d->last_attempt); + buffer_json_member_add_string(wb, "last_error", d->last_error); + buffer_json_member_add_string(wb, "last_handshake", + stream_handshake_error_to_string(d->last_handshake)); + buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until); + buffer_json_member_add_time_t(wb, "next_in", + (d->postpone_reconnection_until > now) ? + d->postpone_reconnection_until - now : 0); + } + buffer_json_object_close(wb); // each candidate + } + buffer_json_array_close(wb); // candidates + } + buffer_json_object_close(wb); // destination + + netdata_mutex_unlock(&host->sender->mutex); + } + + buffer_json_object_close(wb); // streaming +} + static bool rrdhost_set_sender(RRDHOST *host) { if(unlikely(!host->sender)) return false; @@ -1092,10 +1220,14 @@ static bool rrdhost_set_sender(RRDHOST *host) { rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); host->sender->tid = gettid(); + host->sender->last_state_since_t = now_realtime_sec(); + host->sender->exit.reason = NULL; ret = true; } netdata_mutex_unlock(&host->sender->mutex); + rrdpush_reset_destinations_postpone_time(host); + return ret; } @@ -1105,9 +1237,11 @@ static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) { if(host->sender->tid == gettid()) { host->sender->tid = 0; host->sender->exit.shutdown = false; - host->sender->exit.reason = NULL; rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + host->sender->last_state_since_t = now_realtime_sec(); } + + rrdpush_reset_destinations_postpone_time(host); } static bool rrdhost_sender_should_exit(struct sender_state *s) { @@ -1134,7 +1268,7 @@ static bool rrdhost_sender_should_exit(struct sender_state *s) { if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) { if(!s->exit.reason) - s->exit.reason = "RECEIVER LEFT"; + s->exit.reason = "RECEIVER LEFT (ORPHAN HOST)"; return true; } @@ -1162,6 +1296,32 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { freez(s); } +void rrdpush_initialize_ssl_ctx(RRDHOST *host) { +#ifdef ENABLE_HTTPS + static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; + netdata_spinlock_lock(&sp); + + if(netdata_ssl_streaming_sender_ctx || !host) { + netdata_spinlock_unlock(&sp); + return; + } + + for(struct rrdpush_destinations *d = host->destinations; d ; d = d->next) { + if (d->ssl) { + // we need to initialize SSL + + netdata_ssl_initialize_ctx(NETDATA_SSL_STREAMING_SENDER_CTX); + ssl_security_location_for_context(netdata_ssl_streaming_sender_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path); + + // stop the loop + break; + } + } + + netdata_spinlock_unlock(&sp); +#endif +} + void *rrdpush_sender_thread(void *ptr) { worker_register("STREAMSND"); worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect"); @@ -1206,17 +1366,7 @@ void *rrdpush_sender_thread(void *ptr) { return NULL; } -#ifdef ENABLE_HTTPS - if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ) { - static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; - netdata_spinlock_lock(&sp); - if(!netdata_ssl_client_ctx) { - security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING); - ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path); - } - netdata_spinlock_unlock(&sp); - } -#endif + rrdpush_initialize_ssl_ctx(s->host); info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid()); @@ -1287,6 +1437,7 @@ void *rrdpush_sender_thread(void *ptr) { now_s = s->last_traffic_seen_t = now_monotonic_sec(); rrdpush_claimed_id(s->host); rrdpush_send_host_labels(s->host); + s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); |