diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-06-14 19:20:33 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-06-14 19:20:33 +0000 |
commit | 6cf8f2d5174a53f582e61d715edbb88d6e3367cc (patch) | |
tree | 78cec0fd8d09c4a6a052461d42f4b2be3af6d396 /streaming/sender.c | |
parent | Adding upstream version 1.39.1. (diff) | |
download | netdata-6cf8f2d5174a53f582e61d715edbb88d6e3367cc.tar.xz netdata-6cf8f2d5174a53f582e61d715edbb88d6e3367cc.zip |
Adding upstream version 1.40.0.upstream/1.40.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- | streaming/sender.c | 391 |
1 files changed, 271 insertions, 120 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 179c2dc6..c74c9b40 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -29,7 +29,6 @@ #endif extern struct config stream_config; -extern int netdata_use_ssl_on_stream; extern char *netdata_ssl_ca_path; extern char *netdata_ssl_ca_file; @@ -85,7 +84,7 @@ static inline void deactivate_compression(struct sender_state *s) { #define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3 // Collector thread finishing a transmission -void sender_commit(struct sender_state *s, BUFFER *wb) { +void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) { if(unlikely(wb != sender_thread_buffer)) fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer."); @@ -164,6 +163,8 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { if(cbuffer_add_unsafe(s->buffer, dst, dst_len)) s->flags |= SENDER_FLAG_OVERFLOW; + else + s->sent_bytes_on_this_connection_per_type[type] += dst_len; src = src + size_to_compress; src_len -= size_to_compress; @@ -171,9 +172,13 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { } else if(cbuffer_add_unsafe(s->buffer, src, src_len)) s->flags |= SENDER_FLAG_OVERFLOW; + else + s->sent_bytes_on_this_connection_per_type[type] += src_len; #else if(cbuffer_add_unsafe(s->buffer, src, src_len)) s->flags |= SENDER_FLAG_OVERFLOW; + else + s->sent_bytes_on_this_connection_per_type[type] += src_len; #endif replication_recalculate_buffer_used_ratio_unsafe(s); @@ -205,7 +210,7 @@ void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQU if(rrdhost_can_send_definitions_to_parent(host)) { BUFFER *wb = sender_start(host->sender); rrdpush_sender_add_host_variable_to_buffer(wb, rva); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); } } @@ -234,7 +239,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { }; int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp); (void)ret; - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); @@ -320,6 +325,10 @@ static void rrdpush_sender_after_connect(RRDHOST *host) { } static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { +#ifdef ENABLE_HTTPS + netdata_ssl_close(&host->sender->ssl); +#endif + if(host->sender->rrdpush_sender_socket != -1) { close(host->sender->rrdpush_sender_socket); host->sender->rrdpush_sender_socket = -1; @@ -335,11 +344,11 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) { - se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):""; - se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):""; - se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):""; - se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):""; - se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):""; + se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz(""); + se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz(""); + se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz(""); + se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):strdupz(""); + se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz(""); } void rrdpush_clean_encoded(stream_encoded_t *se) @@ -423,6 +432,33 @@ struct { .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 1 * 60, // 1 minute }, + { + .response = START_STREAMING_ERROR_BUSY_TRY_LATER, + .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1, + .version = STREAM_HANDSHAKE_BUSY_TRY_LATER, + .dynamic = false, + .error = "remote server is currently busy, we should try later", + .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, + .postpone_reconnect_seconds = 2 * 60, // 2 minutes + }, + { + .response = START_STREAMING_ERROR_INTERNAL_ERROR, + .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1, + .version = STREAM_HANDSHAKE_INTERNAL_ERROR, + .dynamic = false, + .error = "remote server is encountered an internal error, we should try later", + .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, + .postpone_reconnect_seconds = 5 * 60, // 5 minutes + }, + { + .response = START_STREAMING_ERROR_INITIALIZATION, + .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1, + .version = STREAM_HANDSHAKE_INITIALIZATION, + .dynamic = false, + .error = "remote server is initializing, we should try later", + .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, + .postpone_reconnect_seconds = 2 * 60, // 2 minute + }, // terminator { @@ -480,6 +516,53 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender return false; } +static bool rrdpush_sender_connect_ssl(struct sender_state *s) { +#ifdef ENABLE_HTTPS + RRDHOST *host = s->host; + bool ssl_required = host->destination && host->destination->ssl; + + netdata_ssl_close(&host->sender->ssl); + + if(!ssl_required) + return true; + + if (netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) { + if(!netdata_ssl_connect(&host->sender->ssl)) { + // couldn't connect + + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); + rrdpush_sender_thread_close_socket(host); + host->destination->last_error = "SSL error"; + host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR; + host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; + return false; + } + + if (netdata_ssl_validate_certificate_sender && + security_test_certificate(host->sender->ssl.conn)) { + // certificate is not valid + + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); + error("SSL: closing the stream connection, because the server SSL certificate is not valid."); + rrdpush_sender_thread_close_socket(host); + host->destination->last_error = "invalid SSL certificate"; + host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE; + host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; + return false; + } + + return true; + } + + // failed to establish connection + return false; + +#else + // SSL is not enabled + return true; +#endif +} + static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) { struct timeval tv = { @@ -507,35 +590,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p // info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); -#ifdef ENABLE_HTTPS - if(netdata_ssl_client_ctx){ - host->sender->ssl.flags = NETDATA_SSL_START; - if (!host->sender->ssl.conn){ - host->sender->ssl.conn = SSL_new(netdata_ssl_client_ctx); - if(!host->sender->ssl.conn){ - error("Failed to allocate SSL structure."); - host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } - } - else{ - SSL_clear(host->sender->ssl.conn); - } - - if (host->sender->ssl.conn) - { - if (SSL_set_fd(host->sender->ssl.conn, s->rrdpush_sender_socket) != 1) { - error("Failed to set the socket to the SSL on socket fd %d.", s->rrdpush_sender_socket); - host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } else{ - host->sender->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE; - } - } - } - else { - host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } -#endif - // reset our capabilities to default s->capabilities = stream_our_capabilities(); @@ -651,43 +705,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p http[eol] = 0x00; rrdpush_clean_encoded(&se); -#ifdef ENABLE_HTTPS - if (!host->sender->ssl.flags) { - ERR_clear_error(); - SSL_set_connect_state(host->sender->ssl.conn); - int err = SSL_connect(host->sender->ssl.conn); - if (err != 1){ - err = SSL_get_error(host->sender->ssl.conn, err); - error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->sender->ssl.conn,err),NULL)); - if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - rrdpush_sender_thread_close_socket(host); - host->destination->last_error = "SSL error"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR; - host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; - return false; - } - else { - host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } - } - else { - if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) { - if (netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE) { - if ( security_test_certificate(host->sender->ssl.conn)) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - error("Closing the stream connection, because the server SSL certificate is not valid."); - rrdpush_sender_thread_close_socket(host); - host->destination->last_error = "invalid SSL certificate"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE; - host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; - return false; - } - } - } - } - } -#endif + if(!rrdpush_sender_connect_ssl(s)) + return false; ssize_t bytes; @@ -733,6 +752,12 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p return false; } + if(sock_setnonblock(s->rrdpush_sender_socket) < 0) + error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); + + if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) + error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); + http[bytes] = '\0'; debug(D_STREAM, "Response to sender from far end: %s", http); if(!rrdpush_sender_validate_response(host, s, http, bytes)) @@ -749,12 +774,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p log_sender_capabilities(s); - if(sock_setnonblock(s->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); - - if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); - debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket); return true; @@ -764,6 +783,10 @@ static bool attempt_to_connect(struct sender_state *state) { state->send_attempts = 0; + // reset the bytes we have sent for this session + state->sent_bytes_on_this_connection = 0; + memset(state->sent_bytes_on_this_connection_per_type, 0, sizeof(state->sent_bytes_on_this_connection_per_type)); + if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) { // reset the buffer, to properly send charts and metrics rrdpush_sender_on_connect(state->host); @@ -774,9 +797,6 @@ static bool attempt_to_connect(struct sender_state *state) // make sure the next reconnection will be immediate state->not_connected_loops = 0; - // reset the bytes we have sent for this session - state->sent_bytes_on_this_connection = 0; - // let the data collection threads know we are ready rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); @@ -790,9 +810,6 @@ static bool attempt_to_connect(struct sender_state *state) // increase the failed connections counter state->not_connected_loops++; - // reset the number of bytes sent - state->sent_bytes_on_this_connection = 0; - // slow re-connection on repeating errors usec_t now_ut = now_monotonic_usec(); usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay; @@ -819,9 +836,8 @@ static ssize_t attempt_to_send(struct sender_state *s) { debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); #ifdef ENABLE_HTTPS - SSL *conn = s->ssl.conn ; - if(conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) - ret = netdata_ssl_write(conn, chunk, outstanding); + if(SSL_connection(&s->ssl)) + ret = netdata_ssl_write(&s->ssl, chunk, outstanding); else ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT); #else @@ -852,25 +868,17 @@ static ssize_t attempt_to_send(struct sender_state *s) { } static ssize_t attempt_read(struct sender_state *s) { - ssize_t ret = 0; + ssize_t ret; #ifdef ENABLE_HTTPS - if (s->ssl.conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { - size_t desired = sizeof(s->read_buffer) - s->read_len - 1; - ret = netdata_ssl_read(s->ssl.conn, s->read_buffer, desired); - if (ret > 0 ) { - s->read_len += (int)ret; - return ret; - } - - if (ret == -1) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - rrdpush_sender_thread_close_socket(s->host); - } - return ret; - } -#endif + if (SSL_connection(&s->ssl)) + ret = netdata_ssl_read(&s->ssl, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1); + else + ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT); +#else ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT); +#endif + if (ret > 0) { s->read_len += ret; return ret; @@ -879,6 +887,12 @@ static ssize_t attempt_read(struct sender_state *s) { if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) return ret; +#ifdef ENABLE_HTTPS + if (SSL_connection(&s->ssl)) + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); + else +#endif + if (ret == 0 || errno == ECONNRESET) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED); error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to); @@ -887,6 +901,7 @@ static ssize_t attempt_read(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR); error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret); } + rrdpush_sender_thread_close_socket(s->host); return ret; @@ -915,7 +930,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb)); pluginsd_function_result_end_to_buffer(wb); - sender_commit(s, wb); + sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); sender_thread_buffer_free(); internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).", @@ -1083,6 +1098,119 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) { } } +static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) { + size_t charts = rrdhost_sender_replicating_charts(host); + NETDATA_DOUBLE completion; + if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t) + completion = 100.0; + else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t) + completion = 0.0; + else { + time_t total = now - host->sender->replication.oldest_request_after_t; + time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t; + completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total; + } + + *instances = charts; + + return completion; +} + +void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) { + bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + buffer_json_member_add_object(wb, key); + + if(host->sender) + buffer_json_member_add_uint64(wb, "hops", host->sender->hops); + + buffer_json_member_add_boolean(wb, "online", online); + + if(host->sender && host->sender->last_state_since_t) { + buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t); + buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t); + } + + if(!online && host->sender && host->sender->exit.reason) + buffer_json_member_add_string(wb, "reason", host->sender->exit.reason); + + buffer_json_member_add_object(wb, "replication"); + { + size_t instances; + NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances); + buffer_json_member_add_boolean(wb, "in_progress", instances); + buffer_json_member_add_double(wb, "completion", completion); + buffer_json_member_add_uint64(wb, "instances", instances); + } + buffer_json_object_close(wb); + + if(host->sender) { + netdata_mutex_lock(&host->sender->mutex); + + buffer_json_member_add_object(wb, "destination"); + { + char buf[1024 + 1]; + if(online && host->sender->rrdpush_sender_socket != -1) { + SOCKET_PEERS peers = socket_peers(host->sender->rrdpush_sender_socket); + bool ssl = SSL_connection(&host->sender->ssl); + + snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : ""); + buffer_json_member_add_string(wb, "local", buf); + + snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : ""); + buffer_json_member_add_string(wb, "remote", buf); + + stream_capabilities_to_json_array(wb, host->sender->capabilities, "capabilities"); + + buffer_json_member_add_object(wb, "traffic"); + { + bool compression = false; +#ifdef ENABLE_COMPRESSION + compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor); +#endif + buffer_json_member_add_boolean(wb, "compression", compression); + buffer_json_member_add_uint64(wb, "data", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DATA]); + buffer_json_member_add_uint64(wb, "metadata", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]); + buffer_json_member_add_uint64(wb, "functions", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]); + buffer_json_member_add_uint64(wb, "replication", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]); + } + buffer_json_object_close(wb); // traffic + } + + buffer_json_member_add_array(wb, "candidates"); + struct rrdpush_destinations *d; + for (d = host->destinations; d; d = d->next) { + buffer_json_add_array_item_object(wb); + { + + if (d->ssl) { + snprintfz(buf, 1024, "%s:SSL", string2str(d->destination)); + buffer_json_member_add_string(wb, "destination", buf); + } + else + buffer_json_member_add_string(wb, "destination", string2str(d->destination)); + + buffer_json_member_add_time_t(wb, "last_check", d->last_attempt); + buffer_json_member_add_time_t(wb, "age", now - d->last_attempt); + buffer_json_member_add_string(wb, "last_error", d->last_error); + buffer_json_member_add_string(wb, "last_handshake", + stream_handshake_error_to_string(d->last_handshake)); + buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until); + buffer_json_member_add_time_t(wb, "next_in", + (d->postpone_reconnection_until > now) ? + d->postpone_reconnection_until - now : 0); + } + buffer_json_object_close(wb); // each candidate + } + buffer_json_array_close(wb); // candidates + } + buffer_json_object_close(wb); // destination + + netdata_mutex_unlock(&host->sender->mutex); + } + + buffer_json_object_close(wb); // streaming +} + static bool rrdhost_set_sender(RRDHOST *host) { if(unlikely(!host->sender)) return false; @@ -1092,10 +1220,14 @@ static bool rrdhost_set_sender(RRDHOST *host) { rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); host->sender->tid = gettid(); + host->sender->last_state_since_t = now_realtime_sec(); + host->sender->exit.reason = NULL; ret = true; } netdata_mutex_unlock(&host->sender->mutex); + rrdpush_reset_destinations_postpone_time(host); + return ret; } @@ -1105,9 +1237,11 @@ static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) { if(host->sender->tid == gettid()) { host->sender->tid = 0; host->sender->exit.shutdown = false; - host->sender->exit.reason = NULL; rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + host->sender->last_state_since_t = now_realtime_sec(); } + + rrdpush_reset_destinations_postpone_time(host); } static bool rrdhost_sender_should_exit(struct sender_state *s) { @@ -1134,7 +1268,7 @@ static bool rrdhost_sender_should_exit(struct sender_state *s) { if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) { if(!s->exit.reason) - s->exit.reason = "RECEIVER LEFT"; + s->exit.reason = "RECEIVER LEFT (ORPHAN HOST)"; return true; } @@ -1162,6 +1296,32 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { freez(s); } +void rrdpush_initialize_ssl_ctx(RRDHOST *host) { +#ifdef ENABLE_HTTPS + static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; + netdata_spinlock_lock(&sp); + + if(netdata_ssl_streaming_sender_ctx || !host) { + netdata_spinlock_unlock(&sp); + return; + } + + for(struct rrdpush_destinations *d = host->destinations; d ; d = d->next) { + if (d->ssl) { + // we need to initialize SSL + + netdata_ssl_initialize_ctx(NETDATA_SSL_STREAMING_SENDER_CTX); + ssl_security_location_for_context(netdata_ssl_streaming_sender_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path); + + // stop the loop + break; + } + } + + netdata_spinlock_unlock(&sp); +#endif +} + void *rrdpush_sender_thread(void *ptr) { worker_register("STREAMSND"); worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect"); @@ -1206,17 +1366,7 @@ void *rrdpush_sender_thread(void *ptr) { return NULL; } -#ifdef ENABLE_HTTPS - if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ) { - static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; - netdata_spinlock_lock(&sp); - if(!netdata_ssl_client_ctx) { - security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING); - ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path); - } - netdata_spinlock_unlock(&sp); - } -#endif + rrdpush_initialize_ssl_ctx(s->host); info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid()); @@ -1287,6 +1437,7 @@ void *rrdpush_sender_thread(void *ptr) { now_s = s->last_traffic_seen_t = now_monotonic_sec(); rrdpush_claimed_id(s->host); rrdpush_send_host_labels(s->host); + s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); |