diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 809 |
1 files changed, 809 insertions, 0 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c new file mode 100644 index 0000000..61ee33b --- /dev/null +++ b/streaming/receiver.c @@ -0,0 +1,809 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "rrdpush.h" +#include "parser/parser.h" + +// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly +#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) +#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2) + +// this has to be the same at parser.h +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) + +#if WORKER_PARSER_FIRST_JOB < 1 +#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 +#endif + +extern struct config stream_config; + +void destroy_receiver_state(struct receiver_state *rpt) { + freez(rpt->key); + freez(rpt->hostname); + freez(rpt->registry_hostname); + freez(rpt->machine_guid); + freez(rpt->os); + freez(rpt->timezone); + freez(rpt->abbrev_timezone); + freez(rpt->tags); + freez(rpt->client_ip); + freez(rpt->client_port); + freez(rpt->program_name); + freez(rpt->program_version); +#ifdef ENABLE_HTTPS + if(rpt->ssl.conn){ + SSL_free(rpt->ssl.conn); + } +#endif +#ifdef ENABLE_COMPRESSION + if (rpt->decompressor) + rpt->decompressor->destroy(&rpt->decompressor); +#endif + freez(rpt); +} + +static void rrdpush_receiver_thread_cleanup(void *ptr) { + worker_unregister(); + + static __thread int executed = 0; + if(!executed) { + executed = 1; + struct receiver_state *rpt = (struct receiver_state *) ptr; + // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch + // the host pointer as it is unpredictable when the RRDHOST is deleted. Do the cleanup from rrdhost_free(). + if (netdata_exit && rpt->host) { + rpt->exited = 1; + return; + } + + // Make sure that we detach this thread and don't kill a freshly arriving receiver + if (!netdata_exit && rpt->host) { + netdata_mutex_lock(&rpt->host->receiver_lock); + if (rpt->host->receiver == rpt) + rpt->host->receiver = NULL; + netdata_mutex_unlock(&rpt->host->receiver_lock); + } + + info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); + destroy_receiver_state(rpt); + } +} + +#include "collectors/plugins.d/pluginsd_parser.h" + +PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user) +{ + const char *host_uuid_str = get_word(words, num_words, 1); + const char *claim_id_str = get_word(words, num_words, 2); + + if (!host_uuid_str || !claim_id_str) { + error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", + host_uuid_str ? host_uuid_str : "[unset]", + claim_id_str ? claim_id_str : "[unset]"); + return PARSER_RC_ERROR; + } + + uuid_t uuid; + RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; + + // We don't need the parsed UUID + // just do it to check the format + if(uuid_parse(host_uuid_str, uuid)) { + error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str); + return PARSER_RC_ERROR; + } + if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) { + error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str); + return PARSER_RC_ERROR; + } + + if(strcmp(host_uuid_str, host->machine_guid)) { + error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid); + return PARSER_RC_OK; //the message is OK problem must be somewhere else + } + + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) + freez(host->aclk_state.claimed_id); + host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; + + metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL); + + rrdhost_aclk_state_unlock(host); + + rrdpush_claimed_id(host); + + return PARSER_RC_OK; +} + +static int read_stream(struct receiver_state *r, char* buffer, size_t size) { + if(unlikely(!size)) { + internal_error(true, "%s() asked to read zero bytes", __FUNCTION__); + return 0; + } + +#ifdef ENABLE_HTTPS + if (r->ssl.conn && r->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + return (int)netdata_ssl_read(r->ssl.conn, buffer, size); +#endif + + ssize_t bytes_read = read(r->fd, buffer, size); + if(bytes_read == 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) { + error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__); + bytes_read = -3; + } + else if (bytes_read == 0) { + error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__); + bytes_read = -1; + } + else if (bytes_read < 0) { + error("STREAM: %s() failed to read from socket!", __FUNCTION__); + bytes_read = -2; + } + +// do { +// bytes_read = (int) fread(buffer, 1, size, fp); +// if (unlikely(bytes_read <= 0)) { +// if(feof(fp)) { +// internal_error(true, "%s(): fread() failed with EOF", __FUNCTION__); +// bytes_read = -2; +// } +// else if(ferror(fp)) { +// internal_error(true, "%s(): fread() failed with ERROR", __FUNCTION__); +// bytes_read = -3; +// } +// else bytes_read = 0; +// } +// else +// worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, bytes_read); +// } while(bytes_read == 0); + + return (int)bytes_read; +} + +static bool receiver_read_uncompressed(struct receiver_state *r) { +#ifdef NETDATA_INTERNAL_CHECKS + if(r->read_buffer[r->read_len] != '\0') + fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); +#endif + + int bytes_read = read_stream(r, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); + if(unlikely(bytes_read <= 0)) + return false; + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read); + + r->read_len += bytes_read; + r->read_buffer[r->read_len] = '\0'; + + return true; +} + +#ifdef ENABLE_COMPRESSION +static bool receiver_read_compressed(struct receiver_state *r) { + +#ifdef NETDATA_INTERNAL_CHECKS + if(r->read_buffer[r->read_len] != '\0') + fatal("%s: read_buffer does not start with zero #2", __FUNCTION__ ); +#endif + + // first use any available uncompressed data + if (r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { + size_t available = sizeof(r->read_buffer) - r->read_len - 1; + if (available) { + size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available); + if (!len) { + internal_error(true, "decompressor returned zero length #1"); + return false; + } + + r->read_len += (int)len; + r->read_buffer[r->read_len] = '\0'; + } + else + internal_error(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len); + + return true; + } + + // no decompressed data available + // read the compression signature of the next block + + if(unlikely(r->read_len + r->decompressor->signature_size > sizeof(r->read_buffer) - 1)) { + internal_error(true, "The last incomplete line does not leave enough room for the next compression header! Already have %d bytes in read_buffer.", r->read_len); + return false; + } + + // read the compression signature from the stream + // we have to do a loop here, because read_stream() may return less than the data we need + int bytes_read = 0; + do { + int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor->signature_size - bytes_read); + if (unlikely(ret <= 0)) + return false; + + bytes_read += ret; + } while(unlikely(bytes_read < (int)r->decompressor->signature_size)); + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); + + if(unlikely(bytes_read != (int)r->decompressor->signature_size)) + fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor->signature_size); + + size_t compressed_message_size = r->decompressor->start(r->decompressor, r->read_buffer + r->read_len, bytes_read); + if (unlikely(!compressed_message_size)) { + internal_error(true, "multiplexed uncompressed data in compressed stream!"); + r->read_len += bytes_read; + r->read_buffer[r->read_len] = '\0'; + return true; + } + + if(unlikely(compressed_message_size > COMPRESSION_MAX_MSG_SIZE)) { + error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.", + compressed_message_size, (size_t)COMPRESSION_MAX_MSG_SIZE); + return false; + } + + // delete compression header from our read buffer + r->read_buffer[r->read_len] = '\0'; + + // Read the entire compressed block of compressed data + char compressed[compressed_message_size]; + size_t compressed_bytes_read = 0; + do { + size_t start = compressed_bytes_read; + size_t remaining = compressed_message_size - start; + + int last_read_bytes = read_stream(r, &compressed[start], remaining); + if (unlikely(last_read_bytes <= 0)) { + internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes); + return false; + } + + compressed_bytes_read += last_read_bytes; + + } while(unlikely(compressed_message_size > compressed_bytes_read)); + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)compressed_bytes_read); + + // decompress the compressed block + size_t bytes_to_parse = r->decompressor->decompress(r->decompressor, compressed, compressed_bytes_read); + if (!bytes_to_parse) { + internal_error(true, "no bytes to parse."); + return false; + } + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse); + + // fill read buffer with decompressed data + size_t len = (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); + if (!len) { + internal_error(true, "decompressor returned zero length #2"); + return false; + } + r->read_len += (int)len; + r->read_buffer[r->read_len] = '\0'; + + return true; +} +#else // !ENABLE_COMPRESSION +static bool receiver_read_compressed(struct receiver_state *r) { + return receiver_read_uncompressed(r); +} +#endif // ENABLE_COMPRESSION + +/* Produce a full line if one exists, statefully return where we start next time. + * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. + */ +static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) { + size_t start = *pos; + + char *ss = &r->read_buffer[start]; + char *se = &r->read_buffer[r->read_len]; + char *ds = buffer; + char *de = &buffer[buffer_length - 2]; + + if(ss >= se) { + *ds = '\0'; + *pos = 0; + r->read_len = 0; + r->read_buffer[r->read_len] = '\0'; + return NULL; + } + + // copy all bytes to buffer + while(ss < se && ds < de && *ss != '\n') + *ds++ = *ss++; + + // if we have a newline, return the buffer + if(ss < se && ds < de && *ss == '\n') { + // newline found in the r->read_buffer + + *ds++ = *ss++; // copy the newline too + *ds = '\0'; + + *pos = ss - r->read_buffer; + return buffer; + } + + // if the destination is full, oops! + if(ds == de) { + error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); + *ds = '\0'; + *pos = ss - r->read_buffer; + return buffer; + } + + // no newline found in the r->read_buffer + // move everything to the beginning + memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start); + r->read_len -= (int)start; + r->read_buffer[r->read_len] = '\0'; + *ds = '\0'; + *pos = 0; + return NULL; +} + +static void streaming_parser_thread_cleanup(void *ptr) { + PARSER *parser = (PARSER *)ptr; + rrd_collector_finished(); + parser_destroy(parser); +} + +static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { + size_t result; + + PARSER_USER_OBJECT user = { + .enabled = cd->enabled, + .host = rpt->host, + .opaque = rpt, + .cd = cd, + .trust_durations = 1 + }; + + PARSER *parser = parser_init(rpt->host, &user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl); + + rrd_collector_started(); + + // this keeps the parser with its current value + // so, parser needs to be allocated before pushing it + netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser); + + parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); + + user.parser = parser; + + bool compressed_connection = false; +#ifdef ENABLE_COMPRESSION + if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { + compressed_connection = true; + + if (!rpt->decompressor) + rpt->decompressor = create_decompressor(); + else + rpt->decompressor->reset(rpt->decompressor); + } +#endif + + rpt->read_buffer[0] = '\0'; + rpt->read_len = 0; + + size_t read_buffer_start = 0; + char buffer[PLUGINSD_LINE_MAX + 2] = ""; + while(!netdata_exit) { + if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) { + bool have_new_data; + if(compressed_connection) + have_new_data = receiver_read_compressed(rpt); + else + have_new_data = receiver_read_uncompressed(rpt); + + if(!have_new_data) + break; + + rpt->last_msg_t = now_realtime_sec(); + continue; + } + + if(unlikely(netdata_exit)) { + internal_error(true, "exiting..."); + goto done; + } + if(unlikely(rpt->shutdown)) { + internal_error(true, "parser shutdown..."); + goto done; + } + + if (unlikely(parser_action(parser, buffer))) { + internal_error(true, "parser_action() failed on keyword '%s'.", buffer); + break; + } + } + +done: + internal_error(true, "Streaming receiver thread stopping..."); + + result = user.count; + + // free parser with the pop function + netdata_thread_cleanup_pop(1); + + return result; +} + +static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) { + RRDSET *st; + rrdset_foreach_read(st, rpt->host) { + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + } + rrdset_foreach_done(st); + rrdhost_receiver_replicating_charts_zero(rpt->host); +} + +static int rrdpush_receive(struct receiver_state *rpt) +{ + int history = default_rrd_history_entries; + RRD_MEMORY_MODE mode = default_rrd_memory_mode; + int health_enabled = default_health_enabled; + int rrdpush_enabled = default_rrdpush_enabled; + char *rrdpush_destination = default_rrdpush_destination; + char *rrdpush_api_key = default_rrdpush_api_key; + char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; + bool rrdpush_enable_replication = default_rrdpush_enable_replication; + time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; + time_t rrdpush_replication_step = default_rrdpush_replication_step; + time_t alarms_delay = 60; + + rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every); + if(rpt->update_every < 0) rpt->update_every = 1; + + history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", history); + history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", history); + if(history < 5) history = 5; + + mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode))); + mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode))); + + if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { + error("STREAM %s [receive from %s:%s]: dbengine is not enabled, falling back to default.", rpt->hostname, rpt->client_ip, rpt->client_port); + mode = default_rrd_memory_mode; + } + + health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled); + health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled); + + alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", alarms_delay); + alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", alarms_delay); + + rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled); + rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rrdpush_enabled); + + rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rrdpush_destination); + rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rrdpush_destination); + + rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rrdpush_api_key); + rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rrdpush_api_key); + + rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching); + rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching); + + rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication); + rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication); + + rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate); + rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate); + + rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step); + rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step); + +#ifdef ENABLE_COMPRESSION + unsigned int rrdpush_compression = default_compression_enabled; + rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression); + rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression); + rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled); +#endif //ENABLE_COMPRESSION + + (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); + + if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT"); + error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid); + char initial_response[HTTP_HEADER_SIZE + 1]; + snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); +#ifdef ENABLE_HTTPS + if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { +#else + if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { +#endif + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + close(rpt->fd); + return 0; + } + close(rpt->fd); + return 0; + } + + if (rpt->host==NULL) { + + rpt->host = rrdhost_find_or_create( + rpt->hostname + , rpt->registry_hostname + , rpt->machine_guid + , rpt->os + , rpt->timezone + , rpt->abbrev_timezone + , rpt->utc_offset + , rpt->tags + , rpt->program_name + , rpt->program_version + , rpt->update_every + , history + , mode + , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO) + , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) + , rrdpush_destination + , rrdpush_api_key + , rrdpush_send_charts_matching + , rrdpush_enable_replication + , rrdpush_seconds_to_replicate + , rrdpush_replication_step + , rpt->system_info + , 0 + ); + + if(!rpt->host) { + close(rpt->fd); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - CANNOT ACQUIRE HOST"); + error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", rpt->hostname, rpt->client_ip, rpt->client_port); + return 1; + } + + netdata_mutex_lock(&rpt->host->receiver_lock); + if (rpt->host->receiver == NULL) + rpt->host->receiver = rpt; + else { + error("Multiple receivers connected for %s concurrently, cancelling this one...", rpt->machine_guid); + netdata_mutex_unlock(&rpt->host->receiver_lock); + close(rpt->fd); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - BEATEN TO HOST CREATION"); + return 1; + } + netdata_mutex_unlock(&rpt->host->receiver_lock); + } + else { + rrd_wrlock(); + rrdhost_update( + rpt->host, + rpt->hostname, + rpt->registry_hostname, + rpt->machine_guid, + rpt->os, + rpt->timezone, + rpt->abbrev_timezone, + rpt->utc_offset, + rpt->tags, + rpt->program_name, + rpt->program_version, + rpt->update_every, + history, + mode, + (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO), + (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key), + rrdpush_destination, + rrdpush_api_key, + rrdpush_send_charts_matching, + rrdpush_enable_replication, + rrdpush_seconds_to_replicate, + rrdpush_replication_step, + rpt->system_info); + rrd_unlock(); + } + +#ifdef NETDATA_INTERNAL_CHECKS + int ssl = 0; +#ifdef ENABLE_HTTPS + if (rpt->ssl.conn != NULL) + ssl = 1; +#endif + info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'" + , rpt->hostname + , rpt->client_ip + , rpt->client_port + , rrdhost_hostname(rpt->host) + , rpt->host->machine_guid + , rpt->host->rrd_update_every + , rpt->host->rrd_history_entries + , rrd_memory_mode_name(rpt->host->rrd_memory_mode) + , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") + , ssl ? " SSL," : "" + , rrdhost_tags(rpt->host) + ); +#endif // NETDATA_INTERNAL_CHECKS + + + struct plugind cd = { + .enabled = 1, + .update_every = default_rrd_update_every, + .pid = 0, + .serial_failures = 0, + .successful_collections = 0, + .obsolete = 0, + .started_t = now_realtime_sec(), + .next = NULL, + .capabilities = 0, + }; + + // put the client IP and port into the buffers used by plugins.d + snprintfz(cd.id, CONFIG_MAX_NAME, "%s:%s", rpt->client_ip, rpt->client_port); + snprintfz(cd.filename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); + snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); + snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); + +#ifdef ENABLE_COMPRESSION + if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { + if (!rpt->rrdpush_compression) + rpt->capabilities &= ~STREAM_CAP_COMPRESSION; + } +#endif + + info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + char initial_response[HTTP_HEADER_SIZE]; + if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities); + } + else if (stream_has_capability(rpt, STREAM_CAP_VN)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities)); + } else if (stream_has_capability(rpt, STREAM_CAP_V2)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); + } else { // stream_has_capability(rpt, STREAM_CAP_V1) + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1); + } + debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); +#ifdef ENABLE_HTTPS + if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { +#else + if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { +#endif + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + close(rpt->fd); + return 0; + } + + // remove the non-blocking flag from the socket + if(sock_delnonblock(rpt->fd) < 0) + error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); + + struct timeval timeout; + timeout.tv_sec = 600; + timeout.tv_usec = 0; + if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) + error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); + + rrdhost_wrlock(rpt->host); +/* if(rpt->host->connected_senders > 0) { + rrdhost_unlock(rpt->host); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "REJECTED - ALREADY CONNECTED"); + info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + fclose(fp); + return 0; + } +*/ + +// rpt->host->connected_senders++; + if(health_enabled != CONFIG_BOOLEAN_NO) { + if(alarms_delay > 0) { + rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay; + log_health( + "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", + rrdhost_hostname(rpt->host), + (int64_t)alarms_delay); + } + } + rpt->host->senders_connect_time = now_realtime_sec(); + rpt->host->senders_last_chart_command = 0; + rpt->host->trigger_chart_obsoletion_check = 1; + + rrdhost_unlock(rpt->host); + + // call the plugins.d processor to receive the metrics + info("STREAM %s [receive from [%s]:%s]: receiving metrics...", + rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + + log_stream_connection(rpt->client_ip, rpt->client_port, + rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED"); + + cd.capabilities = rpt->capabilities; + +#ifdef ENABLE_ACLK + // in case we have cloud connection we inform cloud + // new child connected + if (netdata_cloud_setting) + aclk_host_state_update(rpt->host, 1); +#endif + + rrdhost_set_is_parent_label(++localhost->senders_count); + + rrdpush_receiver_replication_reset(rpt); + rrdcontext_host_child_connected(rpt->host); + + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + + size_t count = streaming_parser(rpt, &cd, rpt->fd, +#ifdef ENABLE_HTTPS + (rpt->ssl.conn) ? &rpt->ssl : NULL +#else + NULL +#endif + ); + + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + + log_stream_connection(rpt->client_ip, rpt->client_port, + rpt->key, rpt->host->machine_guid, rpt->hostname, + "DISCONNECTED"); + + error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", + rpt->hostname, rpt->client_ip, rpt->client_port, count); + + rrdcontext_host_child_disconnected(rpt->host); + rrdpush_receiver_replication_reset(rpt); + +#ifdef ENABLE_ACLK + // in case we have cloud connection we inform cloud + // a child disconnected + if (netdata_cloud_setting) + aclk_host_state_update(rpt->host, 0); +#endif + + rrdhost_set_is_parent_label(--localhost->senders_count); + + // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread + if (!netdata_exit && rpt->host) { + rrd_rdlock(); + rrdhost_wrlock(rpt->host); + netdata_mutex_lock(&rpt->host->receiver_lock); + if (rpt->host->receiver == rpt) { + rpt->host->senders_connect_time = 0; + rpt->host->trigger_chart_obsoletion_check = 0; + rpt->host->senders_disconnected_time = now_realtime_sec(); + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN); + if(health_enabled == CONFIG_BOOLEAN_AUTO) + rpt->host->health_enabled = 0; + } + rrdhost_unlock(rpt->host); + if (rpt->host->receiver == rpt) { + rrdpush_sender_thread_stop(rpt->host); + } + netdata_mutex_unlock(&rpt->host->receiver_lock); + rrd_unlock(); + } + + // cleanup + close(rpt->fd); + return (int)count; +} + +void *rrdpush_receiver_thread(void *ptr) { + netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr); + + struct receiver_state *rpt = (struct receiver_state *)ptr; + info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); + + worker_register("STREAMRCV"); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); + rrdpush_receive(rpt); + worker_unregister(); + + netdata_thread_cleanup_pop(1); + return NULL; +} + |