summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c152
1 files changed, 103 insertions, 49 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index ff7a95629..709f15bd5 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -31,10 +31,14 @@ void receiver_state_free(struct receiver_state *rpt) {
freez(rpt->program_version);
#ifdef ENABLE_HTTPS
- if(rpt->ssl.conn)
- SSL_free(rpt->ssl.conn);
+ netdata_ssl_close(&rpt->ssl);
#endif
+ if(rpt->fd != -1) {
+ internal_error(true, "closing socket...");
+ close(rpt->fd);
+ }
+
#ifdef ENABLE_COMPRESSION
if (rpt->decompressor)
rpt->decompressor->destroy(&rpt->decompressor);
@@ -100,13 +104,18 @@ static int read_stream(struct receiver_state *r, char* buffer, size_t size) {
return 0;
}
+ ssize_t bytes_read;
+
#ifdef ENABLE_HTTPS
- if (r->ssl.conn && r->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
- return (int)netdata_ssl_read(r->ssl.conn, buffer, size);
+ if (SSL_connection(&r->ssl))
+ bytes_read = netdata_ssl_read(&r->ssl, buffer, size);
+ else
+ bytes_read = read(r->fd, buffer, size);
+#else
+ bytes_read = read(r->fd, buffer, size);
#endif
- ssize_t bytes_read = read(r->fd, buffer, size);
- if(bytes_read == 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) {
+ if((bytes_read == 0 || bytes_read == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) {
error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
bytes_read = -3;
}
@@ -119,23 +128,6 @@ static int read_stream(struct receiver_state *r, char* buffer, size_t size) {
bytes_read = -2;
}
-// do {
-// bytes_read = (int) fread(buffer, 1, size, fp);
-// if (unlikely(bytes_read <= 0)) {
-// if(feof(fp)) {
-// internal_error(true, "%s(): fread() failed with EOF", __FUNCTION__);
-// bytes_read = -2;
-// }
-// else if(ferror(fp)) {
-// internal_error(true, "%s(): fread() failed with ERROR", __FUNCTION__);
-// bytes_read = -3;
-// }
-// else bytes_read = 0;
-// }
-// else
-// worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, bytes_read);
-// } while(bytes_read == 0);
-
return (int)bytes_read;
}
@@ -323,12 +315,6 @@ static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t b
return NULL;
}
-static void streaming_parser_thread_cleanup(void *ptr) {
- PARSER *parser = (PARSER *)ptr;
- rrd_collector_finished();
- parser_destroy(parser);
-}
-
bool plugin_is_enabled(struct plugind *cd);
static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) {
@@ -352,7 +338,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
// this keeps the parser with its current value
// so, parser needs to be allocated before pushing it
- netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser);
+ netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
@@ -437,6 +423,58 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) {
rrdhost_receiver_replicating_charts_zero(host);
}
+void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
+ size_t receiver_hops = host->system_info ? host->system_info->hops : (host == localhost) ? 0 : 1;
+
+ netdata_mutex_lock(&host->receiver_lock);
+
+ buffer_json_member_add_object(wb, key);
+ buffer_json_member_add_uint64(wb, "hops", receiver_hops);
+
+ bool online = host == localhost || !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+ buffer_json_member_add_boolean(wb, "online", online);
+
+ if(host->child_connect_time || host->child_disconnected_time) {
+ time_t since = MAX(host->child_connect_time, host->child_disconnected_time);
+ buffer_json_member_add_time_t(wb, "since", since);
+ buffer_json_member_add_time_t(wb, "age", now - since);
+ }
+
+ if(!online && host->rrdpush_last_receiver_exit_reason)
+ buffer_json_member_add_string(wb, "reason", host->rrdpush_last_receiver_exit_reason);
+
+ if(host != localhost && host->receiver) {
+ buffer_json_member_add_object(wb, "replication");
+ {
+ size_t instances = rrdhost_receiver_replicating_charts(host);
+ buffer_json_member_add_boolean(wb, "in_progress", instances);
+ buffer_json_member_add_double(wb, "completion", host->rrdpush_receiver_replication_percent);
+ buffer_json_member_add_uint64(wb, "instances", instances);
+ }
+ buffer_json_object_close(wb); // replication
+
+ buffer_json_member_add_object(wb, "source");
+ {
+
+ char buf[1024 + 1];
+ SOCKET_PEERS peers = socket_peers(host->receiver->fd);
+ bool ssl = SSL_connection(&host->receiver->ssl);
+
+ snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : "");
+ buffer_json_member_add_string(wb, "local", buf);
+
+ snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : "");
+ buffer_json_member_add_string(wb, "remote", buf);
+
+ stream_capabilities_to_json_array(wb, host->receiver->capabilities, "capabilities");
+ }
+ buffer_json_object_close(wb); // source
+ }
+ buffer_json_object_close(wb); // collection
+
+ netdata_mutex_unlock(&host->receiver_lock);
+}
+
static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
bool signal_rrdcontext = false;
bool set_this = false;
@@ -474,6 +512,8 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
aclk_queue_node_info(rpt->host, true);
+ rrdpush_reset_destinations_postpone_time(host);
+
set_this = true;
}
@@ -506,16 +546,17 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) {
signal_rrdcontext = true;
rrdpush_receiver_replication_reset(host);
- if (host->receiver == rpt)
- host->receiver = NULL;
-
rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
+ host->receiver = NULL;
+ host->rrdpush_last_receiver_exit_reason = rpt->exit.reason;
}
netdata_mutex_unlock(&host->receiver_lock);
if(signal_rrdcontext)
rrdcontext_host_child_disconnected(host);
+
+ rrdpush_reset_destinations_postpone_time(host);
}
}
@@ -549,7 +590,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) {
"thread %d takes too long to stop, giving up..."
, rrdhost_hostname(host)
, host->receiver->client_ip, host->receiver->client_port
- , gettid());
+ , host->receiver->tid);
else
ret = true;
@@ -558,6 +599,18 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) {
return ret;
}
+static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *rpt, const char *msg) {
+ (void) send_timeout(
+#ifdef ENABLE_HTTPS
+ &rpt->ssl,
+#endif
+ rpt->fd,
+ (char *)msg,
+ strlen(msg),
+ 0,
+ 5);
+}
+
void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) {
log_stream_connection(rpt->client_ip, rpt->client_port,
@@ -585,7 +638,7 @@ static void rrdhost_reset_destinations(RRDHOST *host) {
d->postpone_reconnection_until = 0;
}
-static int rrdpush_receive(struct receiver_state *rpt)
+static void rrdpush_receive(struct receiver_state *rpt)
{
rpt->config.mode = default_rrd_memory_mode;
rpt->config.history = default_rrd_history_entries;
@@ -689,14 +742,14 @@ static int rrdpush_receive(struct receiver_state *rpt)
if(!host) {
rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION");
- close(rpt->fd);
- return 1;
+ rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR);
+ goto cleanup;
}
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER");
- close(rpt->fd);
- return 1;
+ rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION);
+ goto cleanup;
}
// system_info has been consumed by the host structure
@@ -704,8 +757,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
if(!rrdhost_set_receiver(host, rpt)) {
rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION");
- close(rpt->fd);
- return 1;
+ rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING);
+ goto cleanup;
}
}
@@ -776,15 +829,16 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
- if(send_timeout(
+ ssize_t bytes_sent = send_timeout(
#ifdef ENABLE_HTTPS
&rpt->ssl,
#endif
- rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+ rpt->fd, initial_response, strlen(initial_response), 0, 60);
+ if(bytes_sent != (ssize_t)strlen(initial_response)) {
+ internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response));
rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION");
- close(rpt->fd);
- return 0;
+ goto cleanup;
}
}
@@ -850,9 +904,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_set_is_parent_label(--localhost->connected_children_count);
- // cleanup
- close(rpt->fd);
- return (int)count;
+cleanup:
+ ;
}
static void rrdpush_receiver_thread_cleanup(void *ptr) {
@@ -879,7 +932,8 @@ void *rrdpush_receiver_thread(void *ptr) {
worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
struct receiver_state *rpt = (struct receiver_state *)ptr;
- info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+ rpt->tid = gettid();
+ info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid);
rrdpush_receive(rpt);