summaryrefslogtreecommitdiffstats
path: root/src/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/streaming/receiver.c (renamed from streaming/receiver.c)219
1 files changed, 117 insertions, 102 deletions
diff --git a/streaming/receiver.c b/src/streaming/receiver.c
index a12b94fb4..2cbf247dc 100644
--- a/streaming/receiver.c
+++ b/src/streaming/receiver.c
@@ -14,7 +14,6 @@ void receiver_state_free(struct receiver_state *rpt) {
freez(rpt->os);
freez(rpt->timezone);
freez(rpt->abbrev_timezone);
- freez(rpt->tags);
freez(rpt->client_ip);
freez(rpt->client_port);
freez(rpt->program_name);
@@ -59,8 +58,12 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz
}
#ifdef ENABLE_H2O
- if (is_h2o_rrdpush(r))
+ if (is_h2o_rrdpush(r)) {
+ if(nd_thread_signaled_to_cancel())
+ return -4;
+
return (int)h2o_stream_read(r->h2o_ctx, buffer, size);
+ }
#endif
int tries = 100;
@@ -69,6 +72,29 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz
do {
errno = 0;
+ switch(wait_on_socket_or_cancel_with_timeout(
+#ifdef ENABLE_HTTPS
+ &r->ssl,
+#endif
+ r->fd, 0, POLLIN, NULL))
+ {
+ case 0: // data are waiting
+ break;
+
+ case 1: // timeout reached
+ netdata_log_error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
+ return -3;
+
+ case -1: // thread cancelled
+ netdata_log_error("STREAM: %s(): thread has been cancelled timeout while waiting for data on socket!", __FUNCTION__);
+ return -4;
+
+ default:
+ case 2: // error on socket
+ netdata_log_error("STREAM: %s() socket error!", __FUNCTION__);
+ return -2;
+ }
+
#ifdef ENABLE_HTTPS
if (SSL_connection(&r->ssl))
bytes_read = netdata_ssl_read(&r->ssl, buffer, size);
@@ -117,6 +143,10 @@ static inline STREAM_HANDSHAKE read_stream_error_to_reason(int code) {
// timeout
return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT;
+ case -4:
+ // the thread is cancelled
+ return STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN;
+
default:
// anything else
return STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
@@ -262,6 +292,11 @@ static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAK
static inline bool receiver_should_stop(struct receiver_state *rpt) {
static __thread size_t counter = 0;
+ if(nd_thread_signaled_to_cancel()) {
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false);
+ return true;
+ }
+
if(unlikely(rpt->exit.shutdown)) {
receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false);
return true;
@@ -272,11 +307,8 @@ static inline bool receiver_should_stop(struct receiver_state *rpt) {
return true;
}
- if(unlikely((counter++ % 1000) == 0)) {
- // check every 1000 lines read
- netdata_thread_testcancel();
+ if(unlikely((counter++ % 1000) == 0))
rpt->last_msg_t = now_monotonic_sec();
- }
return false;
}
@@ -308,65 +340,58 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
// this keeps the parser with its current value
// so, parser needs to be allocated before pushing it
- netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
+ CLEANUP_FUNCTION_REGISTER(pluginsd_process_thread_cleanup) parser_ptr = parser;
- {
- bool compressed_connection = rrdpush_decompression_initialize(rpt);
-
- buffered_reader_init(&rpt->reader);
+ bool compressed_connection = rrdpush_decompression_initialize(rpt);
+ buffered_reader_init(&rpt->reader);
#ifdef NETDATA_LOG_STREAM_RECEIVE
- {
- char filename[FILENAME_MAX + 1];
- snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(
- rpt->host) : "unknown"
- );
- parser->user.stream_log_fp = fopen(filename, "w");
- parser->user.stream_log_repertoire = PARSER_REP_METADATA;
- }
+ {
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(
+ rpt->host) : "unknown"
+ );
+ parser->user.stream_log_fp = fopen(filename, "w");
+ parser->user.stream_log_repertoire = PARSER_REP_METADATA;
+ }
#endif
- CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
-
- ND_LOG_STACK lgs[] = {
- ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
- ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
- ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
- ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
- ND_LOG_FIELD_END(),
- };
- ND_LOG_STACK_PUSH(lgs);
-
- while(!receiver_should_stop(rpt)) {
-
- if(!buffered_reader_next_line(&rpt->reader, buffer)) {
- STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
+ CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
- bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason)
- : receiver_read_uncompressed(rpt, &reason);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
+ ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- if(unlikely(!have_new_data)) {
- receiver_set_exit_reason(rpt, reason, false);
- break;
- }
+ while(!receiver_should_stop(rpt)) {
- continue;
- }
+ if(!buffered_reader_next_line(&rpt->reader, buffer)) {
+ STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
- if(unlikely(parser_action(parser, buffer->buffer))) {
- receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
- break;
- }
+ bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason)
+ : receiver_read_uncompressed(rpt, &reason);
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- result = parser->user.data_collections_count;
+ if(unlikely(!have_new_data)) {
+ receiver_set_exit_reason(rpt, reason, false);
+ break;
}
- // free parser with the pop function
- netdata_thread_cleanup_pop(1);
+ continue;
+ }
+ if(unlikely(parser_action(parser, buffer->buffer))) {
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
+ break;
+ }
+
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ result = parser->user.data_collections_count;
return result;
}
@@ -436,10 +461,9 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
}
static void rrdhost_clear_receiver(struct receiver_state *rpt) {
- bool signal_rrdcontext = false;
-
RRDHOST *host = rpt->host;
if(host) {
+ bool signal_rrdcontext = false;
netdata_mutex_lock(&host->receiver_lock);
// Make sure that we detach this thread and don't kill a freshly arriving receiver
@@ -451,8 +475,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) {
host->child_connect_time = 0;
host->child_disconnected_time = now_realtime_sec();
- if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO)
- host->health.health_enabled = 0;
+ host->health.health_enabled = 0;
rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false);
@@ -462,6 +485,9 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) {
rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
host->receiver = NULL;
host->rrdpush_last_receiver_exit_reason = rpt->exit.reason;
+
+ if(rpt->config.health_enabled)
+ rrdcalc_child_disconnected(host);
}
netdata_mutex_unlock(&host->receiver_lock);
@@ -485,7 +511,7 @@ bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) {
shutdown(host->receiver->fd, SHUT_RDWR);
}
- netdata_thread_cancel(host->receiver->thread);
+ nd_thread_signal_cancel(host->receiver->thread);
}
int count = 2000;
@@ -556,7 +582,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.mode = default_rrd_memory_mode;
rpt->config.history = default_rrd_history_entries;
- rpt->config.health_enabled = (int)default_health_enabled;
+ rpt->config.health_enabled = health_plugin_enabled();
rpt->config.alarms_delay = 60;
rpt->config.alarms_history = HEALTH_LOG_DEFAULT_HISTORY;
@@ -633,8 +659,6 @@ static void rrdpush_receive(struct receiver_state *rpt)
rrdpush_parse_compression_order(rpt, order);
}
- (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
-
// find the host for this receiver
{
// this will also update the host with our system_info
@@ -646,7 +670,6 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->timezone,
rpt->abbrev_timezone,
rpt->utc_offset,
- rpt->tags,
rpt->program_name,
rpt->program_version,
rpt->config.update_every,
@@ -699,7 +722,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
#ifdef NETDATA_INTERNAL_CHECKS
netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
"client willing to stream metrics for host '%s' with machine_guid '%s': "
- "update every = %d, history = %d, memory mode = %s, health %s,%s tags '%s'"
+ "update every = %d, history = %d, memory mode = %s, health %s,%s"
, rpt->hostname
, rpt->client_ip
, rpt->client_port
@@ -714,7 +737,6 @@ static void rrdpush_receive(struct receiver_state *rpt)
#else
, ""
#endif
- , rrdhost_tags(rpt->host)
);
#endif // NETDATA_INTERNAL_CHECKS
@@ -852,20 +874,18 @@ cleanup:
;
}
-static void rrdpush_receiver_thread_cleanup(void *ptr) {
- struct receiver_state *rpt = (struct receiver_state *) ptr;
- worker_unregister();
-
- rrdhost_clear_receiver(rpt);
+static void rrdpush_receiver_thread_cleanup(void *pptr) {
+ struct receiver_state *rpt = CLEANUP_FUNCTION_GET_PTR(pptr);
+ if(!rpt) return;
netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
"receive thread ended (task id %d)"
- , rpt->hostname ? rpt->hostname : "-"
- , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-"
- , gettid());
+ , rpt->hostname ? rpt->hostname : "-"
+ , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-", gettid_cached());
+ worker_unregister();
+ rrdhost_clear_receiver(rpt);
receiver_state_free(rpt);
-
rrdhost_set_is_parent_label();
}
@@ -892,42 +912,37 @@ static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) {
}
void *rrdpush_receiver_thread(void *ptr) {
- netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
-
- {
- worker_register("STREAMRCV");
-
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ,
- "received bytes", "bytes/s",
- WORKER_METRIC_INCREMENT);
+ CLEANUP_FUNCTION_REGISTER(rrdpush_receiver_thread_cleanup) cleanup_ptr = ptr;
+ worker_register("STREAMRCV");
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED,
- "uncompressed bytes", "bytes/s",
- WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ,
+ "received bytes", "bytes/s",
+ WORKER_METRIC_INCREMENT);
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
- "replication completion", "%",
- WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED,
+ "uncompressed bytes", "bytes/s",
+ WORKER_METRIC_INCREMENT);
- struct receiver_state *rpt = (struct receiver_state *) ptr;
- rpt->tid = gettid();
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ "replication completion", "%",
+ WORKER_METRIC_ABSOLUTE);
- ND_LOG_STACK lgs[] = {
- ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
- ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
- ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname),
- ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt),
- ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt),
- ND_LOG_FIELD_END(),
- };
- ND_LOG_STACK_PUSH(lgs);
+ struct receiver_state *rpt = (struct receiver_state *) ptr;
+ rpt->tid = gettid_cached();
- netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip
- , rpt->client_port);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
+ ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname),
+ ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt),
+ ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- rrdpush_receive(rpt);
- }
+ netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip
+ , rpt->client_port);
- netdata_thread_cleanup_pop(1);
+ rrdpush_receive(rpt);
return NULL;
}