diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 708 |
1 files changed, 375 insertions, 333 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 0890ebbcd..61ee33bc4 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -1,6 +1,18 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" +#include "parser/parser.h" + +// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly +#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) +#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2) + +// this has to be the same at parser.h +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) + +#if WORKER_PARSER_FIRST_JOB < 1 +#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 +#endif extern struct config stream_config; @@ -58,105 +70,43 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { #include "collectors/plugins.d/pluginsd_parser.h" -PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user) { - UNUSED(plugins_action); - char *remote_time_txt = words[1]; - time_t remote_time = 0; - RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; - struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd; - if (cd->version < VERSION_GAP_FILLING ) { - error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", host->hostname, cd->cmd, - cd->version); - return PARSER_RC_OK; // Ignore error and continue stream - } - if (remote_time_txt && *remote_time_txt) { - remote_time = str2ull(remote_time_txt); - time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host); - time_t gap = 0; - if (prev == 0) - info( - "STREAM %s from %s: Initial connection (no gap to check), " - "remote=%"PRId64" local=%"PRId64" slew=%"PRId64"", - host->hostname, - cd->cmd, - (int64_t)remote_time, - (int64_t)now, - (int64_t)now - remote_time); - else { - gap = now - prev; - info( - "STREAM %s from %s: Checking for gaps... " - "remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap", - host->hostname, - cd->cmd, - (int64_t)remote_time, - (int64_t)prev, - (int64_t)now, - (int64_t)(remote_time - now), - (int64_t)gap); - } - char message[128]; - sprintf( - message, - "REPLICATE %"PRId64" %"PRId64"\n", - (int64_t)(remote_time - gap), - (int64_t)remote_time); - int ret; -#ifdef ENABLE_HTTPS - SSL *conn = host->stream_ssl.conn ; - if(conn && !host->stream_ssl.flags) { - ret = SSL_write(conn, message, strlen(message)); - } else { - ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT); - } -#else - ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT); -#endif - if (ret != (int)strlen(message)) - error("Failed to send initial timestamp - gaps may appear in charts"); - return PARSER_RC_OK; - } - return PARSER_RC_ERROR; -} + const char *host_uuid_str = get_word(words, num_words, 1); + const char *claim_id_str = get_word(words, num_words, 2); -#define CLAIMED_ID_MIN_WORDS 3 -PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action) -{ - UNUSED(plugins_action); + if (!host_uuid_str || !claim_id_str) { + error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", + host_uuid_str ? host_uuid_str : "[unset]", + claim_id_str ? claim_id_str : "[unset]"); + return PARSER_RC_ERROR; + } - int i; uuid_t uuid; RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; - for (i = 0; words[i]; i++) ; - if (i != CLAIMED_ID_MIN_WORDS) { - error("Command CLAIMED_ID came malformed %d parameters are expected but %d received", CLAIMED_ID_MIN_WORDS - 1, i - 1); - return PARSER_RC_ERROR; - } - // We don't need the parsed UUID // just do it to check the format - if(uuid_parse(words[1], uuid)) { - error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[1]); + if(uuid_parse(host_uuid_str, uuid)) { + error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str); return PARSER_RC_ERROR; } - if(uuid_parse(words[2], uuid) && strcmp(words[2], "NULL")) { - error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[2]); + if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) { + error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str); return PARSER_RC_ERROR; } - if(strcmp(words[1], host->machine_guid)) { - error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", words[1], host->machine_guid); + if(strcmp(host_uuid_str, host->machine_guid)) { + error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid); return PARSER_RC_OK; //the message is OK problem must be somewhere else } rrdhost_aclk_state_lock(host); if (host->aclk_state.claimed_id) freez(host->aclk_state.claimed_id); - host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL; + host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; - store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL); + metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL); rrdhost_aclk_state_unlock(host); @@ -165,197 +115,242 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin return PARSER_RC_OK; } +static int read_stream(struct receiver_state *r, char* buffer, size_t size) { + if(unlikely(!size)) { + internal_error(true, "%s() asked to read zero bytes", __FUNCTION__); + return 0; + } -#ifndef ENABLE_COMPRESSION -/* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing. - */ -static int receiver_read(struct receiver_state *r, FILE *fp) { #ifdef ENABLE_HTTPS - if (r->ssl.conn && !r->ssl.flags) { - ERR_clear_error(); - int desired = sizeof(r->read_buffer) - r->read_len - 1; - int ret = SSL_read(r->ssl.conn, r->read_buffer + r->read_len, desired); - if (ret > 0 ) { - r->read_len += ret; - return 0; - } - // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket - u_long err; - char buf[256]; - while ((err = ERR_get_error()) != 0) { - ERR_error_string_n(err, buf, sizeof(buf)); - error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf); - } - return 1; + if (r->ssl.conn && r->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + return (int)netdata_ssl_read(r->ssl.conn, buffer, size); +#endif + + ssize_t bytes_read = read(r->fd, buffer, size); + if(bytes_read == 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) { + error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__); + bytes_read = -3; } + else if (bytes_read == 0) { + error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__); + bytes_read = -1; + } + else if (bytes_read < 0) { + error("STREAM: %s() failed to read from socket!", __FUNCTION__); + bytes_read = -2; + } + +// do { +// bytes_read = (int) fread(buffer, 1, size, fp); +// if (unlikely(bytes_read <= 0)) { +// if(feof(fp)) { +// internal_error(true, "%s(): fread() failed with EOF", __FUNCTION__); +// bytes_read = -2; +// } +// else if(ferror(fp)) { +// internal_error(true, "%s(): fread() failed with ERROR", __FUNCTION__); +// bytes_read = -3; +// } +// else bytes_read = 0; +// } +// else +// worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, bytes_read); +// } while(bytes_read == 0); + + return (int)bytes_read; +} + +static bool receiver_read_uncompressed(struct receiver_state *r) { +#ifdef NETDATA_INTERNAL_CHECKS + if(r->read_buffer[r->read_len] != '\0') + fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); #endif - if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp)) - return 1; - r->read_len = strlen(r->read_buffer); - return 0; + + int bytes_read = read_stream(r, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); + if(unlikely(bytes_read <= 0)) + return false; + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read); + + r->read_len += bytes_read; + r->read_buffer[r->read_len] = '\0'; + + return true; } -#else -/* - * The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing. - * if SSL encryption is on, then use SSL API for reading stream data. - * Use line oriented fgets() in buffer from receiver_state is provided. - * In other cases use fread to read binary data from socket. - * Return zero on success and the number of bytes were read using pointer in the last argument. - */ -static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t size, int* ret) { - if (!ret) - return 1; - *ret = 0; -#ifdef ENABLE_HTTPS - if (r->ssl.conn && !r->ssl.flags) { - ERR_clear_error(); - if (buffer != r->read_buffer + r->read_len) { - *ret = SSL_read(r->ssl.conn, buffer, size); - if (*ret > 0 ) - return 0; - } else { - // we need to receive data with LF to parse compression header - size_t ofs = 0; - int res = 0; - errno = 0; - while (ofs < size) { - do { - res = SSL_read(r->ssl.conn, buffer + ofs, 1); - // When either SSL_ERROR_SYSCALL (OpenSSL < 3.0) or SSL_ERROR_SSL(OpenSSL > 3.0) happens, - // the connection was lost https://www.openssl.org/docs/man3.0/man3/SSL_get_error.html, - // without the test we will have an infinite loop https://github.com/netdata/netdata/issues/13092 - int local_ssl_err = SSL_get_error(r->ssl.conn, res); - if (local_ssl_err == SSL_ERROR_SYSCALL || local_ssl_err == SSL_ERROR_SSL) { - error("The SSL connection has error SSL_ERROR_SYSCALL(%d) and system is registering errno = %d", - local_ssl_err, errno); - return 1; - } - } while (res == 0); - - if (res < 0) - break; - if (buffer[ofs] == '\n') - break; - ofs += res; - } - if (res > 0) { - ofs += res; - *ret = ofs; - buffer[ofs] = 0; - return 0; + +#ifdef ENABLE_COMPRESSION +static bool receiver_read_compressed(struct receiver_state *r) { + +#ifdef NETDATA_INTERNAL_CHECKS + if(r->read_buffer[r->read_len] != '\0') + fatal("%s: read_buffer does not start with zero #2", __FUNCTION__ ); +#endif + + // first use any available uncompressed data + if (r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { + size_t available = sizeof(r->read_buffer) - r->read_len - 1; + if (available) { + size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available); + if (!len) { + internal_error(true, "decompressor returned zero length #1"); + return false; } + + r->read_len += (int)len; + r->read_buffer[r->read_len] = '\0'; } - // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket - u_long err; - char buf[256]; - while ((err = ERR_get_error()) != 0) { - ERR_error_string_n(err, buf, sizeof(buf)); - error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf); - } - return 1; + else + internal_error(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len); + + return true; } -#endif - if (buffer != r->read_buffer + r->read_len) { - // read to external buffer - *ret = fread(buffer, 1, size, fp); - if (!*ret) - return 1; - } else { - if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp)) - return 1; - *ret = strlen(r->read_buffer); + + // no decompressed data available + // read the compression signature of the next block + + if(unlikely(r->read_len + r->decompressor->signature_size > sizeof(r->read_buffer) - 1)) { + internal_error(true, "The last incomplete line does not leave enough room for the next compression header! Already have %d bytes in read_buffer.", r->read_len); + return false; } - return 0; -} -/* - * Get the next line of data for parsing. - * Return data from the decompressor buffer if available. - * Otherwise read next line from the socket and check for compression header. - * Return the line was read If no compression header was found. - * Otherwise read the entire block of compressed data, decompress it - * and return it in receiver_state buffer. - * Return zero on success. - */ -static int receiver_read(struct receiver_state *r, FILE *fp) { - // check any decompressed data present - if (r->decompressor && - r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { - size_t available = sizeof(r->read_buffer) - r->read_len; - if (available) { - size_t len = r->decompressor->get(r->decompressor, - r->read_buffer + r->read_len, available); - if (!len) - return 1; - r->read_len += len; - } - return 0; + // read the compression signature from the stream + // we have to do a loop here, because read_stream() may return less than the data we need + int bytes_read = 0; + do { + int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor->signature_size - bytes_read); + if (unlikely(ret <= 0)) + return false; + + bytes_read += ret; + } while(unlikely(bytes_read < (int)r->decompressor->signature_size)); + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); + + if(unlikely(bytes_read != (int)r->decompressor->signature_size)) + fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor->signature_size); + + size_t compressed_message_size = r->decompressor->start(r->decompressor, r->read_buffer + r->read_len, bytes_read); + if (unlikely(!compressed_message_size)) { + internal_error(true, "multiplexed uncompressed data in compressed stream!"); + r->read_len += bytes_read; + r->read_buffer[r->read_len] = '\0'; + return true; } - int ret = 0; - if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) - return 1; - - if (!is_compressed_data(r->read_buffer, ret)) { - r->read_len += ret; - return 0; + + if(unlikely(compressed_message_size > COMPRESSION_MAX_MSG_SIZE)) { + error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.", + compressed_message_size, (size_t)COMPRESSION_MAX_MSG_SIZE); + return false; } - if (unlikely(!r->decompressor)) - r->decompressor = create_decompressor(); - - size_t bytes_to_read = r->decompressor->start(r->decompressor, - r->read_buffer, ret); + // delete compression header from our read buffer + r->read_buffer[r->read_len] = '\0'; - // Read the entire block of compressed data because - // we're unable to decompress incomplete block - char compressed[bytes_to_read]; + // Read the entire compressed block of compressed data + char compressed[compressed_message_size]; + size_t compressed_bytes_read = 0; do { - if (read_stream(r, fp, compressed, bytes_to_read, &ret)) - return 1; - // Send input data to decompressor - if (ret) - r->decompressor->put(r->decompressor, compressed, ret); - bytes_to_read -= ret; - } while (bytes_to_read > 0); - // Decompress - size_t bytes_to_parse = r->decompressor->decompress(r->decompressor); - if (!bytes_to_parse) - return 1; - // Fill read buffer with decompressed data - r->read_len = r->decompressor->get(r->decompressor, - r->read_buffer, sizeof(r->read_buffer)); - return 0; -} + size_t start = compressed_bytes_read; + size_t remaining = compressed_message_size - start; -#endif + int last_read_bytes = read_stream(r, &compressed[start], remaining); + if (unlikely(last_read_bytes <= 0)) { + internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes); + return false; + } + + compressed_bytes_read += last_read_bytes; + + } while(unlikely(compressed_message_size > compressed_bytes_read)); + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)compressed_bytes_read); + + // decompress the compressed block + size_t bytes_to_parse = r->decompressor->decompress(r->decompressor, compressed, compressed_bytes_read); + if (!bytes_to_parse) { + internal_error(true, "no bytes to parse."); + return false; + } + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse); + + // fill read buffer with decompressed data + size_t len = (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); + if (!len) { + internal_error(true, "decompressor returned zero length #2"); + return false; + } + r->read_len += (int)len; + r->read_buffer[r->read_len] = '\0'; + + return true; +} +#else // !ENABLE_COMPRESSION +static bool receiver_read_compressed(struct receiver_state *r) { + return receiver_read_uncompressed(r); +} +#endif // ENABLE_COMPRESSION /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. */ -static char *receiver_next_line(struct receiver_state *r, int *pos) { - int start = *pos, scan = *pos; - if (scan >= r->read_len) { +static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) { + size_t start = *pos; + + char *ss = &r->read_buffer[start]; + char *se = &r->read_buffer[r->read_len]; + char *ds = buffer; + char *de = &buffer[buffer_length - 2]; + + if(ss >= se) { + *ds = '\0'; + *pos = 0; r->read_len = 0; + r->read_buffer[r->read_len] = '\0'; return NULL; } - while (scan < r->read_len && r->read_buffer[scan] != '\n') - scan++; - if (scan < r->read_len && r->read_buffer[scan] == '\n') { - *pos = scan+1; - r->read_buffer[scan] = 0; - return &r->read_buffer[start]; + + // copy all bytes to buffer + while(ss < se && ds < de && *ss != '\n') + *ds++ = *ss++; + + // if we have a newline, return the buffer + if(ss < se && ds < de && *ss == '\n') { + // newline found in the r->read_buffer + + *ds++ = *ss++; // copy the newline too + *ds = '\0'; + + *pos = ss - r->read_buffer; + return buffer; } + + // if the destination is full, oops! + if(ds == de) { + error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); + *ds = '\0'; + *pos = ss - r->read_buffer; + return buffer; + } + + // no newline found in the r->read_buffer + // move everything to the beginning memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start); - r->read_len -= start; + r->read_len -= (int)start; + r->read_buffer[r->read_len] = '\0'; + *ds = '\0'; + *pos = 0; return NULL; } static void streaming_parser_thread_cleanup(void *ptr) { PARSER *parser = (PARSER *)ptr; + rrd_collector_finished(); parser_destroy(parser); } -size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) { +static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { size_t result; PARSER_USER_OBJECT user = { @@ -366,49 +361,68 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp .trust_durations = 1 }; - PARSER *parser = parser_init(rpt->host, &user, fp, PARSER_INPUT_SPLIT); + PARSER *parser = parser_init(rpt->host, &user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl); + + rrd_collector_started(); // this keeps the parser with its current value // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser); - parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp); parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); - parser->plugins_action->begin_action = &pluginsd_begin_action; - parser->plugins_action->flush_action = &pluginsd_flush_action; - parser->plugins_action->end_action = &pluginsd_end_action; - parser->plugins_action->disable_action = &pluginsd_disable_action; - parser->plugins_action->variable_action = &pluginsd_variable_action; - parser->plugins_action->dimension_action = &pluginsd_dimension_action; - parser->plugins_action->label_action = &pluginsd_label_action; - parser->plugins_action->overwrite_action = &pluginsd_overwrite_action; - parser->plugins_action->chart_action = &pluginsd_chart_action; - parser->plugins_action->set_action = &pluginsd_set_action; - parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action; - parser->plugins_action->clabel_action = &pluginsd_clabel_action; - user.parser = parser; + bool compressed_connection = false; #ifdef ENABLE_COMPRESSION - if (rpt->decompressor) - rpt->decompressor->reset(rpt->decompressor); + if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { + compressed_connection = true; + + if (!rpt->decompressor) + rpt->decompressor = create_decompressor(); + else + rpt->decompressor->reset(rpt->decompressor); + } #endif - do{ - if (receiver_read(rpt, fp)) + rpt->read_buffer[0] = '\0'; + rpt->read_len = 0; + + size_t read_buffer_start = 0; + char buffer[PLUGINSD_LINE_MAX + 2] = ""; + while(!netdata_exit) { + if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) { + bool have_new_data; + if(compressed_connection) + have_new_data = receiver_read_compressed(rpt); + else + have_new_data = receiver_read_uncompressed(rpt); + + if(!have_new_data) + break; + + rpt->last_msg_t = now_realtime_sec(); + continue; + } + + if(unlikely(netdata_exit)) { + internal_error(true, "exiting..."); + goto done; + } + if(unlikely(rpt->shutdown)) { + internal_error(true, "parser shutdown..."); + goto done; + } + + if (unlikely(parser_action(parser, buffer))) { + internal_error(true, "parser_action() failed on keyword '%s'.", buffer); break; - int pos = 0; - char *line; - while ((line = receiver_next_line(rpt, &pos))) { - if (unlikely(netdata_exit || rpt->shutdown || parser_action(parser, line))) - goto done; } - rpt->last_msg_t = now_realtime_sec(); } - while(!netdata_exit); done: + internal_error(true, "Streaming receiver thread stopping..."); + result = user.count; // free parser with the pop function @@ -417,6 +431,15 @@ done: return result; } +static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) { + RRDSET *st; + rrdset_foreach_read(st, rpt->host) { + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + } + rrdset_foreach_done(st); + rrdhost_receiver_replicating_charts_zero(rpt->host); +} static int rrdpush_receive(struct receiver_state *rpt) { @@ -427,6 +450,9 @@ static int rrdpush_receive(struct receiver_state *rpt) char *rrdpush_destination = default_rrdpush_destination; char *rrdpush_api_key = default_rrdpush_api_key; char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; + bool rrdpush_enable_replication = default_rrdpush_enable_replication; + time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; + time_t rrdpush_replication_step = default_rrdpush_replication_step; time_t alarms_delay = 60; rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every); @@ -439,13 +465,10 @@ static int rrdpush_receive(struct receiver_state *rpt) mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode))); mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode))); -#ifndef ENABLE_DBENGINE - if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE)) { - close(rpt->fd); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "REJECTED -- DBENGINE MEMORY MODE NOT SUPPORTED"); - return 1; + if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { + error("STREAM %s [receive from %s:%s]: dbengine is not enabled, falling back to default.", rpt->hostname, rpt->client_ip, rpt->client_port); + mode = default_rrd_memory_mode; } -#endif health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled); health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled); @@ -465,6 +488,15 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching); rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching); + rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication); + rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication); + + rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate); + rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate); + + rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step); + rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step); + #ifdef ENABLE_COMPRESSION unsigned int rrdpush_compression = default_compression_enabled; rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression); @@ -480,14 +512,12 @@ static int rrdpush_receive(struct receiver_state *rpt) char initial_response[HTTP_HEADER_SIZE + 1]; snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); #ifdef ENABLE_HTTPS - rpt->host->stream_ssl.conn = rpt->ssl.conn; - rpt->host->stream_ssl.flags = rpt->ssl.flags; if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { #else if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { #endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); close(rpt->fd); return 0; } @@ -516,6 +546,9 @@ static int rrdpush_receive(struct receiver_state *rpt) , rrdpush_destination , rrdpush_api_key , rrdpush_send_charts_matching + , rrdpush_enable_replication + , rrdpush_seconds_to_replicate + , rrdpush_replication_step , rpt->system_info , 0 ); @@ -561,6 +594,9 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdpush_destination, rrdpush_api_key, rrdpush_send_charts_matching, + rrdpush_enable_replication, + rrdpush_seconds_to_replicate, + rrdpush_replication_step, rpt->system_info); rrd_unlock(); } @@ -575,14 +611,14 @@ static int rrdpush_receive(struct receiver_state *rpt) , rpt->hostname , rpt->client_ip , rpt->client_port - , rpt->host->hostname + , rrdhost_hostname(rpt->host) , rpt->host->machine_guid , rpt->host->rrd_update_every , rpt->host->rrd_history_entries , rrd_memory_mode_name(rpt->host->rrd_memory_mode) , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") , ssl ? " SSL," : "" - , rpt->host->tags?rpt->host->tags:"" + , rrdhost_tags(rpt->host) ); #endif // NETDATA_INTERNAL_CHECKS @@ -596,7 +632,7 @@ static int rrdpush_receive(struct receiver_state *rpt) .obsolete = 0, .started_t = now_realtime_sec(), .next = NULL, - .version = 0, + .capabilities = 0, }; // put the client IP and port into the buffers used by plugins.d @@ -605,60 +641,50 @@ static int rrdpush_receive(struct receiver_state *rpt) snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); - info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port); - char initial_response[HTTP_HEADER_SIZE]; - if (rpt->stream_version > 1) { - if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){ #ifdef ENABLE_COMPRESSION - if(!rpt->rrdpush_compression) - rpt->stream_version = STREAM_VERSION_CLABELS; -#else - if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) { - rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; - } + if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { + if (!rpt->rrdpush_compression) + rpt->capabilities &= ~STREAM_CAP_COMPRESSION; + } #endif - } - info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version); - sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version); - } else if (rpt->stream_version == 1) { - info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version); + + info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + char initial_response[HTTP_HEADER_SIZE]; + if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities); + } + else if (stream_has_capability(rpt, STREAM_CAP_VN)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities)); + } else if (stream_has_capability(rpt, STREAM_CAP_V2)) { + log_receiver_capabilities(rpt); sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); - } else { - info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rpt->host->hostname, rpt->client_ip, rpt->client_port); - sprintf(initial_response, "%s", START_STREAMING_PROMPT); + } else { // stream_has_capability(rpt, STREAM_CAP_V1) + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1); } debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); - #ifdef ENABLE_HTTPS - rpt->host->stream_ssl.conn = rpt->ssl.conn; - rpt->host->stream_ssl.flags = rpt->ssl.flags; +#ifdef ENABLE_HTTPS if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { #else if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { #endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); close(rpt->fd); return 0; } // remove the non-blocking flag from the socket if(sock_delnonblock(rpt->fd) < 0) - error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); + error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); struct timeval timeout; - timeout.tv_sec = 120; + timeout.tv_sec = 600; timeout.tv_usec = 0; if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) - error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); - - // convert the socket to a FILE * - FILE *fp = fdopen(rpt->fd, "r"); - if(!fp) { - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - SOCKET ERROR"); - error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); - close(rpt->fd); - return 0; - } + error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); rrdhost_wrlock(rpt->host); /* if(rpt->host->connected_senders > 0) { @@ -671,34 +697,29 @@ static int rrdpush_receive(struct receiver_state *rpt) */ // rpt->host->connected_senders++; - if(rpt->stream_version > 0) { - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); - rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); - } - else { - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); - rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); - } - if(health_enabled != CONFIG_BOOLEAN_NO) { if(alarms_delay > 0) { rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay; - info( - "Postponing health checks for %" PRId64 " seconds, on host '%s', because it was just connected.", - (int64_t)alarms_delay, - rpt->host->hostname); + log_health( + "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", + rrdhost_hostname(rpt->host), + (int64_t)alarms_delay); } } rpt->host->senders_connect_time = now_realtime_sec(); rpt->host->senders_last_chart_command = 0; rpt->host->trigger_chart_obsoletion_check = 1; + rrdhost_unlock(rpt->host); // call the plugins.d processor to receive the metrics - info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rpt->host->hostname, rpt->client_ip, rpt->client_port); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "CONNECTED"); + info("STREAM %s [receive from [%s]:%s]: receiving metrics...", + rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - cd.version = rpt->stream_version; + log_stream_connection(rpt->client_ip, rpt->client_port, + rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED"); + + cd.capabilities = rpt->capabilities; #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud @@ -707,24 +728,42 @@ static int rrdpush_receive(struct receiver_state *rpt) aclk_host_state_update(rpt->host, 1); #endif + rrdhost_set_is_parent_label(++localhost->senders_count); + + rrdpush_receiver_replication_reset(rpt); rrdcontext_host_child_connected(rpt->host); - size_t count = streaming_parser(rpt, &cd, fp); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname, + size_t count = streaming_parser(rpt, &cd, rpt->fd, +#ifdef ENABLE_HTTPS + (rpt->ssl.conn) ? &rpt->ssl : NULL +#else + NULL +#endif + ); + + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + + log_stream_connection(rpt->client_ip, rpt->client_port, + rpt->key, rpt->host->machine_guid, rpt->hostname, "DISCONNECTED"); - error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, - rpt->client_port, count); + + error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", + rpt->hostname, rpt->client_ip, rpt->client_port, count); rrdcontext_host_child_disconnected(rpt->host); + rrdpush_receiver_replication_reset(rpt); #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud - // new child connected + // a child disconnected if (netdata_cloud_setting) aclk_host_state_update(rpt->host, 0); #endif + rrdhost_set_is_parent_label(--localhost->senders_count); + // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread if (!netdata_exit && rpt->host) { rrd_rdlock(); @@ -747,7 +786,7 @@ static int rrdpush_receive(struct receiver_state *rpt) } // cleanup - fclose(fp); + close(rpt->fd); return (int)count; } @@ -758,6 +797,9 @@ void *rrdpush_receiver_thread(void *ptr) { info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); worker_register("STREAMRCV"); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); rrdpush_receive(rpt); worker_unregister(); |