diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 449 |
1 files changed, 179 insertions, 270 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 709f15bd..3ff022e9 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -2,17 +2,6 @@ #include "rrdpush.h" -// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly -#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) -#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2) - -// this has to be the same at parser.h -#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) - -#if WORKER_PARSER_FIRST_JOB < 1 -#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 -#endif - extern struct config stream_config; void receiver_state_free(struct receiver_state *rpt) { @@ -39,13 +28,12 @@ void receiver_state_free(struct receiver_state *rpt) { close(rpt->fd); } -#ifdef ENABLE_COMPRESSION - if (rpt->decompressor) - rpt->decompressor->destroy(&rpt->decompressor); +#ifdef ENABLE_RRDPUSH_COMPRESSION + rrdpush_decompressor_destroy(&rpt->decompressor); #endif if(rpt->system_info) - rrdhost_system_info_free(rpt->system_info); + rrdhost_system_info_free(rpt->system_info); __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); @@ -54,125 +42,96 @@ void receiver_state_free(struct receiver_state *rpt) { #include "collectors/plugins.d/pluginsd_parser.h" -PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user) -{ - const char *host_uuid_str = get_word(words, num_words, 1); - const char *claim_id_str = get_word(words, num_words, 2); - - if (!host_uuid_str || !claim_id_str) { - error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", - host_uuid_str ? host_uuid_str : "[unset]", - claim_id_str ? claim_id_str : "[unset]"); - return PARSER_RC_ERROR; - } - - uuid_t uuid; - RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; - - // We don't need the parsed UUID - // just do it to check the format - if(uuid_parse(host_uuid_str, uuid)) { - error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str); - return PARSER_RC_ERROR; - } - if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) { - error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str); - return PARSER_RC_ERROR; - } - - if(strcmp(host_uuid_str, host->machine_guid)) { - error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid); - return PARSER_RC_OK; //the message is OK problem must be somewhere else - } - - rrdhost_aclk_state_lock(host); - if (host->aclk_state.claimed_id) - freez(host->aclk_state.claimed_id); - host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; - rrdhost_aclk_state_unlock(host); - - rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE); +// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly +#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) +#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2) - rrdpush_claimed_id(host); +// this has to be the same at parser.h +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) - return PARSER_RC_OK; -} +#if WORKER_PARSER_FIRST_JOB < 1 +#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 +#endif -static int read_stream(struct receiver_state *r, char* buffer, size_t size) { +static inline int read_stream(struct receiver_state *r, char* buffer, size_t size) { if(unlikely(!size)) { internal_error(true, "%s() asked to read zero bytes", __FUNCTION__); return 0; } + int tries = 100; ssize_t bytes_read; + do { + errno = 0; + #ifdef ENABLE_HTTPS - if (SSL_connection(&r->ssl)) - bytes_read = netdata_ssl_read(&r->ssl, buffer, size); - else - bytes_read = read(r->fd, buffer, size); + if (SSL_connection(&r->ssl)) + bytes_read = netdata_ssl_read(&r->ssl, buffer, size); + else + bytes_read = read(r->fd, buffer, size); #else - bytes_read = read(r->fd, buffer, size); + bytes_read = read(r->fd, buffer, size); #endif + } while(bytes_read < 0 && errno == EINTR && tries--); + if((bytes_read == 0 || bytes_read == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) { - error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__); + netdata_log_error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__); bytes_read = -3; } else if (bytes_read == 0) { - error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__); + netdata_log_error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__); bytes_read = -1; } else if (bytes_read < 0) { - error("STREAM: %s() failed to read from socket!", __FUNCTION__); + netdata_log_error("STREAM: %s() failed to read from socket!", __FUNCTION__); bytes_read = -2; } return (int)bytes_read; } -static bool receiver_read_uncompressed(struct receiver_state *r) { +static inline bool receiver_read_uncompressed(struct receiver_state *r) { #ifdef NETDATA_INTERNAL_CHECKS - if(r->read_buffer[r->read_len] != '\0') + if(r->reader.read_buffer[r->reader.read_len] != '\0') fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); #endif - int bytes_read = read_stream(r, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); + int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1); if(unlikely(bytes_read <= 0)) return false; worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read); - r->read_len += bytes_read; - r->read_buffer[r->read_len] = '\0'; + r->reader.read_len += bytes_read; + r->reader.read_buffer[r->reader.read_len] = '\0'; return true; } -#ifdef ENABLE_COMPRESSION -static bool receiver_read_compressed(struct receiver_state *r) { +#ifdef ENABLE_RRDPUSH_COMPRESSION +static inline bool receiver_read_compressed(struct receiver_state *r) { -#ifdef NETDATA_INTERNAL_CHECKS - if(r->read_buffer[r->read_len] != '\0') - fatal("%s: read_buffer does not start with zero #2", __FUNCTION__ ); -#endif + internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0', + "%s: read_buffer does not start with zero #2", __FUNCTION__ ); // first use any available uncompressed data - if (r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { - size_t available = sizeof(r->read_buffer) - r->read_len - 1; - if (available) { - size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available); - if (!len) { + if (likely(rrdpush_decompressed_bytes_in_buffer(&r->decompressor))) { + size_t available = sizeof(r->reader.read_buffer) - r->reader.read_len - 1; + if (likely(available)) { + size_t len = rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, available); + if (unlikely(!len)) { internal_error(true, "decompressor returned zero length #1"); return false; } - r->read_len += (int)len; - r->read_buffer[r->read_len] = '\0'; + r->reader.read_len += (int)len; + r->reader.read_buffer[r->reader.read_len] = '\0'; } else - internal_error(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len); + internal_fatal(true, "The line to read is too big! Already have %zd bytes in read_buffer.", r->reader.read_len); return true; } @@ -180,8 +139,9 @@ static bool receiver_read_compressed(struct receiver_state *r) { // no decompressed data available // read the compression signature of the next block - if(unlikely(r->read_len + r->decompressor->signature_size > sizeof(r->read_buffer) - 1)) { - internal_error(true, "The last incomplete line does not leave enough room for the next compression header! Already have %d bytes in read_buffer.", r->read_len); + if(unlikely(r->reader.read_len + r->decompressor.signature_size > sizeof(r->reader.read_buffer) - 1)) { + internal_error(true, "The last incomplete line does not leave enough room for the next compression header! " + "Already have %zd bytes in read_buffer.", r->reader.read_len); return false; } @@ -189,34 +149,34 @@ static bool receiver_read_compressed(struct receiver_state *r) { // we have to do a loop here, because read_stream() may return less than the data we need int bytes_read = 0; do { - int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor->signature_size - bytes_read); + int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read); if (unlikely(ret <= 0)) return false; bytes_read += ret; - } while(unlikely(bytes_read < (int)r->decompressor->signature_size)); + } while(unlikely(bytes_read < (int)r->decompressor.signature_size)); worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); - if(unlikely(bytes_read != (int)r->decompressor->signature_size)) - fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor->signature_size); + if(unlikely(bytes_read != (int)r->decompressor.signature_size)) + fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor.signature_size); - size_t compressed_message_size = r->decompressor->start(r->decompressor, r->read_buffer + r->read_len, bytes_read); + size_t compressed_message_size = rrdpush_decompressor_start(&r->decompressor, r->reader.read_buffer + r->reader.read_len, bytes_read); if (unlikely(!compressed_message_size)) { internal_error(true, "multiplexed uncompressed data in compressed stream!"); - r->read_len += bytes_read; - r->read_buffer[r->read_len] = '\0'; + r->reader.read_len += bytes_read; + r->reader.read_buffer[r->reader.read_len] = '\0'; return true; } if(unlikely(compressed_message_size > COMPRESSION_MAX_MSG_SIZE)) { - error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.", + netdata_log_error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.", compressed_message_size, (size_t)COMPRESSION_MAX_MSG_SIZE); return false; } // delete compression header from our read buffer - r->read_buffer[r->read_len] = '\0'; + r->reader.read_buffer[r->reader.read_len] = '\0'; // Read the entire compressed block of compressed data char compressed[compressed_message_size]; @@ -238,8 +198,8 @@ static bool receiver_read_compressed(struct receiver_state *r) { worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)compressed_bytes_read); // decompress the compressed block - size_t bytes_to_parse = r->decompressor->decompress(r->decompressor, compressed, compressed_bytes_read); - if (!bytes_to_parse) { + size_t bytes_to_parse = rrdpush_decompress(&r->decompressor, compressed, compressed_bytes_read); + if (unlikely(!bytes_to_parse)) { internal_error(true, "no bytes to parse."); return false; } @@ -247,38 +207,38 @@ static bool receiver_read_compressed(struct receiver_state *r) { worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse); // fill read buffer with decompressed data - size_t len = (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); - if (!len) { + size_t len = (int) rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1); + if (unlikely(!len)) { internal_error(true, "decompressor returned zero length #2"); return false; } - r->read_len += (int)len; - r->read_buffer[r->read_len] = '\0'; + r->reader.read_len += (int)len; + r->reader.read_buffer[r->reader.read_len] = '\0'; return true; } -#else // !ENABLE_COMPRESSION -static bool receiver_read_compressed(struct receiver_state *r) { +#else // !ENABLE_RRDPUSH_COMPRESSION +static inline bool receiver_read_compressed(struct receiver_state *r) { return receiver_read_uncompressed(r); } -#endif // ENABLE_COMPRESSION +#endif // ENABLE_RRDPUSH_COMPRESSION /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. */ -static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) { - size_t start = *pos; +inline char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size) { + size_t start = reader->pos; - char *ss = &r->read_buffer[start]; - char *se = &r->read_buffer[r->read_len]; - char *ds = buffer; - char *de = &buffer[buffer_length - 2]; + char *ss = &reader->read_buffer[start]; + char *se = &reader->read_buffer[reader->read_len]; + char *ds = dst; + char *de = &dst[dst_size - 2]; if(ss >= se) { *ds = '\0'; - *pos = 0; - r->read_len = 0; - r->read_buffer[r->read_len] = '\0'; + reader->pos = 0; + reader->read_len = 0; + reader->read_buffer[reader->read_len] = '\0'; return NULL; } @@ -293,44 +253,73 @@ static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t b *ds++ = *ss++; // copy the newline too *ds = '\0'; - *pos = ss - r->read_buffer; - return buffer; + reader->pos = ss - reader->read_buffer; + return dst; } // if the destination is full, oops! if(ds == de) { - error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); + netdata_log_error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); *ds = '\0'; - *pos = ss - r->read_buffer; - return buffer; + reader->pos = ss - reader->read_buffer; + return dst; } // no newline found in the r->read_buffer // move everything to the beginning - memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start); - r->read_len -= (int)start; - r->read_buffer[r->read_len] = '\0'; + memmove(reader->read_buffer, &reader->read_buffer[start], reader->read_len - start); + reader->read_len -= (int)start; + reader->read_buffer[reader->read_len] = '\0'; *ds = '\0'; - *pos = 0; + reader->pos = 0; return NULL; } bool plugin_is_enabled(struct plugind *cd); +static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAKE reason, bool force) { + if(force || !rpt->exit.reason) + rpt->exit.reason = reason; +} + +static inline bool receiver_should_stop(struct receiver_state *rpt) { + static __thread size_t counter = 0; + + if(unlikely(rpt->exit.shutdown)) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false); + return true; + } + + if(unlikely(!service_running(SERVICE_STREAMING))) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, false); + return true; + } + + if(unlikely((counter++ % 1000) == 0)) { + // check every 1000 lines read + netdata_thread_testcancel(); + rpt->last_msg_t = now_monotonic_sec(); + } + + return false; +} + static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { - size_t result; - - PARSER_USER_OBJECT user = { - .enabled = plugin_is_enabled(cd), - .host = rpt->host, - .opaque = rpt, - .cd = cd, - .trust_durations = 1, - .capabilities = rpt->capabilities, - }; + size_t result = 0; - PARSER *parser = parser_init(&user, NULL, NULL, fd, - PARSER_INPUT_SPLIT, ssl); + PARSER *parser = NULL; + { + PARSER_USER_OBJECT user = { + .enabled = plugin_is_enabled(cd), + .host = rpt->host, + .opaque = rpt, + .cd = cd, + .trust_durations = 1, + .capabilities = rpt->capabilities, + }; + + parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl); + } pluginsd_keywords_init(parser, PARSER_INIT_STREAMING); @@ -340,72 +329,41 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); - - user.parser = parser; - bool compressed_connection = false; -#ifdef ENABLE_COMPRESSION + +#ifdef ENABLE_RRDPUSH_COMPRESSION if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { compressed_connection = true; - - if (!rpt->decompressor) - rpt->decompressor = create_decompressor(); - else - rpt->decompressor->reset(rpt->decompressor); + rrdpush_decompressor_reset(&rpt->decompressor); } + else + rrdpush_decompressor_destroy(&rpt->decompressor); #endif - rpt->read_buffer[0] = '\0'; - rpt->read_len = 0; + buffered_reader_init(&rpt->reader); - size_t read_buffer_start = 0; char buffer[PLUGINSD_LINE_MAX + 2] = ""; - while(service_running(SERVICE_STREAMING)) { - netdata_thread_testcancel(); + while(!receiver_should_stop(rpt)) { - if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) { - bool have_new_data; - if(likely(compressed_connection)) - have_new_data = receiver_read_compressed(rpt); - else - have_new_data = receiver_read_uncompressed(rpt); + if(!buffered_reader_next_line(&rpt->reader, buffer, PLUGINSD_LINE_MAX + 2)) { + bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt); if(unlikely(!have_new_data)) { - if(!rpt->exit.reason) - rpt->exit.reason = "SOCKET READ ERROR"; - + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, false); break; } - rpt->last_msg_t = now_realtime_sec(); continue; } - if(unlikely(!service_running(SERVICE_STREAMING))) { - if(!rpt->exit.reason) - rpt->exit.reason = "NETDATA EXIT"; - goto done; - } - if(unlikely(rpt->exit.shutdown)) { - if(!rpt->exit.reason) - rpt->exit.reason = "SHUTDOWN REQUESTED"; - - goto done; - } - if (unlikely(parser_action(parser, buffer))) { internal_error(true, "parser_action() failed on keyword '%s'.", buffer); - - if(!rpt->exit.reason) - rpt->exit.reason = "PARSER FAILED"; - + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); break; } } -done: - result = user.data_collections_count; + result = parser->user.data_collections_count; // free parser with the pop function netdata_thread_cleanup_pop(1); @@ -423,67 +381,18 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) { rrdhost_receiver_replicating_charts_zero(host); } -void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) { - size_t receiver_hops = host->system_info ? host->system_info->hops : (host == localhost) ? 0 : 1; - - netdata_mutex_lock(&host->receiver_lock); - - buffer_json_member_add_object(wb, key); - buffer_json_member_add_uint64(wb, "hops", receiver_hops); - - bool online = host == localhost || !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); - buffer_json_member_add_boolean(wb, "online", online); - - if(host->child_connect_time || host->child_disconnected_time) { - time_t since = MAX(host->child_connect_time, host->child_disconnected_time); - buffer_json_member_add_time_t(wb, "since", since); - buffer_json_member_add_time_t(wb, "age", now - since); - } - - if(!online && host->rrdpush_last_receiver_exit_reason) - buffer_json_member_add_string(wb, "reason", host->rrdpush_last_receiver_exit_reason); - - if(host != localhost && host->receiver) { - buffer_json_member_add_object(wb, "replication"); - { - size_t instances = rrdhost_receiver_replicating_charts(host); - buffer_json_member_add_boolean(wb, "in_progress", instances); - buffer_json_member_add_double(wb, "completion", host->rrdpush_receiver_replication_percent); - buffer_json_member_add_uint64(wb, "instances", instances); - } - buffer_json_object_close(wb); // replication - - buffer_json_member_add_object(wb, "source"); - { - - char buf[1024 + 1]; - SOCKET_PEERS peers = socket_peers(host->receiver->fd); - bool ssl = SSL_connection(&host->receiver->ssl); - - snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : ""); - buffer_json_member_add_string(wb, "local", buf); - - snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : ""); - buffer_json_member_add_string(wb, "remote", buf); - - stream_capabilities_to_json_array(wb, host->receiver->capabilities, "capabilities"); - } - buffer_json_object_close(wb); // source - } - buffer_json_object_close(wb); // collection - - netdata_mutex_unlock(&host->receiver_lock); -} - static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { bool signal_rrdcontext = false; bool set_this = false; netdata_mutex_lock(&host->receiver_lock); - if (!host->receiver || host->receiver == rpt) { + if (!host->receiver) { rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); + host->rrdpush_receiver_connection_counter++; + __atomic_add_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED); + host->receiver = rpt; rpt->host = host; @@ -495,13 +404,15 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) { if (rpt->config.alarms_delay > 0) { host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay; - log_health( + netdata_log_health( "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", rrdhost_hostname(host), (int64_t) rpt->config.alarms_delay); } } + host->health_log.health_log_history = rpt->config.alarms_history; + // this is a test // if(rpt->hops <= host->sender->hops) // rrdpush_sender_thread_stop(host, "HOPS MISMATCH", false); @@ -534,6 +445,9 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { // Make sure that we detach this thread and don't kill a freshly arriving receiver if(host->receiver == rpt) { + __atomic_sub_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED); + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + host->trigger_chart_obsoletion_check = 0; host->child_connect_time = 0; host->child_disconnected_time = now_realtime_sec(); @@ -541,7 +455,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO) host->health.health_enabled = 0; - rrdpush_sender_thread_stop(host, "RECEIVER LEFT", false); + rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false); signal_rrdcontext = true; rrdpush_receiver_replication_reset(host); @@ -560,7 +474,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { } } -bool stop_streaming_receiver(RRDHOST *host, const char *reason) { +bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) { bool ret = false; netdata_mutex_lock(&host->receiver_lock); @@ -568,7 +482,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) { if(host->receiver) { if(!host->receiver->exit.shutdown) { host->receiver->exit.shutdown = true; - host->receiver->exit.reason = reason; + receiver_set_exit_reason(host->receiver, reason, true); shutdown(host->receiver->fd, SHUT_RDWR); } @@ -586,7 +500,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) { } if(host->receiver) - error("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_error("STREAM '%s' [receive from [%s]:%s]: " "thread %d takes too long to stop, giving up..." , rrdhost_hostname(host) , host->receiver->client_ip, host->receiver->client_port @@ -619,25 +533,20 @@ void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, con (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-", status); - info("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "%s. " "STATUS: %s%s%s%s" , rpt->hostname , rpt->client_ip, rpt->client_port , msg , status - , rpt->exit.reason?" (":"" - , rpt->exit.reason?rpt->exit.reason:"" - , rpt->exit.reason?")":"" + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":"" + , stream_handshake_error_to_string(rpt->exit.reason) + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":"" ); } -static void rrdhost_reset_destinations(RRDHOST *host) { - for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) - d->postpone_reconnection_until = 0; -} - static void rrdpush_receive(struct receiver_state *rpt) { rpt->config.mode = default_rrd_memory_mode; @@ -645,6 +554,7 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.health_enabled = (int)default_health_enabled; rpt->config.alarms_delay = 60; + rpt->config.alarms_history = HEALTH_LOG_DEFAULT_HISTORY; rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled; rpt->config.rrdpush_destination = default_rrdpush_destination; @@ -666,7 +576,7 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode))); if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { - error("STREAM '%s' [receive from %s:%s]: " + netdata_log_error("STREAM '%s' [receive from %s:%s]: " "dbengine is not enabled, falling back to default." , rpt->hostname , rpt->client_ip, rpt->client_port @@ -681,6 +591,9 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay); rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay); + rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->key, "default health log history", rpt->config.alarms_history); + rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->machine_guid, "health log history", rpt->config.alarms_history); + rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled); rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled); @@ -702,12 +615,11 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step); rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step); -#ifdef ENABLE_COMPRESSION - rpt->config.rrdpush_compression = default_compression_enabled; +#ifdef ENABLE_RRDPUSH_COMPRESSION + rpt->config.rrdpush_compression = default_rrdpush_compression_enabled; rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression); rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression); - rpt->rrdpush_compression = (rpt->config.rrdpush_compression && default_compression_enabled); -#endif //ENABLE_COMPRESSION +#endif // ENABLE_RRDPUSH_COMPRESSION (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); @@ -763,9 +675,9 @@ static void rrdpush_receive(struct receiver_state *rpt) } #ifdef NETDATA_INTERNAL_CHECKS - info("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "client willing to stream metrics for host '%s' with machine_guid '%s': " - "update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'" + "update every = %d, history = %d, memory mode = %s, health %s,%s tags '%s'" , rpt->hostname , rpt->client_ip , rpt->client_port @@ -801,15 +713,15 @@ static void rrdpush_receive(struct receiver_state *rpt) snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); -#ifdef ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { - if (!rpt->rrdpush_compression) + if (!rpt->config.rrdpush_compression) rpt->capabilities &= ~STREAM_CAP_COMPRESSION; } -#endif +#endif // ENABLE_RRDPUSH_COMPRESSION { - // info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + // netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); char initial_response[HTTP_HEADER_SIZE]; if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) { log_receiver_capabilities(rpt); @@ -828,7 +740,7 @@ static void rrdpush_receive(struct receiver_state *rpt) sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1); } - debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); + netdata_log_debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); ssize_t bytes_sent = send_timeout( #ifdef ENABLE_HTTPS &rpt->ssl, @@ -845,7 +757,7 @@ static void rrdpush_receive(struct receiver_state *rpt) { // remove the non-blocking flag from the socket if(sock_delnonblock(rpt->fd) < 0) - error("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_error("STREAM '%s' [receive from [%s]:%s]: " "cannot remove the non-blocking flag from socket %d" , rrdhost_hostname(rpt->host) , rpt->client_ip, rpt->client_port @@ -855,7 +767,7 @@ static void rrdpush_receive(struct receiver_state *rpt) timeout.tv_sec = 600; timeout.tv_usec = 0; if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) - error("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_error("STREAM '%s' [receive from [%s]:%s]: " "cannot set timeout for socket %d" , rrdhost_hostname(rpt->host) , rpt->client_ip, rpt->client_port @@ -867,14 +779,14 @@ static void rrdpush_receive(struct receiver_state *rpt) #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // new child connected - if (netdata_cloud_setting) + if (netdata_cloud_enabled) aclk_host_state_update(rpt->host, 1); #endif - rrdhost_set_is_parent_label(++localhost->connected_children_count); + rrdhost_set_is_parent_label(); // let it reconnect to parent immediately - rrdhost_reset_destinations(rpt->host); + rrdpush_reset_destinations_postpone_time(rpt->host); size_t count = streaming_parser(rpt, &cd, rpt->fd, #ifdef ENABLE_HTTPS @@ -884,10 +796,7 @@ static void rrdpush_receive(struct receiver_state *rpt) #endif ); - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); - - if(!rpt->exit.reason) - rpt->exit.reason = "PARSER EXIT"; + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, false); { char msg[100 + 1]; @@ -898,12 +807,10 @@ static void rrdpush_receive(struct receiver_state *rpt) #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // a child disconnected - if (netdata_cloud_setting) + if (netdata_cloud_enabled) aclk_host_state_update(rpt->host, 0); #endif - rrdhost_set_is_parent_label(--localhost->connected_children_count); - cleanup: ; } @@ -914,13 +821,15 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { rrdhost_clear_receiver(rpt); - info("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "receive thread ended (task id %d)" , rpt->hostname ? rpt->hostname : "-" , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-" , gettid()); receiver_state_free(rpt); + + rrdhost_set_is_parent_label(); } void *rrdpush_receiver_thread(void *ptr) { @@ -933,7 +842,7 @@ void *rrdpush_receiver_thread(void *ptr) { struct receiver_state *rpt = (struct receiver_state *)ptr; rpt->tid = gettid(); - info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid); + netdata_log_info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid); rrdpush_receive(rpt); |