diff options
Diffstat (limited to '')
-rw-r--r-- | streaming/receiver.c | 152 |
1 files changed, 103 insertions, 49 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index ff7a95629..709f15bd5 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -31,10 +31,14 @@ void receiver_state_free(struct receiver_state *rpt) { freez(rpt->program_version); #ifdef ENABLE_HTTPS - if(rpt->ssl.conn) - SSL_free(rpt->ssl.conn); + netdata_ssl_close(&rpt->ssl); #endif + if(rpt->fd != -1) { + internal_error(true, "closing socket..."); + close(rpt->fd); + } + #ifdef ENABLE_COMPRESSION if (rpt->decompressor) rpt->decompressor->destroy(&rpt->decompressor); @@ -100,13 +104,18 @@ static int read_stream(struct receiver_state *r, char* buffer, size_t size) { return 0; } + ssize_t bytes_read; + #ifdef ENABLE_HTTPS - if (r->ssl.conn && r->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) - return (int)netdata_ssl_read(r->ssl.conn, buffer, size); + if (SSL_connection(&r->ssl)) + bytes_read = netdata_ssl_read(&r->ssl, buffer, size); + else + bytes_read = read(r->fd, buffer, size); +#else + bytes_read = read(r->fd, buffer, size); #endif - ssize_t bytes_read = read(r->fd, buffer, size); - if(bytes_read == 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) { + if((bytes_read == 0 || bytes_read == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) { error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__); bytes_read = -3; } @@ -119,23 +128,6 @@ static int read_stream(struct receiver_state *r, char* buffer, size_t size) { bytes_read = -2; } -// do { -// bytes_read = (int) fread(buffer, 1, size, fp); -// if (unlikely(bytes_read <= 0)) { -// if(feof(fp)) { -// internal_error(true, "%s(): fread() failed with EOF", __FUNCTION__); -// bytes_read = -2; -// } -// else if(ferror(fp)) { -// internal_error(true, "%s(): fread() failed with ERROR", __FUNCTION__); -// bytes_read = -3; -// } -// else bytes_read = 0; -// } -// else -// worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, bytes_read); -// } while(bytes_read == 0); - return (int)bytes_read; } @@ -323,12 +315,6 @@ static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t b return NULL; } -static void streaming_parser_thread_cleanup(void *ptr) { - PARSER *parser = (PARSER *)ptr; - rrd_collector_finished(); - parser_destroy(parser); -} - bool plugin_is_enabled(struct plugind *cd); static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { @@ -352,7 +338,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // this keeps the parser with its current value // so, parser needs to be allocated before pushing it - netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser); + netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); @@ -437,6 +423,58 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) { rrdhost_receiver_replicating_charts_zero(host); } +void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) { + size_t receiver_hops = host->system_info ? host->system_info->hops : (host == localhost) ? 0 : 1; + + netdata_mutex_lock(&host->receiver_lock); + + buffer_json_member_add_object(wb, key); + buffer_json_member_add_uint64(wb, "hops", receiver_hops); + + bool online = host == localhost || !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + buffer_json_member_add_boolean(wb, "online", online); + + if(host->child_connect_time || host->child_disconnected_time) { + time_t since = MAX(host->child_connect_time, host->child_disconnected_time); + buffer_json_member_add_time_t(wb, "since", since); + buffer_json_member_add_time_t(wb, "age", now - since); + } + + if(!online && host->rrdpush_last_receiver_exit_reason) + buffer_json_member_add_string(wb, "reason", host->rrdpush_last_receiver_exit_reason); + + if(host != localhost && host->receiver) { + buffer_json_member_add_object(wb, "replication"); + { + size_t instances = rrdhost_receiver_replicating_charts(host); + buffer_json_member_add_boolean(wb, "in_progress", instances); + buffer_json_member_add_double(wb, "completion", host->rrdpush_receiver_replication_percent); + buffer_json_member_add_uint64(wb, "instances", instances); + } + buffer_json_object_close(wb); // replication + + buffer_json_member_add_object(wb, "source"); + { + + char buf[1024 + 1]; + SOCKET_PEERS peers = socket_peers(host->receiver->fd); + bool ssl = SSL_connection(&host->receiver->ssl); + + snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : ""); + buffer_json_member_add_string(wb, "local", buf); + + snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : ""); + buffer_json_member_add_string(wb, "remote", buf); + + stream_capabilities_to_json_array(wb, host->receiver->capabilities, "capabilities"); + } + buffer_json_object_close(wb); // source + } + buffer_json_object_close(wb); // collection + + netdata_mutex_unlock(&host->receiver_lock); +} + static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { bool signal_rrdcontext = false; bool set_this = false; @@ -474,6 +512,8 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); aclk_queue_node_info(rpt->host, true); + rrdpush_reset_destinations_postpone_time(host); + set_this = true; } @@ -506,16 +546,17 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { signal_rrdcontext = true; rrdpush_receiver_replication_reset(host); - if (host->receiver == rpt) - host->receiver = NULL; - rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN); + host->receiver = NULL; + host->rrdpush_last_receiver_exit_reason = rpt->exit.reason; } netdata_mutex_unlock(&host->receiver_lock); if(signal_rrdcontext) rrdcontext_host_child_disconnected(host); + + rrdpush_reset_destinations_postpone_time(host); } } @@ -549,7 +590,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) { "thread %d takes too long to stop, giving up..." , rrdhost_hostname(host) , host->receiver->client_ip, host->receiver->client_port - , gettid()); + , host->receiver->tid); else ret = true; @@ -558,6 +599,18 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) { return ret; } +static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *rpt, const char *msg) { + (void) send_timeout( +#ifdef ENABLE_HTTPS + &rpt->ssl, +#endif + rpt->fd, + (char *)msg, + strlen(msg), + 0, + 5); +} + void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) { log_stream_connection(rpt->client_ip, rpt->client_port, @@ -585,7 +638,7 @@ static void rrdhost_reset_destinations(RRDHOST *host) { d->postpone_reconnection_until = 0; } -static int rrdpush_receive(struct receiver_state *rpt) +static void rrdpush_receive(struct receiver_state *rpt) { rpt->config.mode = default_rrd_memory_mode; rpt->config.history = default_rrd_history_entries; @@ -689,14 +742,14 @@ static int rrdpush_receive(struct receiver_state *rpt) if(!host) { rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION"); - close(rpt->fd); - return 1; + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR); + goto cleanup; } if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER"); - close(rpt->fd); - return 1; + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION); + goto cleanup; } // system_info has been consumed by the host structure @@ -704,8 +757,8 @@ static int rrdpush_receive(struct receiver_state *rpt) if(!rrdhost_set_receiver(host, rpt)) { rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION"); - close(rpt->fd); - return 1; + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING); + goto cleanup; } } @@ -776,15 +829,16 @@ static int rrdpush_receive(struct receiver_state *rpt) } debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); - if(send_timeout( + ssize_t bytes_sent = send_timeout( #ifdef ENABLE_HTTPS &rpt->ssl, #endif - rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { + rpt->fd, initial_response, strlen(initial_response), 0, 60); + if(bytes_sent != (ssize_t)strlen(initial_response)) { + internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response)); rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION"); - close(rpt->fd); - return 0; + goto cleanup; } } @@ -850,9 +904,8 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_set_is_parent_label(--localhost->connected_children_count); - // cleanup - close(rpt->fd); - return (int)count; +cleanup: + ; } static void rrdpush_receiver_thread_cleanup(void *ptr) { @@ -879,7 +932,8 @@ void *rrdpush_receiver_thread(void *ptr) { worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); struct receiver_state *rpt = (struct receiver_state *)ptr; - info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); + rpt->tid = gettid(); + info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid); rrdpush_receive(rpt); |