From 97e01009d69b8fbebfebf68f51e3d126d0ed43fc Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 30 Nov 2022 19:47:05 +0100 Subject: Merging upstream version 1.37.0. Signed-off-by: Daniel Baumann --- streaming/README.md | 3 +- streaming/compression.c | 301 +++++----- streaming/receiver.c | 708 +++++++++++++----------- streaming/replication.c | 1407 +++++++++++++++++++++++++++++++++++++++++++++++ streaming/replication.h | 33 ++ streaming/rrdpush.c | 613 ++++++++++++++------- streaming/rrdpush.h | 232 ++++++-- streaming/sender.c | 1255 ++++++++++++++++++++++++++++-------------- streaming/stream.conf | 78 ++- 9 files changed, 3410 insertions(+), 1220 deletions(-) create mode 100644 streaming/replication.c create mode 100644 streaming/replication.h (limited to 'streaming') diff --git a/streaming/README.md b/streaming/README.md index 57c392f40..58eb2cc1b 100644 --- a/streaming/README.md +++ b/streaming/README.md @@ -234,8 +234,7 @@ For Netdata v1.9+, streaming can also be monitored via `access.log`. Netdata does not activate TLS encryption by default. To encrypt streaming connections: 1. On the parent node (receiving node), [enable TLS support](/web/server/README.md#enabling-tls-support). -2. On the child node (sending node), [enable TLS support](/web/server/README.md#enabling-tls-support). -3. On the child's `stream.conf`, configure the destination as follows: +2. On the child's `stream.conf`, configure the destination as follows: ``` [stream] diff --git a/streaming/compression.c b/streaming/compression.c index d6178d6c3..7ba9dbf19 100644 --- a/streaming/compression.c +++ b/streaming/compression.c @@ -5,9 +5,7 @@ #define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION" -#define LZ4_MAX_MSG_SIZE 0x4000 -#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE) - +// signature MUST end with a newline #define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) #define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) #define SIGNATURE_SIZE 4 @@ -18,8 +16,9 @@ */ struct compressor_data { LZ4_stream_t *stream; - char *stream_buffer; - size_t stream_buffer_pos; + char *input_ring_buffer; + size_t input_ring_buffer_size; + size_t input_ring_buffer_pos; }; @@ -31,9 +30,9 @@ static void lz4_compressor_reset(struct compressor_state *state) if (state->data) { if (state->data->stream) { LZ4_resetStream_fast(state->data->stream); - info("%s: Compressor Reset", STREAM_COMPRESSION_MSG); + internal_error(true, "%s: compressor reset", STREAM_COMPRESSION_MSG); } - state->data->stream_buffer_pos = 0; + state->data->input_ring_buffer_pos = 0; } } @@ -47,10 +46,10 @@ static void lz4_compressor_destroy(struct compressor_state **state) if (s->data) { if (s->data->stream) LZ4_freeStream(s->data->stream); - freez(s->data->stream_buffer); + freez(s->data->input_ring_buffer); freez(s->data); } - freez(s->buffer); + freez(s->compression_result_buffer); freez(s); *state = NULL; debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG); @@ -65,37 +64,53 @@ static void lz4_compressor_destroy(struct compressor_state **state) */ static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out) { - if (!state || !size || !out) + if(unlikely(!state || !size || !out)) return 0; - if (size > LZ4_MAX_MSG_SIZE) { - error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, size, LZ4_MAX_MSG_SIZE); + + if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) { + error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE); return 0; } + size_t max_dst_size = LZ4_COMPRESSBOUND(size); size_t data_size = max_dst_size + SIGNATURE_SIZE; - if (!state->buffer) { - state->buffer = mallocz(data_size); - state->buffer_size = data_size; - } else if (state->buffer_size < data_size) { - state->buffer = reallocz(state->buffer, data_size); - state->buffer_size = data_size; + if (!state->compression_result_buffer) { + state->compression_result_buffer = mallocz(data_size); + state->compression_result_buffer_size = data_size; + } + else if(unlikely(state->compression_result_buffer_size < data_size)) { + state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size); + state->compression_result_buffer_size = data_size; } - memcpy(state->data->stream_buffer + state->data->stream_buffer_pos, data, size); - long int compressed_data_size = LZ4_compress_fast_continue(state->data->stream, - state->data->stream_buffer + state->data->stream_buffer_pos, - state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1); + // the ring buffer always has space for LZ4_MAX_MSG_SIZE + memcpy(state->data->input_ring_buffer + state->data->input_ring_buffer_pos, data, size); + + // this call needs the last 64K of our previous data + // they are available in the ring buffer + long int compressed_data_size = LZ4_compress_fast_continue( + state->data->stream, + state->data->input_ring_buffer + state->data->input_ring_buffer_pos, + state->compression_result_buffer + SIGNATURE_SIZE, + size, + max_dst_size, + 1); + if (compressed_data_size < 0) { error("Data compression error: %ld", compressed_data_size); return 0; } - state->data->stream_buffer_pos += size; - if (state->data->stream_buffer_pos >= LZ4_STREAM_BUFFER_SIZE - LZ4_MAX_MSG_SIZE) - state->data->stream_buffer_pos = 0; + + // update the next writing position of the ring buffer + state->data->input_ring_buffer_pos += size; + if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE)) + state->data->input_ring_buffer_pos = 0; + + // update the signature header uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8; - *(uint32_t *)state->buffer = len | SIGNATURE; - *out = state->buffer; + *(uint32_t *)state->compression_result_buffer = len | SIGNATURE; + *out = state->compression_result_buffer; debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size); return compressed_data_size + SIGNATURE_SIZE; } @@ -114,8 +129,9 @@ struct compressor_state *create_compressor() state->data = callocz(1, sizeof(struct compressor_data)); state->data->stream = LZ4_createStream(); - state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE)); - state->buffer_size = LZ4_STREAM_BUFFER_SIZE; + state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2); + state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size); + state->compression_result_buffer_size = 0; state->reset(state); debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG); return state; @@ -124,11 +140,12 @@ struct compressor_state *create_compressor() /* * LZ4 streaming API decompressor specific data */ -struct decompressor_data { - LZ4_streamDecode_t *stream; - char *stream_buffer; - size_t stream_buffer_size; - size_t stream_buffer_pos; +struct decompressor_stream { + LZ4_streamDecode_t *lz4_stream; + char *buffer; + size_t size; + size_t write_at; + size_t read_at; }; /* @@ -136,12 +153,12 @@ struct decompressor_data { */ static void lz4_decompressor_reset(struct decompressor_state *state) { - if (state->data) { - if (state->data->stream) - LZ4_setStreamDecode(state->data->stream, NULL, 0); - state->data->stream_buffer_pos = 0; - state->buffer_len = 0; - state->out_buffer_len = 0; + if (state->stream) { + if (state->stream->lz4_stream) + LZ4_setStreamDecode(state->stream->lz4_stream, NULL, 0); + + state->stream->write_at = 0; + state->stream->read_at = 0; } } @@ -152,173 +169,129 @@ static void lz4_decompressor_destroy(struct decompressor_state **state) { if (state && *state) { struct decompressor_state *s = *state; - if (s->data) { + if (s->stream) { debug(D_STREAM, "%s: Destroying decompressor.", STREAM_COMPRESSION_MSG); - if (s->data->stream) - LZ4_freeStreamDecode(s->data->stream); - freez(s->data->stream_buffer); - freez(s->data); + if (s->stream->lz4_stream) + LZ4_freeStreamDecode(s->stream->lz4_stream); + freez(s->stream->buffer); + freez(s->stream); } - freez(s->buffer); freez(s); *state = NULL; } } -static size_t decode_compress_header(const char *data, size_t data_size) -{ - if (!data || !data_size) +static size_t decode_compress_header(const char *data, size_t data_size) { + if (unlikely(!data || !data_size)) return 0; - if (data_size < SIGNATURE_SIZE) + + if (unlikely(data_size != SIGNATURE_SIZE)) return 0; + uint32_t sign = *(uint32_t *)data; - if ((sign & SIGNATURE_MASK) != SIGNATURE) + if (unlikely((sign & SIGNATURE_MASK) != SIGNATURE)) return 0; + size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7)); return length; } -/* - * Check input data for the compression header - * Return the size of compressed data or 0 for uncompressed data - */ -size_t is_compressed_data(const char *data, size_t data_size) -{ - return decode_compress_header(data, data_size); -} - /* * Start the collection of compressed data in an internal buffer * Return the size of compressed data or 0 for uncompressed data */ -static size_t lz4_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) -{ - size_t length = decode_compress_header(header, header_size); - if (!length) - return 0; +static size_t lz4_decompressor_start(struct decompressor_state *state __maybe_unused, const char *header, size_t header_size) { + if(unlikely(state->stream->read_at != state->stream->write_at)) + fatal("%s: asked to decompress new data, while there are unread data in the decompression buffer!" + , STREAM_COMPRESSION_MSG); - if (!state->buffer) { - state->buffer = mallocz(length); - state->buffer_size = length; - } else if (state->buffer_size < length) { - state->buffer = reallocz(state->buffer, length); - state->buffer_size = length; - } - state->buffer_len = length; - state->buffer_pos = 0; - state->out_buffer_pos = 0; - state->out_buffer_len = 0; - return length; + return decode_compress_header(header, header_size); } /* - * Add a chunk of compressed data to the internal buffer - * Return the current size of compressed data or 0 for error + * Decompress the compressed data in the internal buffer + * Return the size of uncompressed data or 0 for error */ -static size_t lz4_decompressor_put(struct decompressor_state *state, const char *data, size_t size) -{ - if (!state || !size || !data) +static size_t lz4_decompressor_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(!state || !compressed_data || !compressed_size)) return 0; - if (!state->buffer) - fatal("STREAM: No decompressor buffer allocated"); - if (state->buffer_pos + size > state->buffer_len) { - error("STREAM: Decompressor buffer overflow %lu + %lu > %lu", - state->buffer_pos, size, state->buffer_len); - size = state->buffer_len - state->buffer_pos; + if(unlikely(state->stream->read_at != state->stream->write_at)) + fatal("%s: asked to decompress new data, while there are unread data in the decompression buffer!" + , STREAM_COMPRESSION_MSG); + + if (unlikely(state->stream->write_at >= state->stream->size / 2)) { + state->stream->write_at = 0; + state->stream->read_at = 0; } - memcpy(state->buffer + state->buffer_pos, data, size); - state->buffer_pos += size; - return state->buffer_pos; -} -static size_t saving_percent(size_t comp_len, size_t src_len) -{ - if (comp_len > src_len) - comp_len = src_len; - if (!src_len) - return 0; - return 100 - comp_len * 100 / src_len; -} + long int decompressed_size = LZ4_decompress_safe_continue( + state->stream->lz4_stream + , compressed_data + , state->stream->buffer + state->stream->write_at + , (int)compressed_size + , (int)(state->stream->size - state->stream->write_at) + ); -/* - * Decompress the compressed data in the internal buffer - * Return the size of uncompressed data or 0 for error - */ -static size_t lz4_decompressor_decompress(struct decompressor_state *state) -{ - if (!state) - return 0; - if (!state->buffer) { - error("%s: No decompressor buffer allocated", STREAM_COMPRESSION_MSG); - return 0; - } - - long int decompressed_size = LZ4_decompress_safe_continue(state->data->stream, state->buffer, - state->data->stream_buffer + state->data->stream_buffer_pos, - state->buffer_len, state->data->stream_buffer_size - state->data->stream_buffer_pos); - if (decompressed_size < 0) { - error("%s: Decompressor error %ld", STREAM_COMPRESSION_MSG, decompressed_size); + if (unlikely(decompressed_size < 0)) { + error("%s: decompressor returned negative decompressed bytes: %ld", STREAM_COMPRESSION_MSG, decompressed_size); return 0; } - state->out_buffer = state->data->stream_buffer + state->data->stream_buffer_pos; - state->data->stream_buffer_pos += decompressed_size; - if (state->data->stream_buffer_pos >= state->data->stream_buffer_size - LZ4_MAX_MSG_SIZE) - state->data->stream_buffer_pos = 0; - state->out_buffer_len = decompressed_size; - state->out_buffer_pos = 0; + if(unlikely(decompressed_size + state->stream->write_at > state->stream->size)) + fatal("%s: decompressor overflown the stream_buffer. size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu" + , STREAM_COMPRESSION_MSG + , state->stream->size + , state->stream->write_at + , decompressed_size + , state->stream->write_at + decompressed_size - state->stream->size + ); - // Some compression statistics - size_t old_avg_saving = saving_percent(state->total_compressed, state->total_uncompressed); - size_t old_avg_size = state->packet_count ? state->total_uncompressed / state->packet_count : 0; + state->stream->write_at += decompressed_size; - state->total_compressed += state->buffer_len + SIGNATURE_SIZE; + // statistics + state->total_compressed += compressed_size + SIGNATURE_SIZE; state->total_uncompressed += decompressed_size; state->packet_count++; - size_t saving = saving_percent(state->buffer_len, decompressed_size); - size_t avg_saving = saving_percent(state->total_compressed, state->total_uncompressed); - size_t avg_size = state->total_uncompressed / state->packet_count; - - if (old_avg_saving != avg_saving || old_avg_size != avg_size){ - debug(D_STREAM, "%s: Saving: %lu%% (avg. %lu%%), avg.size: %lu", STREAM_COMPRESSION_MSG, saving, avg_saving, avg_size); - } return decompressed_size; } /* * Return the size of uncompressed data left in the internal buffer or 0 for error */ -static size_t lz4_decompressor_decompressed_bytes_in_buffer(struct decompressor_state *state) -{ - return state->out_buffer_len ? - state->out_buffer_len - state->out_buffer_pos : 0; +static size_t lz4_decompressor_decompressed_bytes_in_buffer(struct decompressor_state *state) { + if(unlikely(state->stream->read_at > state->stream->write_at)) + fatal("%s: invalid read/write stream positions" + , STREAM_COMPRESSION_MSG); + + return state->stream->write_at - state->stream->read_at; } /* * Fill the buffer provided with uncompressed data from the internal buffer * Return the size of uncompressed data copied or 0 for error */ -static size_t lz4_decompressor_get(struct decompressor_state *state, char *data, size_t size) -{ - if (!state || !size || !data) +static size_t lz4_decompressor_get(struct decompressor_state *state, char *dst, size_t size) { + if (unlikely(!state || !size || !dst)) return 0; - if (!state->out_buffer) - fatal("%s: No decompressor output buffer allocated", STREAM_COMPRESSION_MSG); - if (state->out_buffer_pos + size > state->out_buffer_len) - size = state->out_buffer_len - state->out_buffer_pos; - - char *p = state->out_buffer + state->out_buffer_pos, *endp = p + size, *last_lf = NULL; - for (; p < endp; ++p) - if (*p == '\n' || *p == 0) - last_lf = p; - if (last_lf) - size = last_lf + 1 - (state->out_buffer + state->out_buffer_pos); - - memcpy(data, state->out_buffer + state->out_buffer_pos, size); - state->out_buffer_pos += size; - return size; + + size_t remaining = lz4_decompressor_decompressed_bytes_in_buffer(state); + if(unlikely(!remaining)) + return 0; + + size_t bytes_to_return = size; + if(bytes_to_return > remaining) + bytes_to_return = remaining; + + memcpy(dst, state->stream->buffer + state->stream->read_at, bytes_to_return); + state->stream->read_at += bytes_to_return; + + if(unlikely(state->stream->read_at > state->stream->write_at)) + fatal("%s: invalid read/write stream positions" + , STREAM_COMPRESSION_MSG); + + return bytes_to_return; } /* @@ -328,20 +301,20 @@ static size_t lz4_decompressor_get(struct decompressor_state *state, char *data, struct decompressor_state *create_decompressor() { struct decompressor_state *state = callocz(1, sizeof(struct decompressor_state)); + state->signature_size = SIGNATURE_SIZE; state->reset = lz4_decompressor_reset; state->start = lz4_decompressor_start; - state->put = lz4_decompressor_put; state->decompress = lz4_decompressor_decompress; state->get = lz4_decompressor_get; state->decompressed_bytes_in_buffer = lz4_decompressor_decompressed_bytes_in_buffer; state->destroy = lz4_decompressor_destroy; - state->data = callocz(1, sizeof(struct decompressor_data)); - fatal_assert(state->data); - state->data->stream = LZ4_createStreamDecode(); - state->data->stream_buffer_size = LZ4_decoderRingBufferSize(LZ4_MAX_MSG_SIZE); - state->data->stream_buffer = mallocz(state->data->stream_buffer_size); - fatal_assert(state->data->stream_buffer); + state->stream = callocz(1, sizeof(struct decompressor_stream)); + fatal_assert(state->stream); + state->stream->lz4_stream = LZ4_createStreamDecode(); + state->stream->size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2; + state->stream->buffer = mallocz(state->stream->size); + fatal_assert(state->stream->buffer); state->reset(state); debug(D_STREAM, "%s: Initialize streaming decompression!", STREAM_COMPRESSION_MSG); return state; diff --git a/streaming/receiver.c b/streaming/receiver.c index 0890ebbcd..61ee33bc4 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -1,6 +1,18 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" +#include "parser/parser.h" + +// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly +#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) +#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2) + +// this has to be the same at parser.h +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) + +#if WORKER_PARSER_FIRST_JOB < 1 +#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 +#endif extern struct config stream_config; @@ -58,105 +70,43 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { #include "collectors/plugins.d/pluginsd_parser.h" -PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user) { - UNUSED(plugins_action); - char *remote_time_txt = words[1]; - time_t remote_time = 0; - RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; - struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd; - if (cd->version < VERSION_GAP_FILLING ) { - error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", host->hostname, cd->cmd, - cd->version); - return PARSER_RC_OK; // Ignore error and continue stream - } - if (remote_time_txt && *remote_time_txt) { - remote_time = str2ull(remote_time_txt); - time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host); - time_t gap = 0; - if (prev == 0) - info( - "STREAM %s from %s: Initial connection (no gap to check), " - "remote=%"PRId64" local=%"PRId64" slew=%"PRId64"", - host->hostname, - cd->cmd, - (int64_t)remote_time, - (int64_t)now, - (int64_t)now - remote_time); - else { - gap = now - prev; - info( - "STREAM %s from %s: Checking for gaps... " - "remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap", - host->hostname, - cd->cmd, - (int64_t)remote_time, - (int64_t)prev, - (int64_t)now, - (int64_t)(remote_time - now), - (int64_t)gap); - } - char message[128]; - sprintf( - message, - "REPLICATE %"PRId64" %"PRId64"\n", - (int64_t)(remote_time - gap), - (int64_t)remote_time); - int ret; -#ifdef ENABLE_HTTPS - SSL *conn = host->stream_ssl.conn ; - if(conn && !host->stream_ssl.flags) { - ret = SSL_write(conn, message, strlen(message)); - } else { - ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT); - } -#else - ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT); -#endif - if (ret != (int)strlen(message)) - error("Failed to send initial timestamp - gaps may appear in charts"); - return PARSER_RC_OK; - } - return PARSER_RC_ERROR; -} + const char *host_uuid_str = get_word(words, num_words, 1); + const char *claim_id_str = get_word(words, num_words, 2); -#define CLAIMED_ID_MIN_WORDS 3 -PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action) -{ - UNUSED(plugins_action); + if (!host_uuid_str || !claim_id_str) { + error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", + host_uuid_str ? host_uuid_str : "[unset]", + claim_id_str ? claim_id_str : "[unset]"); + return PARSER_RC_ERROR; + } - int i; uuid_t uuid; RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; - for (i = 0; words[i]; i++) ; - if (i != CLAIMED_ID_MIN_WORDS) { - error("Command CLAIMED_ID came malformed %d parameters are expected but %d received", CLAIMED_ID_MIN_WORDS - 1, i - 1); - return PARSER_RC_ERROR; - } - // We don't need the parsed UUID // just do it to check the format - if(uuid_parse(words[1], uuid)) { - error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[1]); + if(uuid_parse(host_uuid_str, uuid)) { + error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str); return PARSER_RC_ERROR; } - if(uuid_parse(words[2], uuid) && strcmp(words[2], "NULL")) { - error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[2]); + if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) { + error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str); return PARSER_RC_ERROR; } - if(strcmp(words[1], host->machine_guid)) { - error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", words[1], host->machine_guid); + if(strcmp(host_uuid_str, host->machine_guid)) { + error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid); return PARSER_RC_OK; //the message is OK problem must be somewhere else } rrdhost_aclk_state_lock(host); if (host->aclk_state.claimed_id) freez(host->aclk_state.claimed_id); - host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL; + host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; - store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL); + metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL); rrdhost_aclk_state_unlock(host); @@ -165,197 +115,242 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin return PARSER_RC_OK; } +static int read_stream(struct receiver_state *r, char* buffer, size_t size) { + if(unlikely(!size)) { + internal_error(true, "%s() asked to read zero bytes", __FUNCTION__); + return 0; + } -#ifndef ENABLE_COMPRESSION -/* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing. - */ -static int receiver_read(struct receiver_state *r, FILE *fp) { #ifdef ENABLE_HTTPS - if (r->ssl.conn && !r->ssl.flags) { - ERR_clear_error(); - int desired = sizeof(r->read_buffer) - r->read_len - 1; - int ret = SSL_read(r->ssl.conn, r->read_buffer + r->read_len, desired); - if (ret > 0 ) { - r->read_len += ret; - return 0; - } - // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket - u_long err; - char buf[256]; - while ((err = ERR_get_error()) != 0) { - ERR_error_string_n(err, buf, sizeof(buf)); - error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf); - } - return 1; + if (r->ssl.conn && r->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + return (int)netdata_ssl_read(r->ssl.conn, buffer, size); +#endif + + ssize_t bytes_read = read(r->fd, buffer, size); + if(bytes_read == 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) { + error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__); + bytes_read = -3; } + else if (bytes_read == 0) { + error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__); + bytes_read = -1; + } + else if (bytes_read < 0) { + error("STREAM: %s() failed to read from socket!", __FUNCTION__); + bytes_read = -2; + } + +// do { +// bytes_read = (int) fread(buffer, 1, size, fp); +// if (unlikely(bytes_read <= 0)) { +// if(feof(fp)) { +// internal_error(true, "%s(): fread() failed with EOF", __FUNCTION__); +// bytes_read = -2; +// } +// else if(ferror(fp)) { +// internal_error(true, "%s(): fread() failed with ERROR", __FUNCTION__); +// bytes_read = -3; +// } +// else bytes_read = 0; +// } +// else +// worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, bytes_read); +// } while(bytes_read == 0); + + return (int)bytes_read; +} + +static bool receiver_read_uncompressed(struct receiver_state *r) { +#ifdef NETDATA_INTERNAL_CHECKS + if(r->read_buffer[r->read_len] != '\0') + fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); #endif - if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp)) - return 1; - r->read_len = strlen(r->read_buffer); - return 0; + + int bytes_read = read_stream(r, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); + if(unlikely(bytes_read <= 0)) + return false; + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read); + + r->read_len += bytes_read; + r->read_buffer[r->read_len] = '\0'; + + return true; } -#else -/* - * The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing. - * if SSL encryption is on, then use SSL API for reading stream data. - * Use line oriented fgets() in buffer from receiver_state is provided. - * In other cases use fread to read binary data from socket. - * Return zero on success and the number of bytes were read using pointer in the last argument. - */ -static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t size, int* ret) { - if (!ret) - return 1; - *ret = 0; -#ifdef ENABLE_HTTPS - if (r->ssl.conn && !r->ssl.flags) { - ERR_clear_error(); - if (buffer != r->read_buffer + r->read_len) { - *ret = SSL_read(r->ssl.conn, buffer, size); - if (*ret > 0 ) - return 0; - } else { - // we need to receive data with LF to parse compression header - size_t ofs = 0; - int res = 0; - errno = 0; - while (ofs < size) { - do { - res = SSL_read(r->ssl.conn, buffer + ofs, 1); - // When either SSL_ERROR_SYSCALL (OpenSSL < 3.0) or SSL_ERROR_SSL(OpenSSL > 3.0) happens, - // the connection was lost https://www.openssl.org/docs/man3.0/man3/SSL_get_error.html, - // without the test we will have an infinite loop https://github.com/netdata/netdata/issues/13092 - int local_ssl_err = SSL_get_error(r->ssl.conn, res); - if (local_ssl_err == SSL_ERROR_SYSCALL || local_ssl_err == SSL_ERROR_SSL) { - error("The SSL connection has error SSL_ERROR_SYSCALL(%d) and system is registering errno = %d", - local_ssl_err, errno); - return 1; - } - } while (res == 0); - - if (res < 0) - break; - if (buffer[ofs] == '\n') - break; - ofs += res; - } - if (res > 0) { - ofs += res; - *ret = ofs; - buffer[ofs] = 0; - return 0; + +#ifdef ENABLE_COMPRESSION +static bool receiver_read_compressed(struct receiver_state *r) { + +#ifdef NETDATA_INTERNAL_CHECKS + if(r->read_buffer[r->read_len] != '\0') + fatal("%s: read_buffer does not start with zero #2", __FUNCTION__ ); +#endif + + // first use any available uncompressed data + if (r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { + size_t available = sizeof(r->read_buffer) - r->read_len - 1; + if (available) { + size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available); + if (!len) { + internal_error(true, "decompressor returned zero length #1"); + return false; } + + r->read_len += (int)len; + r->read_buffer[r->read_len] = '\0'; } - // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket - u_long err; - char buf[256]; - while ((err = ERR_get_error()) != 0) { - ERR_error_string_n(err, buf, sizeof(buf)); - error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf); - } - return 1; + else + internal_error(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len); + + return true; } -#endif - if (buffer != r->read_buffer + r->read_len) { - // read to external buffer - *ret = fread(buffer, 1, size, fp); - if (!*ret) - return 1; - } else { - if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp)) - return 1; - *ret = strlen(r->read_buffer); + + // no decompressed data available + // read the compression signature of the next block + + if(unlikely(r->read_len + r->decompressor->signature_size > sizeof(r->read_buffer) - 1)) { + internal_error(true, "The last incomplete line does not leave enough room for the next compression header! Already have %d bytes in read_buffer.", r->read_len); + return false; } - return 0; -} -/* - * Get the next line of data for parsing. - * Return data from the decompressor buffer if available. - * Otherwise read next line from the socket and check for compression header. - * Return the line was read If no compression header was found. - * Otherwise read the entire block of compressed data, decompress it - * and return it in receiver_state buffer. - * Return zero on success. - */ -static int receiver_read(struct receiver_state *r, FILE *fp) { - // check any decompressed data present - if (r->decompressor && - r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { - size_t available = sizeof(r->read_buffer) - r->read_len; - if (available) { - size_t len = r->decompressor->get(r->decompressor, - r->read_buffer + r->read_len, available); - if (!len) - return 1; - r->read_len += len; - } - return 0; + // read the compression signature from the stream + // we have to do a loop here, because read_stream() may return less than the data we need + int bytes_read = 0; + do { + int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor->signature_size - bytes_read); + if (unlikely(ret <= 0)) + return false; + + bytes_read += ret; + } while(unlikely(bytes_read < (int)r->decompressor->signature_size)); + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); + + if(unlikely(bytes_read != (int)r->decompressor->signature_size)) + fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor->signature_size); + + size_t compressed_message_size = r->decompressor->start(r->decompressor, r->read_buffer + r->read_len, bytes_read); + if (unlikely(!compressed_message_size)) { + internal_error(true, "multiplexed uncompressed data in compressed stream!"); + r->read_len += bytes_read; + r->read_buffer[r->read_len] = '\0'; + return true; } - int ret = 0; - if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) - return 1; - - if (!is_compressed_data(r->read_buffer, ret)) { - r->read_len += ret; - return 0; + + if(unlikely(compressed_message_size > COMPRESSION_MAX_MSG_SIZE)) { + error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.", + compressed_message_size, (size_t)COMPRESSION_MAX_MSG_SIZE); + return false; } - if (unlikely(!r->decompressor)) - r->decompressor = create_decompressor(); - - size_t bytes_to_read = r->decompressor->start(r->decompressor, - r->read_buffer, ret); + // delete compression header from our read buffer + r->read_buffer[r->read_len] = '\0'; - // Read the entire block of compressed data because - // we're unable to decompress incomplete block - char compressed[bytes_to_read]; + // Read the entire compressed block of compressed data + char compressed[compressed_message_size]; + size_t compressed_bytes_read = 0; do { - if (read_stream(r, fp, compressed, bytes_to_read, &ret)) - return 1; - // Send input data to decompressor - if (ret) - r->decompressor->put(r->decompressor, compressed, ret); - bytes_to_read -= ret; - } while (bytes_to_read > 0); - // Decompress - size_t bytes_to_parse = r->decompressor->decompress(r->decompressor); - if (!bytes_to_parse) - return 1; - // Fill read buffer with decompressed data - r->read_len = r->decompressor->get(r->decompressor, - r->read_buffer, sizeof(r->read_buffer)); - return 0; -} + size_t start = compressed_bytes_read; + size_t remaining = compressed_message_size - start; -#endif + int last_read_bytes = read_stream(r, &compressed[start], remaining); + if (unlikely(last_read_bytes <= 0)) { + internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes); + return false; + } + + compressed_bytes_read += last_read_bytes; + + } while(unlikely(compressed_message_size > compressed_bytes_read)); + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)compressed_bytes_read); + + // decompress the compressed block + size_t bytes_to_parse = r->decompressor->decompress(r->decompressor, compressed, compressed_bytes_read); + if (!bytes_to_parse) { + internal_error(true, "no bytes to parse."); + return false; + } + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse); + + // fill read buffer with decompressed data + size_t len = (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); + if (!len) { + internal_error(true, "decompressor returned zero length #2"); + return false; + } + r->read_len += (int)len; + r->read_buffer[r->read_len] = '\0'; + + return true; +} +#else // !ENABLE_COMPRESSION +static bool receiver_read_compressed(struct receiver_state *r) { + return receiver_read_uncompressed(r); +} +#endif // ENABLE_COMPRESSION /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. */ -static char *receiver_next_line(struct receiver_state *r, int *pos) { - int start = *pos, scan = *pos; - if (scan >= r->read_len) { +static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) { + size_t start = *pos; + + char *ss = &r->read_buffer[start]; + char *se = &r->read_buffer[r->read_len]; + char *ds = buffer; + char *de = &buffer[buffer_length - 2]; + + if(ss >= se) { + *ds = '\0'; + *pos = 0; r->read_len = 0; + r->read_buffer[r->read_len] = '\0'; return NULL; } - while (scan < r->read_len && r->read_buffer[scan] != '\n') - scan++; - if (scan < r->read_len && r->read_buffer[scan] == '\n') { - *pos = scan+1; - r->read_buffer[scan] = 0; - return &r->read_buffer[start]; + + // copy all bytes to buffer + while(ss < se && ds < de && *ss != '\n') + *ds++ = *ss++; + + // if we have a newline, return the buffer + if(ss < se && ds < de && *ss == '\n') { + // newline found in the r->read_buffer + + *ds++ = *ss++; // copy the newline too + *ds = '\0'; + + *pos = ss - r->read_buffer; + return buffer; } + + // if the destination is full, oops! + if(ds == de) { + error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); + *ds = '\0'; + *pos = ss - r->read_buffer; + return buffer; + } + + // no newline found in the r->read_buffer + // move everything to the beginning memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start); - r->read_len -= start; + r->read_len -= (int)start; + r->read_buffer[r->read_len] = '\0'; + *ds = '\0'; + *pos = 0; return NULL; } static void streaming_parser_thread_cleanup(void *ptr) { PARSER *parser = (PARSER *)ptr; + rrd_collector_finished(); parser_destroy(parser); } -size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) { +static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { size_t result; PARSER_USER_OBJECT user = { @@ -366,49 +361,68 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp .trust_durations = 1 }; - PARSER *parser = parser_init(rpt->host, &user, fp, PARSER_INPUT_SPLIT); + PARSER *parser = parser_init(rpt->host, &user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl); + + rrd_collector_started(); // this keeps the parser with its current value // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser); - parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp); parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); - parser->plugins_action->begin_action = &pluginsd_begin_action; - parser->plugins_action->flush_action = &pluginsd_flush_action; - parser->plugins_action->end_action = &pluginsd_end_action; - parser->plugins_action->disable_action = &pluginsd_disable_action; - parser->plugins_action->variable_action = &pluginsd_variable_action; - parser->plugins_action->dimension_action = &pluginsd_dimension_action; - parser->plugins_action->label_action = &pluginsd_label_action; - parser->plugins_action->overwrite_action = &pluginsd_overwrite_action; - parser->plugins_action->chart_action = &pluginsd_chart_action; - parser->plugins_action->set_action = &pluginsd_set_action; - parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action; - parser->plugins_action->clabel_action = &pluginsd_clabel_action; - user.parser = parser; + bool compressed_connection = false; #ifdef ENABLE_COMPRESSION - if (rpt->decompressor) - rpt->decompressor->reset(rpt->decompressor); + if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { + compressed_connection = true; + + if (!rpt->decompressor) + rpt->decompressor = create_decompressor(); + else + rpt->decompressor->reset(rpt->decompressor); + } #endif - do{ - if (receiver_read(rpt, fp)) + rpt->read_buffer[0] = '\0'; + rpt->read_len = 0; + + size_t read_buffer_start = 0; + char buffer[PLUGINSD_LINE_MAX + 2] = ""; + while(!netdata_exit) { + if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) { + bool have_new_data; + if(compressed_connection) + have_new_data = receiver_read_compressed(rpt); + else + have_new_data = receiver_read_uncompressed(rpt); + + if(!have_new_data) + break; + + rpt->last_msg_t = now_realtime_sec(); + continue; + } + + if(unlikely(netdata_exit)) { + internal_error(true, "exiting..."); + goto done; + } + if(unlikely(rpt->shutdown)) { + internal_error(true, "parser shutdown..."); + goto done; + } + + if (unlikely(parser_action(parser, buffer))) { + internal_error(true, "parser_action() failed on keyword '%s'.", buffer); break; - int pos = 0; - char *line; - while ((line = receiver_next_line(rpt, &pos))) { - if (unlikely(netdata_exit || rpt->shutdown || parser_action(parser, line))) - goto done; } - rpt->last_msg_t = now_realtime_sec(); } - while(!netdata_exit); done: + internal_error(true, "Streaming receiver thread stopping..."); + result = user.count; // free parser with the pop function @@ -417,6 +431,15 @@ done: return result; } +static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) { + RRDSET *st; + rrdset_foreach_read(st, rpt->host) { + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + } + rrdset_foreach_done(st); + rrdhost_receiver_replicating_charts_zero(rpt->host); +} static int rrdpush_receive(struct receiver_state *rpt) { @@ -427,6 +450,9 @@ static int rrdpush_receive(struct receiver_state *rpt) char *rrdpush_destination = default_rrdpush_destination; char *rrdpush_api_key = default_rrdpush_api_key; char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; + bool rrdpush_enable_replication = default_rrdpush_enable_replication; + time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; + time_t rrdpush_replication_step = default_rrdpush_replication_step; time_t alarms_delay = 60; rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every); @@ -439,13 +465,10 @@ static int rrdpush_receive(struct receiver_state *rpt) mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode))); mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode))); -#ifndef ENABLE_DBENGINE - if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE)) { - close(rpt->fd); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "REJECTED -- DBENGINE MEMORY MODE NOT SUPPORTED"); - return 1; + if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { + error("STREAM %s [receive from %s:%s]: dbengine is not enabled, falling back to default.", rpt->hostname, rpt->client_ip, rpt->client_port); + mode = default_rrd_memory_mode; } -#endif health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled); health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled); @@ -465,6 +488,15 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching); rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching); + rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication); + rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication); + + rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate); + rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate); + + rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step); + rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step); + #ifdef ENABLE_COMPRESSION unsigned int rrdpush_compression = default_compression_enabled; rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression); @@ -480,14 +512,12 @@ static int rrdpush_receive(struct receiver_state *rpt) char initial_response[HTTP_HEADER_SIZE + 1]; snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); #ifdef ENABLE_HTTPS - rpt->host->stream_ssl.conn = rpt->ssl.conn; - rpt->host->stream_ssl.flags = rpt->ssl.flags; if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { #else if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { #endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); close(rpt->fd); return 0; } @@ -516,6 +546,9 @@ static int rrdpush_receive(struct receiver_state *rpt) , rrdpush_destination , rrdpush_api_key , rrdpush_send_charts_matching + , rrdpush_enable_replication + , rrdpush_seconds_to_replicate + , rrdpush_replication_step , rpt->system_info , 0 ); @@ -561,6 +594,9 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdpush_destination, rrdpush_api_key, rrdpush_send_charts_matching, + rrdpush_enable_replication, + rrdpush_seconds_to_replicate, + rrdpush_replication_step, rpt->system_info); rrd_unlock(); } @@ -575,14 +611,14 @@ static int rrdpush_receive(struct receiver_state *rpt) , rpt->hostname , rpt->client_ip , rpt->client_port - , rpt->host->hostname + , rrdhost_hostname(rpt->host) , rpt->host->machine_guid , rpt->host->rrd_update_every , rpt->host->rrd_history_entries , rrd_memory_mode_name(rpt->host->rrd_memory_mode) , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") , ssl ? " SSL," : "" - , rpt->host->tags?rpt->host->tags:"" + , rrdhost_tags(rpt->host) ); #endif // NETDATA_INTERNAL_CHECKS @@ -596,7 +632,7 @@ static int rrdpush_receive(struct receiver_state *rpt) .obsolete = 0, .started_t = now_realtime_sec(), .next = NULL, - .version = 0, + .capabilities = 0, }; // put the client IP and port into the buffers used by plugins.d @@ -605,60 +641,50 @@ static int rrdpush_receive(struct receiver_state *rpt) snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); - info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port); - char initial_response[HTTP_HEADER_SIZE]; - if (rpt->stream_version > 1) { - if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){ #ifdef ENABLE_COMPRESSION - if(!rpt->rrdpush_compression) - rpt->stream_version = STREAM_VERSION_CLABELS; -#else - if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) { - rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; - } + if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { + if (!rpt->rrdpush_compression) + rpt->capabilities &= ~STREAM_CAP_COMPRESSION; + } #endif - } - info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version); - sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version); - } else if (rpt->stream_version == 1) { - info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version); + + info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + char initial_response[HTTP_HEADER_SIZE]; + if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities); + } + else if (stream_has_capability(rpt, STREAM_CAP_VN)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities)); + } else if (stream_has_capability(rpt, STREAM_CAP_V2)) { + log_receiver_capabilities(rpt); sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); - } else { - info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rpt->host->hostname, rpt->client_ip, rpt->client_port); - sprintf(initial_response, "%s", START_STREAMING_PROMPT); + } else { // stream_has_capability(rpt, STREAM_CAP_V1) + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1); } debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); - #ifdef ENABLE_HTTPS - rpt->host->stream_ssl.conn = rpt->ssl.conn; - rpt->host->stream_ssl.flags = rpt->ssl.flags; +#ifdef ENABLE_HTTPS if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { #else if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { #endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); close(rpt->fd); return 0; } // remove the non-blocking flag from the socket if(sock_delnonblock(rpt->fd) < 0) - error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); + error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); struct timeval timeout; - timeout.tv_sec = 120; + timeout.tv_sec = 600; timeout.tv_usec = 0; if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) - error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); - - // convert the socket to a FILE * - FILE *fp = fdopen(rpt->fd, "r"); - if(!fp) { - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - SOCKET ERROR"); - error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); - close(rpt->fd); - return 0; - } + error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); rrdhost_wrlock(rpt->host); /* if(rpt->host->connected_senders > 0) { @@ -671,34 +697,29 @@ static int rrdpush_receive(struct receiver_state *rpt) */ // rpt->host->connected_senders++; - if(rpt->stream_version > 0) { - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); - rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); - } - else { - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); - rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); - } - if(health_enabled != CONFIG_BOOLEAN_NO) { if(alarms_delay > 0) { rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay; - info( - "Postponing health checks for %" PRId64 " seconds, on host '%s', because it was just connected.", - (int64_t)alarms_delay, - rpt->host->hostname); + log_health( + "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", + rrdhost_hostname(rpt->host), + (int64_t)alarms_delay); } } rpt->host->senders_connect_time = now_realtime_sec(); rpt->host->senders_last_chart_command = 0; rpt->host->trigger_chart_obsoletion_check = 1; + rrdhost_unlock(rpt->host); // call the plugins.d processor to receive the metrics - info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rpt->host->hostname, rpt->client_ip, rpt->client_port); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "CONNECTED"); + info("STREAM %s [receive from [%s]:%s]: receiving metrics...", + rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - cd.version = rpt->stream_version; + log_stream_connection(rpt->client_ip, rpt->client_port, + rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED"); + + cd.capabilities = rpt->capabilities; #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud @@ -707,24 +728,42 @@ static int rrdpush_receive(struct receiver_state *rpt) aclk_host_state_update(rpt->host, 1); #endif + rrdhost_set_is_parent_label(++localhost->senders_count); + + rrdpush_receiver_replication_reset(rpt); rrdcontext_host_child_connected(rpt->host); - size_t count = streaming_parser(rpt, &cd, fp); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname, + size_t count = streaming_parser(rpt, &cd, rpt->fd, +#ifdef ENABLE_HTTPS + (rpt->ssl.conn) ? &rpt->ssl : NULL +#else + NULL +#endif + ); + + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + + log_stream_connection(rpt->client_ip, rpt->client_port, + rpt->key, rpt->host->machine_guid, rpt->hostname, "DISCONNECTED"); - error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, - rpt->client_port, count); + + error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", + rpt->hostname, rpt->client_ip, rpt->client_port, count); rrdcontext_host_child_disconnected(rpt->host); + rrdpush_receiver_replication_reset(rpt); #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud - // new child connected + // a child disconnected if (netdata_cloud_setting) aclk_host_state_update(rpt->host, 0); #endif + rrdhost_set_is_parent_label(--localhost->senders_count); + // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread if (!netdata_exit && rpt->host) { rrd_rdlock(); @@ -747,7 +786,7 @@ static int rrdpush_receive(struct receiver_state *rpt) } // cleanup - fclose(fp); + close(rpt->fd); return (int)count; } @@ -758,6 +797,9 @@ void *rrdpush_receiver_thread(void *ptr) { info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); worker_register("STREAMRCV"); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); rrdpush_receive(rpt); worker_unregister(); diff --git a/streaming/replication.c b/streaming/replication.c new file mode 100644 index 000000000..8fa501061 --- /dev/null +++ b/streaming/replication.c @@ -0,0 +1,1407 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "replication.h" +#include "Judy.h" + +#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 +#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20 +#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 + +#define WORKER_JOB_FIND_NEXT 1 +#define WORKER_JOB_QUERYING 2 +#define WORKER_JOB_DELETE_ENTRY 3 +#define WORKER_JOB_FIND_CHART 4 +#define WORKER_JOB_CHECK_CONSISTENCY 5 +#define WORKER_JOB_BUFFER_COMMIT 6 +#define WORKER_JOB_CLEANUP 7 + +// master thread worker jobs +#define WORKER_JOB_STATISTICS 8 +#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 9 +#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 10 +#define WORKER_JOB_CUSTOM_METRIC_ADDED 11 +#define WORKER_JOB_CUSTOM_METRIC_DONE 12 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 13 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 14 +#define WORKER_JOB_CUSTOM_METRIC_WAITS 15 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16 + +#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30 +#define SECONDS_TO_RESET_POINT_IN_TIME 10 + +static struct replication_query_statistics replication_queries = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .queries_started = 0, + .queries_finished = 0, + .points_read = 0, + .points_generated = 0, +}; + +struct replication_query_statistics replication_get_query_statistics(void) { + netdata_spinlock_lock(&replication_queries.spinlock); + struct replication_query_statistics ret = replication_queries; + netdata_spinlock_unlock(&replication_queries.spinlock); + return ret; +} + +// ---------------------------------------------------------------------------- +// sending replication replies + +struct replication_dimension { + STORAGE_POINT sp; + struct storage_engine_query_handle handle; + bool enabled; + + DICTIONARY *dict; + const DICTIONARY_ITEM *rda; + RRDDIM *rd; +}; + +static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) { + size_t dimensions = rrdset_number_of_dimensions(st); + size_t points_read = 0, points_generated = 0; + + struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops; + struct replication_dimension data[dimensions]; + memset(data, 0, sizeof(data)); + + if(enable_streaming && st->last_updated.tv_sec > before) { + internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)before, + (unsigned long long)st->last_updated.tv_sec + ); + before = st->last_updated.tv_sec; + } + + // prepare our array of dimensions + { + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if(unlikely(!rd || !rd_dfe.item || !rd->exposed)) + continue; + + if (unlikely(rd_dfe.counter >= dimensions)) { + internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + break; + } + + struct replication_dimension *d = &data[rd_dfe.counter]; + + d->dict = rd_dfe.dict; + d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); + d->rd = rd; + + ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before); + d->enabled = true; + } + rrddim_foreach_done(rd); + } + + time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before; + while(now <= before) { + time_t min_start_time = 0, min_end_time = 0; + for (size_t i = 0; i < dimensions ;i++) { + struct replication_dimension *d = &data[i]; + if(unlikely(!d->enabled)) continue; + + // fetch the first valid point for the dimension + int max_skip = 100; + while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) { + d->sp = ops->next_metric(&d->handle); + points_read++; + } + + internal_error(max_skip <= 0, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now); + + if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp))) + continue; + + if(unlikely(!min_start_time)) { + min_start_time = d->sp.start_time; + min_end_time = d->sp.end_time; + } + else { + min_start_time = MIN(min_start_time, d->sp.start_time); + min_end_time = MIN(min_end_time, d->sp.end_time); + } + } + + if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) { + internal_error(true, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)min_start_time, + (unsigned long long)min_end_time, + (unsigned long long)wall_clock_time); + break; + } + + if(unlikely(min_end_time < now)) { +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now); +#endif // NETDATA_LOG_REPLICATION_REQUESTS + break; + } + + if(unlikely(min_end_time <= min_start_time)) + min_start_time = min_end_time - st->update_every; + + if(unlikely(!actual_after)) { + actual_after = min_end_time; + actual_before = min_end_time; + } + else + actual_before = min_end_time; + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n" + , (unsigned long long)min_start_time + , (unsigned long long)min_end_time + , (unsigned long long)wall_clock_time + ); + + // output the replay values for this time + for (size_t i = 0; i < dimensions ;i++) { + struct replication_dimension *d = &data[i]; + if(unlikely(!d->enabled)) continue; + + if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time)) + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n", + rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : ""); + + else + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n", + rrddim_id(d->rd)); + + points_generated++; + } + + now = min_end_time + 1; + } + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + if(actual_after) { + char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1]; + log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after); + log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before); + internal_error(true, + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf, + (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before)); + } + else + internal_error(true, + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)after, (unsigned long long)before); +#endif // NETDATA_LOG_REPLICATION_REQUESTS + + // release all the dictionary items acquired + // finalize the queries + size_t queries = 0; + for(size_t i = 0; i < dimensions ;i++) { + struct replication_dimension *d = &data[i]; + if(unlikely(!d->enabled)) continue; + + ops->finalize(&d->handle); + + dictionary_acquired_item_release(d->dict, d->rda); + + // update global statistics + queries++; + } + + netdata_spinlock_lock(&replication_queries.spinlock); + replication_queries.queries_started += queries; + replication_queries.queries_finished += queries; + replication_queries.points_read += points_read; + replication_queries.points_generated += points_generated; + netdata_spinlock_unlock(&replication_queries.spinlock); + + return before; +} + +static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) { + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if(!rd->exposed) continue; + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n", + rrddim_id(rd), + (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, + rd->last_collected_value, + rd->last_calculated_value, + rd->last_stored_value + ); + } + rrddim_foreach_done(rd); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n", + (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, + (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec + ); +} + +bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) { + time_t query_after = after; + time_t query_before = before; + time_t now = now_realtime_sec(); + time_t tolerance = 2; // sometimes from the time we get this value, to the time we check, + // a data collection has been made + // so, we give this tolerance to detect invalid timestamps + + // find the first entry we have + time_t first_entry_local = rrdset_first_entry_t(st); + if(first_entry_local > now + tolerance) { + internal_error(true, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)first_entry_local, (unsigned long long)now); + first_entry_local = now; + } + + if (query_after < first_entry_local) + query_after = first_entry_local; + + // find the latest entry we have + time_t last_entry_local = st->last_updated.tv_sec; + if(!last_entry_local) { + internal_error(true, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + last_entry_local = rrdset_last_entry_t(st); + if(!last_entry_local) { + internal_error(true, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + last_entry_local = now; + } + } + + if(last_entry_local > now + tolerance) { + internal_error(true, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)last_entry_local, (unsigned long long)now); + last_entry_local = now; + } + + if (query_before > last_entry_local) + query_before = last_entry_local; + + // if the parent asked us to start streaming, then fill the rest with the data that we have + if (start_streaming) + query_before = last_entry_local; + + if (query_after > query_before) { + time_t tmp = query_before; + query_before = query_after; + query_after = tmp; + } + + bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false; + + // we might want to optimize this by filling a temporary buffer + // and copying the result to the host's buffer in order to avoid + // holding the host's buffer lock for too long + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); + + if(after != 0 && before != 0) + before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now); + else { + after = 0; + before = 0; + enable_streaming = true; + } + + // get again the world clock time + time_t world_clock_time = now_realtime_sec(); + if(enable_streaming) { + if(now < world_clock_time) { + // we needed time to execute this request + // so, the parent will need to replicate more data + enable_streaming = false; + } + else + replicate_chart_collection_state(wb, st); + } + + // end with first/last entries we have, and the first start time and + // last end time of the data we sent + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n", + + // current chart update every + (int)st->update_every + + // child first db time, child end db time + , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local + + // start streaming boolean + , enable_streaming ? "true" : "false" + + // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true) + , (unsigned long long)after, (unsigned long long)before + + // child world clock time + , (unsigned long long)world_clock_time + ); + + worker_is_busy(WORKER_JOB_BUFFER_COMMIT); + sender_commit(host->sender, wb); + worker_is_busy(WORKER_JOB_CLEANUP); + + return enable_streaming; +} + +// ---------------------------------------------------------------------------- +// sending replication requests + +struct replication_request_details { + struct { + send_command callback; + void *data; + } caller; + + RRDHOST *host; + RRDSET *st; + + struct { + time_t first_entry_t; // the first entry time the child has + time_t last_entry_t; // the last entry time the child has + time_t world_time_t; // the current time of the child + } child_db; + + struct { + time_t first_entry_t; // the first entry time we have + time_t last_entry_t; // the last entry time we have + bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future, and we fixed + time_t now; // the current local world clock time + } local_db; + + struct { + time_t from; // the starting time of the entire gap we have + time_t to; // the ending time of the entire gap we have + } gap; + + struct { + time_t after; // the start time we requested previously from this child + time_t before; // the end time we requested previously from this child + } last_request; + + struct { + time_t after; // the start time of this replication request - the child will add 1 second + time_t before; // the end time of this replication request + bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before' + } wanted; +}; + +static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) { + RRDSET *st = r->st; + + if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t)) + st->rrdhost->receiver->replication_first_time_t = r->wanted.after; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + st->replay.log_next_data_collection = true; + + char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = ""; + + if(r->wanted.after) + log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after); + + if(r->wanted.before) + log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before); + + internal_error(true, + "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " + "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s" + , rrdhost_hostname(r->host), rrdset_id(r->st) + , r->wanted.after, wanted_after_buf + , r->wanted.before, wanted_before_buf + , r->wanted.start_streaming ? "YES" : "NO" + , msg + , r->last_request.after, r->last_request.before + , r->child_db.first_entry_t, r->child_db.last_entry_t + , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD" + , r->local_db.first_entry_t, r->local_db.last_entry_t + , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now + , r->gap.from, r->gap.to + , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL" + , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : "" + ); + + st->replay.start_streaming = r->wanted.start_streaming; + st->replay.after = r->wanted.after; + st->replay.before = r->wanted.before; +#endif // NETDATA_LOG_REPLICATION_REQUESTS + + char buffer[2048 + 1]; + snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", + rrdset_id(st), r->wanted.start_streaming ? "true" : "false", + (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before); + + int ret = r->caller.callback(buffer, r->caller.data); + if (ret < 0) { + error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)", + rrdhost_hostname(r->host), rrdset_id(r->st), ret); + return false; + } + + return true; +} + +bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st, + time_t first_entry_child, time_t last_entry_child, time_t child_world_time, + time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) +{ + struct replication_request_details r = { + .caller = { + .callback = callback, + .data = callback_data, + }, + + .host = host, + .st = st, + + .child_db = { + .first_entry_t = first_entry_child, + .last_entry_t = last_entry_child, + .world_time_t = child_world_time, + }, + + .local_db = { + .first_entry_t = rrdset_first_entry_t(st), + .last_entry_t = rrdset_last_entry_t(st), + .last_entry_t_adjusted_to_now = false, + .now = now_realtime_sec(), + }, + + .last_request = { + .after = prev_first_entry_wanted, + .before = prev_last_entry_wanted, + }, + + .wanted = { + .after = 0, + .before = 0, + .start_streaming = true, + }, + }; + + // check our local database retention + if(r.local_db.last_entry_t > r.local_db.now) { + r.local_db.last_entry_t = r.local_db.now; + r.local_db.last_entry_t_adjusted_to_now = true; + } + + // let's find the GAP we have + if(!r.last_request.after || !r.last_request.before) { + // there is no previous request + + if(r.local_db.last_entry_t) + // we have some data, let's continue from the last point we have + r.gap.from = r.local_db.last_entry_t; + else + // we don't have any data, the gap is the max timeframe we are allowed to replicate + r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate; + + } + else { + // we had sent a request - let's continue at the point we left it + // for this we don't take into account the actual data in our db + // because the child may also have gaps, and we need to get over it + r.gap.from = r.last_request.before; + } + + // we want all the data up to now + r.gap.to = r.local_db.now; + + // The gap is now r.gap.from -> r.gap.to + + if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) + return send_replay_chart_cmd(&r, "empty replication request, replication is disabled"); + + if (unlikely(!r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, child has no stored data"); + + if (unlikely(!rrdset_number_of_dimensions(st))) + return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions"); + + if (r.child_db.first_entry_t <= 0) + return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid"); + + if (r.child_db.first_entry_t > r.child_db.last_entry_t) + return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)"); + + if (r.local_db.last_entry_t > r.child_db.last_entry_t) + return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one"); + + // let's find what the child can provide to fill that gap + + if(r.child_db.first_entry_t > r.gap.from) + // the child does not have all the data - let's get what it has + r.wanted.after = r.child_db.first_entry_t; + else + // ok, the child can fill the entire gap we have + r.wanted.after = r.gap.from; + + if(r.gap.to - r.wanted.after > host->rrdpush_replication_step) + // the duration is too big for one request - let's take the first step + r.wanted.before = r.wanted.after + host->rrdpush_replication_step; + else + // wow, we can do it in one request + r.wanted.before = r.gap.to; + + // don't ask from the child more than it has + if(r.wanted.before > r.child_db.last_entry_t) + r.wanted.before = r.child_db.last_entry_t; + + if(r.wanted.after > r.wanted.before) + r.wanted.after = r.wanted.before; + + // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child + r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t); + + // the wanted timeframe is now r.wanted.after -> r.wanted.before + // send it + return send_replay_chart_cmd(&r, "OK"); +} + +// ---------------------------------------------------------------------------- +// replication thread + +// replication request in sender DICTIONARY +// used for de-duplicating the requests +struct replication_request { + struct sender_state *sender; // the sender we should put the reply at + STRING *chart_id; // the chart of the request + time_t after; // the start time of the query (maybe zero) key for sorting (JudyL) + time_t before; // the end time of the query (maybe zero) + bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming + + usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request + Word_t unique_id; // auto-increment, later requests have bigger + bool found; // used as a result boolean for the find call + bool indexed_in_judy; // true when the request is indexed in judy +}; + +// replication sort entry in JudyL array +// used for sorting all requests, across all nodes +struct replication_sort_entry { + struct replication_request *rq; + + size_t unique_id; // used as a key to identify the sort entry - we never access its contents +}; + +#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread + +// the global variables for the replication thread +static struct replication_thread { + netdata_mutex_t mutex; + + struct { + size_t pending; // number of requests pending in the queue + Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1) + + // statistics + size_t added; // number of requests added to the queue + size_t removed; // number of requests removed from the queue + size_t skipped_not_connected; // number of requests skipped, because the sender is not connected to a parent + size_t skipped_no_room; // number of requests skipped, because the sender has no room for responses +// size_t skipped_no_room_since_last_reset; + size_t sender_resets; // number of times a sender reset our last position in the queue + time_t first_time_t; // the minimum 'after' we encountered + + struct { + Word_t after; + Word_t unique_id; + Pvoid_t JudyL_array; + } queue; + + } unsafe; // protected from replication_recursive_lock() + + struct { + size_t executed; // the number of replication requests executed + size_t latest_first_time; // the 'after' timestamp of the last request we executed + } atomic; // access should be with atomic operations + + struct { + size_t waits; + size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time + + netdata_thread_t **threads_ptrs; + size_t threads; + } main_thread; // access is allowed only by the main thread + +} replication_globals = { + .mutex = NETDATA_MUTEX_INITIALIZER, + .unsafe = { + .pending = 0, + .unique_id = 0, + + .added = 0, + .removed = 0, + .skipped_not_connected = 0, + .skipped_no_room = 0, +// .skipped_no_room_since_last_reset = 0, + .sender_resets = 0, + + .first_time_t = 0, + + .queue = { + .after = 0, + .unique_id = 0, + .JudyL_array = NULL, + }, + }, + .atomic = { + .executed = 0, + .latest_first_time = 0, + }, + .main_thread = { + .waits = 0, + .last_executed = 0, + .threads = 0, + .threads_ptrs = NULL, + }, +}; + +#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED) +#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED) + +static inline bool replication_recursive_lock_mode(char mode) { + static __thread int recursions = 0; + + if(mode == 'L') { // (L)ock + if(++recursions == 1) + netdata_mutex_lock(&replication_globals.mutex); + } + else if(mode == 'U') { // (U)nlock + if(--recursions == 0) + netdata_mutex_unlock(&replication_globals.mutex); + } + else if(mode == 'C') { // (C)heck + if(recursions > 0) + return true; + else + return false; + } + else + fatal("REPLICATION: unknown lock mode '%c'", mode); + +#ifdef NETDATA_INTERNAL_CHECKS + if(recursions < 0) + fatal("REPLICATION: recursions is %d", recursions); +#endif + + return true; +} + +#define replication_recursive_lock() replication_recursive_lock_mode('L') +#define replication_recursive_unlock() replication_recursive_lock_mode('U') +#define fatal_when_replication_is_not_locked_for_me() do { \ + if(!replication_recursive_lock_mode('C')) \ + fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \ +} while(0) + +void replication_set_next_point_in_time(time_t after, size_t unique_id) { + replication_recursive_lock(); + replication_globals.unsafe.queue.after = after; + replication_globals.unsafe.queue.unique_id = unique_id; + replication_recursive_unlock(); +} + +// ---------------------------------------------------------------------------- +// replication sort entry management + +static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) { + fatal_when_replication_is_not_locked_for_me(); + + struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry)); + + rrdpush_sender_pending_replication_requests_plus_one(rq->sender); + + // copy the request + rse->rq = rq; + rse->unique_id = ++replication_globals.unsafe.unique_id; + + // save the unique id into the request, to be able to delete it later + rq->unique_id = rse->unique_id; + rq->indexed_in_judy = false; + return rse; +} + +static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { + freez(rse); +} + +static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) { + replication_recursive_lock(); + + struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq); + +// if(rq->after < (time_t)replication_globals.protected.queue.after && +// rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && +// !replication_globals.protected.skipped_no_room_since_last_reset) { +// +// // make it find this request first +// replication_set_next_point_in_time(rq->after, rq->unique_id); +// } + + replication_globals.unsafe.added++; + replication_globals.unsafe.pending++; + + Pvoid_t *inner_judy_ptr; + + // find the outer judy entry, using after as key + inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); + if(!inner_judy_ptr) + inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); + + // add it to the inner judy, using unique_id as key + Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); + *item = rse; + rq->indexed_in_judy = true; + + if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t) + replication_globals.unsafe.first_time_t = rq->after; + + replication_recursive_unlock(); + + return rse; +} + +static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) { + fatal_when_replication_is_not_locked_for_me(); + + bool inner_judy_deleted = false; + + replication_globals.unsafe.removed++; + replication_globals.unsafe.pending--; + + rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender); + + rse->rq->indexed_in_judy = false; + + // delete it from the inner judy + JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0); + + // if no items left, delete it from the outer judy + if(**inner_judy_ppptr == NULL) { + JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0); + inner_judy_deleted = true; + } + + // free memory + replication_sort_entry_destroy(rse); + + return inner_judy_deleted; +} + +static void replication_sort_entry_del(struct replication_request *rq) { + Pvoid_t *inner_judy_pptr; + struct replication_sort_entry *rse_to_delete = NULL; + + replication_recursive_lock(); + if(rq->indexed_in_judy) { + + inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0); + if (inner_judy_pptr) { + Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0); + if (our_item_pptr) { + rse_to_delete = *our_item_pptr; + replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr); + } + } + + if (!rse_to_delete) + fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.", + rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after); + + } + + replication_recursive_unlock(); +} + +static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t * PIndex, bool first) { + if(unlikely(first)) + return JudyLFirst(PArray, PIndex, PJE0); + + return JudyLNext(PArray, PIndex, PJE0); +} + +static struct replication_request replication_request_get_first_available() { + Pvoid_t *inner_judy_pptr; + + replication_recursive_lock(); + + struct replication_request rq_to_return = (struct replication_request){ .found = false }; + + if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) { + replication_globals.unsafe.queue.after = 0; + replication_globals.unsafe.queue.unique_id = 0; + } + + Word_t started_after = replication_globals.unsafe.queue.after; + + size_t round = 0; + while(!rq_to_return.found) { + round++; + + if(round > 2) + break; + + if(round == 2) { + if(started_after == 0) + break; + + replication_globals.unsafe.queue.after = 0; + replication_globals.unsafe.queue.unique_id = 0; + } + + bool find_same_after = true; + while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, find_same_after))) { + Pvoid_t *our_item_pptr; + + if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after)) + break; + + while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) { + struct replication_sort_entry *rse = *our_item_pptr; + struct replication_request *rq = rse->rq; + struct sender_state *s = rq->sender; + + if (likely(rrdpush_sender_get_buffer_used_percent(s) <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) { + // there is room for this request in the sender buffer + + bool sender_is_connected = + rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + + bool sender_has_been_flushed_since_this_request = + rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s); + + if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) { + // skip this request, the sender is not connected, or it has reconnected + + replication_globals.unsafe.skipped_not_connected++; + if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) + // we removed the item from the outer JudyL + break; + } + else { + // this request is good to execute + + // copy the request to return it + rq_to_return = *rq; + rq_to_return.chart_id = string_dup(rq_to_return.chart_id); + + // set the return result to found + rq_to_return.found = true; + + if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) + // we removed the item from the outer JudyL + break; + } + } + else { + replication_globals.unsafe.skipped_no_room++; +// replication_globals.protected.skipped_no_room_since_last_reset++; + } + } + + // call JudyLNext from now on + find_same_after = false; + + // prepare for the next iteration on the outer loop + replication_globals.unsafe.queue.unique_id = 0; + } + } + + replication_recursive_unlock(); + return rq_to_return; +} + +// ---------------------------------------------------------------------------- +// replication request management + +static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) { + struct sender_state *s = sender_state; (void)s; + struct replication_request *rq = value; + + // IMPORTANT: + // We use the react instead of the insert callback + // because we want the item to be atomically visible + // to our replication thread, immediately after. + + // If we put this at the insert callback, the item is not guaranteed + // to be atomically visible to others, so the replication thread + // may see the replication sort entry, but fail to find the dictionary item + // related to it. + + replication_sort_entry_add(rq); + + // this request is about a unique chart for this sender + rrdpush_sender_replicating_charts_plus_one(s); +} + +static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) { + struct sender_state *s = sender_state; (void)s; + struct replication_request *rq = old_value; (void)rq; + struct replication_request *rq_new = new_value; + + replication_recursive_lock(); + + if(!rq->indexed_in_judy) { + replication_sort_entry_add(rq); + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); + } + else { + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), + dictionary_acquired_item_name(item), + (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false"); + } + + replication_recursive_unlock(); + + string_freez(rq_new->chart_id); + return false; +} + +static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) { + struct replication_request *rq = value; + + // this request is about a unique chart for this sender + rrdpush_sender_replicating_charts_minus_one(rq->sender); + + if(rq->indexed_in_judy) + replication_sort_entry_del(rq); + + string_freez(rq->chart_id); +} + +static bool replication_execute_request(struct replication_request *rq, bool workers) { + bool ret = false; + + if(likely(workers)) + worker_is_busy(WORKER_JOB_FIND_CHART); + + RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); + if(!st) { + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found", + rrdhost_hostname(rq->sender->host), string2str(rq->chart_id)); + + goto cleanup; + } + + if(likely(workers)) + worker_is_busy(WORKER_JOB_QUERYING); + + netdata_thread_disable_cancelability(); + + // send the replication data + bool start_streaming = replicate_chart_response( + st->rrdhost, st, rq->start_streaming, rq->after, rq->before); + + netdata_thread_enable_cancelability(); + + if(start_streaming && rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender)) { + // enable normal streaming if we have to + // but only if the sender buffer has not been flushed since we started + + if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); +#endif + } + else + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating", + rrdhost_hostname(st->rrdhost), string2str(rq->chart_id)); + } + + __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED); + + ret = true; + +cleanup: + string_freez(rq->chart_id); + return ret; +} + +// ---------------------------------------------------------------------------- +// public API + +void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) { + struct replication_request rq = { + .sender = sender, + .chart_id = string_strdupz(chart_id), + .after = after, + .before = before, + .start_streaming = start_streaming, + .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender), + }; + + if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) + replication_execute_request(&rq, false); + + else + dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request)); +} + +void replication_sender_delete_pending_requests(struct sender_state *sender) { + // allow the dictionary destructor to go faster on locks + replication_recursive_lock(); + dictionary_flush(sender->replication.requests); + replication_recursive_unlock(); +} + +void replication_init_sender(struct sender_state *sender) { + sender->replication.requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender); + dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender); + dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender); +} + +void replication_cleanup_sender(struct sender_state *sender) { + // allow the dictionary destructor to go faster on locks + replication_recursive_lock(); + dictionary_destroy(sender->replication.requests); + replication_recursive_unlock(); +} + +void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { + size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer); + size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size; + + if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) + s->replication.unsafe.reached_max = true; + + if(s->replication.unsafe.reached_max && + percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) { + s->replication.unsafe.reached_max = false; + replication_recursive_lock(); +// replication_set_next_point_in_time(0, 0); + replication_globals.unsafe.sender_resets++; + replication_recursive_unlock(); + } + + rrdpush_sender_set_buffer_used_percent(s, percentage); +} + +// ---------------------------------------------------------------------------- +// replication thread + +static size_t verify_host_charts_are_streaming_now(RRDHOST *host) { + internal_error( + host->sender && + !rrdpush_sender_pending_replication_requests(host->sender) && + dictionary_entries(host->sender->replication.requests) != 0, + "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication", + rrdhost_hostname(host), + rrdpush_sender_pending_replication_requests(host->sender), + dictionary_entries(host->sender->replication.requests) + ); + + size_t ok = 0; + size_t errors = 0; + + RRDSET *st; + rrdset_foreach_read(st, host) { + RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + + bool is_error = false; + + if(!flags) { + internal_error( + true, + "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED", + rrdhost_hostname(host), rrdset_id(st) + ); + is_error = true; + } + + if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + internal_error( + true, + "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished", + rrdhost_hostname(host), rrdset_id(st) + ); + is_error = true; + } + + if(is_error) + errors++; + else + ok++; + } + rrdset_foreach_done(st); + + internal_error(errors, + "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished", + rrdhost_hostname(host), ok, errors); + + return errors; +} + +static void verify_all_hosts_charts_are_streaming_now(void) { + worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY); + + size_t errors = 0; + RRDHOST *host; + dfe_start_read(rrdhost_root_index, host) + errors += verify_host_charts_are_streaming_now(host); + dfe_done(host); + + size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); + info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", + executed - replication_globals.main_thread.last_executed, errors); + replication_globals.main_thread.last_executed = executed; +} + +static void replication_initialize_workers(bool master) { + worker_register("REPLICATION"); + worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next"); + worker_register_job_name(WORKER_JOB_QUERYING, "querying"); + worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete"); + worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart"); + worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency"); + worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit"); + worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup"); + + if(master) { + worker_register_job_name(WORKER_JOB_STATISTICS, "statistics"); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL); + } +} + +#define REQUEST_OK (0) +#define REQUEST_QUEUE_EMPTY (-1) +#define REQUEST_CHART_NOT_FOUND (-2) + +static int replication_execute_next_pending_request(void) { + worker_is_busy(WORKER_JOB_FIND_NEXT); + struct replication_request rq = replication_request_get_first_available(); + + if(unlikely(!rq.found)) + return REQUEST_QUEUE_EMPTY; + + // delete the request from the dictionary + worker_is_busy(WORKER_JOB_DELETE_ENTRY); + if(!dictionary_del(rq.sender->replication.requests, string2str(rq.chart_id))) + error("REPLAY ERROR: 'host:%s/chart:%s' failed to be deleted from sender pending charts index", + rrdhost_hostname(rq.sender->host), string2str(rq.chart_id)); + + replication_set_latest_first_time(rq.after); + + if(unlikely(!replication_execute_request(&rq, true))) + return REQUEST_CHART_NOT_FOUND; + + return REQUEST_OK; +} + +static void replication_worker_cleanup(void *ptr __maybe_unused) { + worker_unregister(); +} + +static void *replication_worker_thread(void *ptr) { + replication_initialize_workers(false); + + netdata_thread_cleanup_push(replication_worker_cleanup, ptr); + + while(!netdata_exit) { + if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + worker_is_idle(); + sleep_usec(1 * USEC_PER_SEC); + } + } + + netdata_thread_cleanup_pop(1); + return NULL; +} + +static void replication_main_cleanup(void *ptr) { + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + int threads = (int)replication_globals.main_thread.threads; + for(int i = 0; i < threads ;i++) { + netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL); + freez(replication_globals.main_thread.threads_ptrs[i]); + } + freez(replication_globals.main_thread.threads_ptrs); + replication_globals.main_thread.threads_ptrs = NULL; + + // custom code + worker_unregister(); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +void *replication_thread_main(void *ptr __maybe_unused) { + replication_initialize_workers(true); + + int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1); + if(threads < 1 || threads > MAX_REPLICATION_THREADS) { + error("replication threads given %d is invalid, resetting to 1", threads); + threads = 1; + } + + if(--threads) { + replication_globals.main_thread.threads = threads; + replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *)); + + for(int i = 0; i < threads ;i++) { + replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t)); + netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], "REPLICATION", + NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL); + } + } + + netdata_thread_cleanup_push(replication_main_cleanup, ptr); + + // start from 100% completed + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); + + long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place + bool slow = true; // control the time we sleep - it has to start with true! + usec_t last_now_mono_ut = now_monotonic_usec(); + time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds + + size_t last_executed = 0; + size_t last_sender_resets = 0; + + while(!netdata_exit) { + + // statistics + usec_t now_mono_ut = now_monotonic_usec(); + if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) { + last_now_mono_ut = now_mono_ut; + + replication_recursive_lock(); + + size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); + if(last_executed != current_executed) { + run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION; + last_executed = current_executed; + slow = false; + } + + if(replication_reset_next_point_in_time_countdown-- == 0) { + // once per second, make it scan all the pending requests next time + replication_set_next_point_in_time(0, 0); +// replication_globals.protected.skipped_no_room_since_last_reset = 0; + replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; + } + + if(!replication_globals.unsafe.pending && --run_verification_countdown == 0) { + // reset the statistics about completion percentage + replication_globals.unsafe.first_time_t = 0; + replication_set_latest_first_time(0); + + verify_all_hosts_charts_are_streaming_now(); + + run_verification_countdown = LONG_MAX; + slow = true; + } + + worker_is_busy(WORKER_JOB_STATISTICS); + + time_t latest_first_time_t = replication_get_latest_first_time(); + if(latest_first_time_t && replication_globals.unsafe.pending) { + // completion percentage statistics + time_t now = now_realtime_sec(); + time_t total = now - replication_globals.unsafe.first_time_t; + time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t; + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, + (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total); + } + else + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); + + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED)); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_not_connected); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_no_room); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)replication_globals.main_thread.waits); + + replication_recursive_unlock(); + } + + if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + replication_recursive_lock(); + + // the timeout also defines now frequently we will traverse all the pending requests + // when the outbound buffers of all senders is full + usec_t timeout; + if(slow) + // no work to be done, wait for a request to come in + timeout = 1000 * USEC_PER_MS; + + else if(replication_globals.unsafe.pending > 0) { + if(replication_globals.unsafe.sender_resets == last_sender_resets) { + timeout = 1000 * USEC_PER_MS; + } + else { + // there are pending requests waiting to be executed, + // but none could be executed at this time. + // try again after this time. + timeout = 100 * USEC_PER_MS; + } + + last_sender_resets = replication_globals.unsafe.sender_resets; + } + else { + // no requests pending, but there were requests recently (run_verification_countdown) + // so, try in a short time. + // if this is big, one chart replicating will be slow to finish (ping - pong just one chart) + timeout = 10 * USEC_PER_MS; + last_sender_resets = replication_globals.unsafe.sender_resets; + } + + replication_globals.main_thread.waits++; + replication_recursive_unlock(); + + worker_is_idle(); + sleep_usec(timeout); + + // make it scan all the pending requests next time + replication_set_next_point_in_time(0, 0); + replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; + + continue; + } + } + + netdata_thread_cleanup_pop(1); + return NULL; +} diff --git a/streaming/replication.h b/streaming/replication.h new file mode 100644 index 000000000..00462cc3a --- /dev/null +++ b/streaming/replication.h @@ -0,0 +1,33 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef REPLICATION_H +#define REPLICATION_H + +#include "daemon/common.h" + +struct replication_query_statistics { + SPINLOCK spinlock; + size_t queries_started; + size_t queries_finished; + size_t points_read; + size_t points_generated; +}; + +struct replication_query_statistics replication_get_query_statistics(void); + +bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before); + +typedef int (*send_command)(const char *txt, void *data); + +bool replicate_chart_request(send_command callback, void *callback_data, + RRDHOST *rh, RRDSET *rs, + time_t first_entry_child, time_t last_entry_child, time_t child_world_time, + time_t response_first_start_time, time_t response_last_end_time); + +void replication_init_sender(struct sender_state *sender); +void replication_cleanup_sender(struct sender_state *sender); +void replication_sender_delete_pending_requests(struct sender_state *sender); +void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming); +void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s); + +#endif /* REPLICATION_H */ diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index b73f24633..a57f1b080 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -11,8 +11,8 @@ * 1. a random data collection thread, calling rrdset_done_push() * this is called for each chart. * - * the output of this work is kept in a BUFFER in RRDHOST - * the sender thread is signalled via a pipe (also in RRDHOST) + * the output of this work is kept in a thread BUFFER + * the sender thread is signalled via a pipe (in RRDHOST) * * 2. a sender thread running at the sending netdata * this is spawned automatically on the first chart to be pushed @@ -46,6 +46,9 @@ unsigned int default_compression_enabled = 1; char *default_rrdpush_destination = NULL; char *default_rrdpush_api_key = NULL; char *default_rrdpush_send_charts_matching = NULL; +bool default_rrdpush_enable_replication = true; +time_t default_rrdpush_seconds_to_replicate = 86400; +time_t default_rrdpush_replication_step = 600; #ifdef ENABLE_HTTPS int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL; char *netdata_ssl_ca_path = NULL; @@ -66,6 +69,31 @@ static void load_stream_conf() { freez(filename); } +bool rrdpush_receiver_needs_dbengine() { + struct section *co; + + for(co = stream_config.first_section; co; co = co->next) { + if(strcmp(co->name, "stream") == 0) + continue; // the first section is not relevant + + char *s; + + s = appconfig_get_by_section(co, "enabled", NULL); + if(!s || !appconfig_test_boolean_value(s)) + continue; + + s = appconfig_get_by_section(co, "default memory mode", NULL); + if(s && strcmp(s, "dbengine") == 0) + return true; + + s = appconfig_get_by_section(co, "memory mode", NULL); + if(s && strcmp(s, "dbengine") == 0) + return true; + } + + return false; +} + int rrdpush_init() { // -------------------------------------------------------------------- // load stream.conf @@ -75,6 +103,11 @@ int rrdpush_init() { default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", ""); default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", ""); default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*"); + + default_rrdpush_enable_replication = config_get_boolean(CONFIG_SECTION_DB, "enable replication", default_rrdpush_enable_replication); + default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate); + default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step); + rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time); #ifdef ENABLE_COMPRESSION @@ -101,14 +134,14 @@ int rrdpush_init() { bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO); if(invalid_certificate == CONFIG_BOOLEAN_YES){ - if(netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){ + if(netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE){ info("Netdata is configured to accept invalid SSL certificate."); - netdata_validate_server = NETDATA_SSL_INVALID_CERTIFICATE; + netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE; } } - netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", "/etc/ssl/certs/"); - netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", "/etc/ssl/certs/certs.pem"); + netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL); + netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL); #endif return default_rrdpush_enabled; @@ -128,30 +161,31 @@ int rrdpush_init() { // this is for the first iterations of each chart unsigned int remote_clock_resync_iterations = 60; - -static inline int should_send_chart_matching(RRDSET *st) { - // Do not stream anomaly rates charts. - if (unlikely(st->state->is_ar_chart)) +static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) { + if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED)) return false; - if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION)) - return ml_streaming_enabled(); - - if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) { + if(unlikely(!(flags & (RRDSET_FLAG_UPSTREAM_SEND | RRDSET_FLAG_UPSTREAM_IGNORE)))) { RRDHOST *host = st->rrdhost; - if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) || - simple_pattern_matches(host->rrdpush_send_charts_matching, st->name)) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE); - rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); + if (flags & RRDSET_FLAG_ANOMALY_DETECTION) { + if(ml_streaming_enabled()) + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); + else + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); } - else { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND); + else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) || + simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st))) + + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); + else rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); - } + + // get the flags again, to know how to respond + flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE); } - return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND)); + return flags & RRDSET_FLAG_UPSTREAM_SEND; } int configured_as_parent() { @@ -173,42 +207,25 @@ int configured_as_parent() { return is_parent; } -// checks if the current chart definition has been sent -static inline int need_to_send_chart_definition(RRDSET *st) { - rrdset_check_rdlock(st); - - if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED)))) - return 1; - - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - if(unlikely(!rd->exposed)) { - #ifdef NETDATA_INTERNAL_CHECKS - info("host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", st->rrdhost->hostname, st->id, rd->id); - #endif - return 1; - } - } - - return 0; -} - // chart labels static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { BUFFER *wb = (BUFFER *)data; buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls); return 1; } -void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) { - if (st->state && st->state->chart_labels) { - if(rrdlabels_walkthrough_read(st->state->chart_labels, send_clabels_callback, host->sender->build) > 0) - buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n"); + +static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) { + if (st->rrdlabels) { + if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0) + buffer_sprintf(wb, "CLABEL_COMMIT\n"); } } // Send the current chart definition. // Assumes that collector thread has already called sender_start for mutex / buffer state. -static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { +static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { + bool replication_progress = false; + RRDHOST *host = st->rrdhost; rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED); @@ -216,9 +233,9 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { // properly set the name for the remote end to parse it char *name = ""; if(likely(st->name)) { - if(unlikely(strcmp(st->id, st->name))) { + if(unlikely(st->id != st->name)) { // they differ - name = strchr(st->name, '.'); + name = strchr(rrdset_name(st), '.'); if(name) name++; else @@ -228,14 +245,14 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { // send the chart buffer_sprintf( - host->sender->build + wb , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n" - , st->id + , rrdset_id(st) , name - , st->title - , st->units - , st->family - , st->context + , rrdset_title(st) + , rrdset_units(st) + , rrdset_family(st) + , rrdset_context(st) , rrdset_type_name(st->chart_type) , st->priority , st->update_every @@ -243,120 +260,190 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":"" , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":"" , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":"" - , (st->plugin_name)?st->plugin_name:"" - , (st->module_name)?st->module_name:"" + , rrdset_plugin_name(st) + , rrdset_module_name(st) ); // send the chart labels - if (host->sender->version >= STREAM_VERSION_CLABELS) - rrdpush_send_clabels(host, st); + if (stream_has_capability(host->sender, STREAM_CAP_CLABELS)) + rrdpush_send_clabels(wb, st); // send the dimensions RRDDIM *rd; rrddim_foreach_read(rd, st) { buffer_sprintf( - host->sender->build + wb , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n" - , rd->id - , rd->name + , rrddim_id(rd) + , rrddim_name(rd) , rrd_algorithm_name(rd->algorithm) , rd->multiplier , rd->divisor , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":"" - , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":"" - , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":"" + , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":"" + , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":"" ); rd->exposed = 1; } + rrddim_foreach_done(rd); + + // send the chart functions + if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS)) + rrd_functions_expose_rrdpush(st, wb); // send the chart local custom variables - RRDSETVAR *rs; - for(rs = st->variables; rs ;rs = rs->next) { - if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) { - NETDATA_DOUBLE *value = (NETDATA_DOUBLE *) rs->value; - - buffer_sprintf( - host->sender->build - , "VARIABLE CHART %s = " NETDATA_DOUBLE_FORMAT "\n" - , rs->variable - , *value - ); + rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); + + if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) { + time_t first_entry_local = rrdset_first_entry_t_of_tier(st, 0); + time_t last_entry_local = st->last_updated.tv_sec; + + if(unlikely(!last_entry_local)) + last_entry_local = rrdset_last_entry_t(st); + + time_t now = now_realtime_sec(); + if(unlikely(last_entry_local > now)) { + internal_error(true, + "RRDSET REPLAY ERROR: 'host:%s/chart:%s' last updated time %ld is in the future, adjusting it to now %ld", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + last_entry_local, now); + last_entry_local = now; + } + + if(unlikely(first_entry_local && last_entry_local && first_entry_local >= last_entry_local)) { + internal_error(true, + "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first updated time %ld is equal or bigger than last updated time %ld, adjusting it last updated time - update every", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + first_entry_local, last_entry_local); + first_entry_local = last_entry_local - st->update_every; + } + + if(unlikely(!first_entry_local && last_entry_local)) { + internal_error(true, + "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first time %ld, last time %ld, setting both to last time", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + first_entry_local, last_entry_local); + first_entry_local = last_entry_local; } + + buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n", + (unsigned long long)first_entry_local, + (unsigned long long)last_entry_local, + (unsigned long long)now); + + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_plus_one(st->rrdhost); + replication_progress = true; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); +#endif } st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); + return replication_progress; } // sends the current chart dimensions -static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) { - RRDHOST *host = st->rrdhost; - buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0); - if (s->version >= VERSION_GAP_FILLING) - buffer_sprintf(host->sender->build, " %"PRId64"\n", (int64_t)st->last_collected_time.tv_sec); +static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) { + buffer_fast_strcat(wb, "BEGIN \"", 7); + buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); + buffer_fast_strcat(wb, "\" ", 2); + + if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time) + buffer_print_llu(wb, st->usec_since_last_update); else - buffer_strcat(host->sender->build, "\n"); + buffer_fast_strcat(wb, "0", 1); + + buffer_fast_strcat(wb, "\n", 1); RRDDIM *rd; rrddim_foreach_read(rd, st) { - if(rd->updated && rd->exposed) - buffer_sprintf(host->sender->build - , "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n" - , rd->id - , rd->collected_value - ); + if(unlikely(!rd->updated)) + continue; + + if(likely(rd->exposed)) { + buffer_fast_strcat(wb, "SET \"", 5); + buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); + buffer_fast_strcat(wb, "\" = ", 4); + buffer_print_ll(wb, rd->collected_value); + buffer_fast_strcat(wb, "\n", 1); + } + else { + internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed", + rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); + // we will include it in the next iteration + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + } } - buffer_strcat(host->sender->build, "END\n"); + rrddim_foreach_done(rd); + + if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) + rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); + + buffer_fast_strcat(wb, "END\n", 4); } static void rrdpush_sender_thread_spawn(RRDHOST *host); // Called from the internal collectors to mark a chart obsolete. -void rrdset_push_chart_definition_now(RRDSET *st) { +bool rrdset_push_chart_definition_now(RRDSET *st) { RRDHOST *host = st->rrdhost; - if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st))) - return; + if(unlikely(!rrdhost_can_send_definitions_to_parent(host) + || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST)))) + return false; + + BUFFER *wb = sender_start(host->sender); + rrdpush_send_chart_definition(wb, st); + sender_commit(host->sender, wb); - rrdset_rdlock(st); - sender_start(host->sender); - rrdpush_send_chart_definition_nolock(st); - sender_commit(host->sender); - rrdset_unlock(st); + return true; } void rrdset_done_push(RRDSET *st) { - if(unlikely(!should_send_chart_matching(st))) - return; - RRDHOST *host = st->rrdhost; - if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn)) - rrdpush_sender_thread_spawn(host); + // fetch the flags we need to check with one atomic operation + RRDHOST_FLAGS host_flags = __atomic_load_n(&host->flags, __ATOMIC_SEQ_CST); + + // check if we are not connected + if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) { + + if(unlikely(!(host_flags & (RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED)))) + rrdpush_sender_thread_spawn(host); + + if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) { + rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); + error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); + } - // Handle non-connected case - if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) { - if(unlikely(!host->rrdpush_sender_error_shown)) - error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname); - host->rrdpush_sender_error_shown = 1; return; } - else if(unlikely(host->rrdpush_sender_error_shown)) { - info("STREAM %s [send]: sending metrics...", host->hostname); - host->rrdpush_sender_error_shown = 0; + else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) { + info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } - sender_start(host->sender); + RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST); + bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED); + bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - if(need_to_send_chart_definition(st)) - rrdpush_send_chart_definition_nolock(st); + if(unlikely((exposed_upstream && replication_in_progress) || + !should_send_chart_matching(st, rrdset_flags))) + return; + + BUFFER *wb = sender_start(host->sender); - rrdpush_send_chart_metrics_nolock(st, host->sender); + if(unlikely(!exposed_upstream)) + replication_in_progress = rrdpush_send_chart_definition(wb, st); - // signal the sender there are more data - if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) - error("STREAM %s [send]: cannot write to internal pipe", host->hostname); + if (likely(!replication_in_progress)) + rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags); - sender_commit(host->sender); + sender_commit(host->sender, wb); } // labels @@ -365,45 +452,38 @@ static int send_labels_callback(const char *name, const char *value, RRDLABEL_SR buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value); return 1; } -void rrdpush_send_labels(RRDHOST *host) { - if (!host->host_labels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP))) +void rrdpush_send_host_labels(RRDHOST *host) { + if(unlikely(!rrdhost_can_send_definitions_to_parent(host) + || !stream_has_capability(host->sender, STREAM_CAP_HLABELS))) return; - sender_start(host->sender); - - rrdlabels_walkthrough_read(host->host_labels, send_labels_callback, host->sender->build); - buffer_sprintf(host->sender->build, "OVERWRITE %s\n", "labels"); - sender_commit(host->sender); + BUFFER *wb = sender_start(host->sender); - if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) - error("STREAM %s [send]: cannot write to internal pipe", host->hostname); + rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb); + buffer_sprintf(wb, "OVERWRITE %s\n", "labels"); - rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); + sender_commit(host->sender, wb); } void rrdpush_claimed_id(RRDHOST *host) { - if(unlikely(!host->rrdpush_send_enabled || !__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) - return; - - if(host->sender->version < STREAM_VERSION_CLAIM) + if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM)) return; - sender_start(host->sender); + if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) + return; + + BUFFER *wb = sender_start(host->sender); rrdhost_aclk_state_lock(host); - buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") ); + buffer_sprintf(wb, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") ); rrdhost_aclk_state_unlock(host); - sender_commit(host->sender); - - // signal the sender there are more data - if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) - error("STREAM %s [send]: cannot write to internal pipe", host->hostname); + sender_commit(host->sender, wb); } int connect_to_one_of_destinations( - struct rrdpush_destinations *destinations, + RRDHOST *host, int default_port, struct timeval *timeout, size_t *reconnects_counter, @@ -413,28 +493,44 @@ int connect_to_one_of_destinations( { int sock = -1; - for (struct rrdpush_destinations *d = destinations; d; d = d->next) { - if (d->disabled_no_proper_reply) { - d->disabled_no_proper_reply = 0; - continue; - } else if (d->disabled_because_of_localhost) { - continue; - } else if (d->disabled_already_streaming && (d->disabled_already_streaming + 30 > now_realtime_sec())) { - continue; - } else if (d->disabled_because_of_denied_access) { - d->disabled_because_of_denied_access = 0; + for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) { + time_t now = now_realtime_sec(); + + if(d->postpone_reconnection_until > now) { + info( + "STREAM %s: skipping destination '%s' (default port: %d) due to last error (code: %d, %s), will retry it in %d seconds", + rrdhost_hostname(host), + string2str(d->destination), + default_port, + d->last_handshake, d->last_error?d->last_error:"unset reason description", + (int)(d->postpone_reconnection_until - now)); + continue; } + info( + "STREAM %s: attempting to connect to '%s' (default port: %d)...", + rrdhost_hostname(host), + string2str(d->destination), + default_port); + if (reconnects_counter) *reconnects_counter += 1; - sock = connect_to_this(d->destination, default_port, timeout); + + sock = connect_to_this(string2str(d->destination), default_port, timeout); + if (sock != -1) { - if (connected_to && connected_to_size) { - strncpy(connected_to, d->destination, connected_to_size); - connected_to[connected_to_size - 1] = '\0'; - } + if (connected_to && connected_to_size) + strncpyz(connected_to, string2str(d->destination), connected_to_size); + *destination = d; + + // move the current item to the end of the list + // without this, this destination will break the loop again and again + // not advancing the destinations to find one that may work + DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, d, prev, next); + DOUBLE_LINKED_LIST_APPEND_UNSAFE(host->destinations, d, prev, next); + break; } } @@ -442,44 +538,51 @@ int connect_to_one_of_destinations( return sock; } -struct rrdpush_destinations *destinations_init(const char *dests) { - const char *s = dests; - struct rrdpush_destinations *destinations = NULL, *prev = NULL; - while(*s) { - const char *e = s; - - // skip path, moving both s(tart) and e(nd) - if(*e == '/') - while(!isspace(*e) && *e != ',') s = ++e; - - // skip separators, moving both s(tart) and e(nd) - while(isspace(*e) || *e == ',') s = ++e; - - // move e(nd) to the first separator - while(*e && !isspace(*e) && *e != ',' && *e != '/') e++; - - // is there anything? - if(!*s || s == e) break; - - char buf[e - s + 1]; - strncpyz(buf, s, e - s); - struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations)); - strncpyz(d->destination, buf, sizeof(d->destination)-1); - d->disabled_no_proper_reply = 0; - d->disabled_because_of_localhost = 0; - d->disabled_already_streaming = 0; - d->disabled_because_of_denied_access = 0; - d->next = NULL; - if (!destinations) { - destinations = d; - } else { - prev->next = d; - } - prev = d; +struct destinations_init_tmp { + RRDHOST *host; + struct rrdpush_destinations *list; + int count; +}; + +bool destinations_init_add_one(char *entry, void *data) { + struct destinations_init_tmp *t = data; + + struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations)); + d->destination = string_strdupz(entry); + + DOUBLE_LINKED_LIST_APPEND_UNSAFE(t->list, d, prev, next); + + t->count++; + info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); + + return false; // we return false, so that we will get all defined destinations +} + +void rrdpush_destinations_init(RRDHOST *host) { + if(!host->rrdpush_send_destination) return; + + rrdpush_destinations_free(host); + + struct destinations_init_tmp t = { + .host = host, + .list = NULL, + .count = 0, + }; + + foreach_entry_in_connection_string(host->rrdpush_send_destination, destinations_init_add_one, &t); - s = e; + host->destinations = t.list; +} + +void rrdpush_destinations_free(RRDHOST *host) { + while (host->destinations) { + struct rrdpush_destinations *tmp = host->destinations; + DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, tmp, prev, next); + string_freez(tmp->destination); + freez(tmp); } - return destinations; + + host->destinations = NULL; } // ---------------------------------------------------------------------------- @@ -495,11 +598,13 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { netdata_mutex_lock(&host->sender->mutex); netdata_thread_t thr = 0; - if(host->rrdpush_sender_spawn) { - info("STREAM %s [send]: signaling sending thread to stop...", host->hostname); + if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); + + info("STREAM %s [send]: signaling sending thread to stop...", rrdhost_hostname(host)); // signal the thread that we want to join it - host->rrdpush_sender_join = 1; + rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN); // copy the thread id, so that we will be waiting for the right one // even if a new one has been spawn @@ -512,10 +617,10 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { netdata_mutex_unlock(&host->sender->mutex); if(thr != 0) { - info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname); + info("STREAM %s [send]: waiting for the sending thread to stop...", rrdhost_hostname(host)); void *result; netdata_thread_join(thr, &result); - info("STREAM %s [send]: sending thread has exited.", host->hostname); + info("STREAM %s [send]: sending thread has exited.", rrdhost_hostname(host)); } } @@ -531,15 +636,16 @@ void log_stream_connection(const char *client_ip, const char *client_port, const static void rrdpush_sender_thread_spawn(RRDHOST *host) { netdata_mutex_lock(&host->sender->mutex); - if(!host->rrdpush_sender_spawn) { + if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", host->hostname); + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", rrdhost_hostname(host)); if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender)) - error("STREAM %s [send]: failed to create new thread for client.", host->hostname); + error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); else - host->rrdpush_sender_spawn = 1; + rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); } + netdata_mutex_unlock(&host->sender->mutex); } @@ -608,7 +714,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { else if(!strcmp(name, "tags")) tags = value; else if(!strcmp(name, "ver")) - stream_version = MIN((uint32_t) strtoul(value, NULL, 0), STREAMING_PROTOCOL_CURRENT_VERSION); + stream_version = convert_stream_version_to_capabilities(strtoul(value, NULL, 0)); else { // An old Netdata child does not have a compatible streaming protocol, map to something sane. if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME")) @@ -624,7 +730,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION")) name = "NETDATA_HOST_OS_DETECTION"; else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) { - stream_version = 1; + stream_version = convert_stream_version_to_capabilities(1); } if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) { @@ -635,7 +741,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { } if (stream_version == UINT_MAX) - stream_version = 0; + stream_version = convert_stream_version_to_capabilities(0); if(!key || !*key) { rrdhost_system_info_free(system_info); @@ -660,21 +766,30 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { if(regenerate_guid(key, buf) == -1) { rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - INVALID KEY"); + log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID KEY"); error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key); return rrdpush_receiver_permission_denied(w); } if(regenerate_guid(machine_guid, buf) == -1) { rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - INVALID MACHINE GUID"); + log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID MACHINE GUID"); error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid); return rrdpush_receiver_permission_denied(w); } + const char *api_key_type = appconfig_get(&stream_config, key, "type", "api"); + if(!api_key_type || !*api_key_type) api_key_type = "unknown"; + if(strcmp(api_key_type, "api") != 0) { + rrdhost_system_info_free(system_info); + log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - API KEY GIVEN IS NOT API KEY"); + error("STREAM [receive from [%s]:%s]: API key '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, key, api_key_type); + return rrdpush_receiver_permission_denied(w); + } + if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) { rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - KEY NOT ENABLED"); + log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ENABLED"); error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key); return rrdpush_receiver_permission_denied(w); } @@ -685,7 +800,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { if(!simple_pattern_matches(key_allow_from, w->client_ip)) { simple_pattern_free(key_allow_from); rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname) ? hostname : "-", "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP"); + log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP"); error("STREAM [receive from [%s]:%s]: API key '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, key); return rrdpush_receiver_permission_denied(w); } @@ -693,9 +808,18 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { } } + const char *machine_guid_type = appconfig_get(&stream_config, machine_guid, "type", "machine"); + if(!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown"; + if(strcmp(machine_guid_type, "machine") != 0) { + rrdhost_system_info_free(system_info); + log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID GIVEN IS NOT A MACHINE GUID"); + error("STREAM [receive from [%s]:%s]: machine GUID '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid, machine_guid_type); + return rrdpush_receiver_permission_denied(w); + } + if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) { rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - MACHINE GUID NOT ENABLED"); + log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ENABLED"); error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid); return rrdpush_receiver_permission_denied(w); } @@ -706,7 +830,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { if(!simple_pattern_matches(machine_allow_from, w->client_ip)) { simple_pattern_free(machine_allow_from); rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname) ? hostname : "-", "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP"); + log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP"); error("STREAM [receive from [%s]:%s]: Machine GUID '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, machine_guid); return rrdpush_receiver_permission_denied(w); } @@ -746,7 +870,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { struct receiver_state *rpt = callocz(1, sizeof(*rpt)); rrd_rdlock(); - RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0); + RRDHOST *host = rrdhost_find_by_guid(machine_guid); if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */ host = NULL; if (host) { @@ -763,7 +887,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { info( "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - " "existing connection is dead (%"PRId64" sec), accepting new connection.", - host->hostname, + rrdhost_hostname(host), w->client_ip, w->client_port, (int64_t)age); @@ -772,12 +896,12 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { netdata_mutex_unlock(&host->receiver_lock); rrdhost_unlock(host); rrd_unlock(); - log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, host->hostname, + log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, rrdhost_hostname(host), "REJECTED - ALREADY CONNECTED"); info( "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - " "existing connection is active (within last %"PRId64" sec), rejecting new connection.", - host->hostname, + rrdhost_hostname(host), w->client_ip, w->client_port, (int64_t)age); @@ -811,7 +935,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { rpt->client_port = strdupz(w->client_port); rpt->update_every = update_every; rpt->system_info = system_info; - rpt->stream_version = stream_version; + rpt->capabilities = stream_version; #ifdef ENABLE_HTTPS rpt->ssl.conn = w->ssl.conn; rpt->ssl.flags = w->ssl.flags; @@ -855,3 +979,66 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { buffer_flush(w->response.data); return 200; } + +static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { + if(caps & STREAM_CAP_V1) buffer_strcat(wb, "V1 "); + if(caps & STREAM_CAP_V2) buffer_strcat(wb, "V2 "); + if(caps & STREAM_CAP_VN) buffer_strcat(wb, "VN "); + if(caps & STREAM_CAP_VCAPS) buffer_strcat(wb, "VCAPS "); + if(caps & STREAM_CAP_HLABELS) buffer_strcat(wb, "HLABELS "); + if(caps & STREAM_CAP_CLAIM) buffer_strcat(wb, "CLAIM "); + if(caps & STREAM_CAP_CLABELS) buffer_strcat(wb, "CLABELS "); + if(caps & STREAM_CAP_COMPRESSION) buffer_strcat(wb, "COMPRESSION "); + if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS "); + if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION "); + if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY "); +} + +void log_receiver_capabilities(struct receiver_state *rpt) { + BUFFER *wb = buffer_create(100); + stream_capabilities_to_string(wb, rpt->capabilities); + + info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", + rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb)); + + buffer_free(wb); +} + +void log_sender_capabilities(struct sender_state *s) { + BUFFER *wb = buffer_create(100); + stream_capabilities_to_string(wb, s->capabilities); + + info("STREAM %s [send to %s]: established link with negotiated capabilities: %s", + rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb)); + + buffer_free(wb); +} + +STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) { + STREAM_CAPABILITIES caps = 0; + + if(version <= 1) caps = STREAM_CAP_V1; + else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS; + else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM; + else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS; + else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION; + else caps = version; + + if(caps & STREAM_CAP_VCAPS) + caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2|STREAM_CAP_VN); + + if(caps & STREAM_CAP_VN) + caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2); + + if(caps & STREAM_CAP_V2) + caps &= ~(STREAM_CAP_V1); + + return caps & STREAM_OUR_CAPABILITIES; +} + +int32_t stream_capabilities_to_vn(uint32_t caps) { + if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION; + if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS; + return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM) +} + diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 1eb39cc6c..c5f7618c1 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -10,32 +10,83 @@ #define CONNECTED_TO_SIZE 100 -#define STREAM_VERSION_CLAIM 3 -#define STREAM_VERSION_CLABELS 4 -#define STREAM_VERSION_COMPRESSION 5 -#define VERSION_GAP_FILLING 6 +// ---------------------------------------------------------------------------- +// obsolete versions - do not use anymore + +#define STREAM_OLD_VERSION_CLAIM 3 +#define STREAM_OLD_VERSION_CLABELS 4 +#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production + +// ---------------------------------------------------------------------------- +// capabilities negotiation + +typedef enum { + // do not use the first 3 bits + STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol + STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels) + STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol) + // v3 = claiming supported + // v4 = chart labels supported + // v5 = lz4 compression supported + STREAM_CAP_VCAPS = (1 << 6), // capabilities negotiation supported + STREAM_CAP_HLABELS = (1 << 7), // host labels supported + STREAM_CAP_CLAIM = (1 << 8), // claiming supported + STREAM_CAP_CLABELS = (1 << 9), // chart labels supported + STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported + STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported + STREAM_CAP_REPLICATION = (1 << 12), // replication supported + STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data + + // this must be signed int, so don't use the last bit + // needed for negotiating errors between parent and child +} STREAM_CAPABILITIES; #ifdef ENABLE_COMPRESSION -#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_COMPRESSION) +#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION #else -#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_CLABELS) -#endif //ENABLE_COMPRESSION +#define STREAM_HAS_COMPRESSION 0 +#endif // ENABLE_COMPRESSION + +#define STREAM_OUR_CAPABILITIES ( \ + STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \ + STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \ + STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY ) + +#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability))) + +// ---------------------------------------------------------------------------- +// stream handshake + +#define HTTP_HEADER_SIZE 8192 #define STREAMING_PROTOCOL_VERSION "1.1" -#define START_STREAMING_PROMPT "Hit me baby, push them over..." -#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..." +#define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..." +#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..." #define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version=" #define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back" #define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server" #define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info." -#define HTTP_HEADER_SIZE 8192 - typedef enum { - RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW, - RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW -} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY; + STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION + STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS + STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM + STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS + STREAM_HANDSHAKE_OK_V1 = 1, + STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1, + STREAM_HANDSHAKE_ERROR_LOCALHOST = -2, + STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3, + STREAM_HANDSHAKE_ERROR_DENIED = -4, + STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5, + STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6, + STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7, + STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8, + STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9 +} STREAM_HANDSHAKE; + + +// ---------------------------------------------------------------------------- typedef struct { char *os_name; @@ -47,8 +98,8 @@ typedef struct { #ifdef ENABLE_COMPRESSION struct compressor_state { - char *buffer; - size_t buffer_size; + char *compression_result_buffer; + size_t compression_result_buffer_size; struct compressor_data *data; // Compression API specific data void (*reset)(struct compressor_state *state); size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer); @@ -56,21 +107,14 @@ struct compressor_state { }; struct decompressor_state { - char *buffer; - size_t buffer_size; - size_t buffer_len; - size_t buffer_pos; - char *out_buffer; - size_t out_buffer_len; - size_t out_buffer_pos; + size_t signature_size; size_t total_compressed; size_t total_uncompressed; size_t packet_count; - struct decompressor_data *data; // Decompression API specific data + struct decompressor_stream *stream; // Decompression API specific data void (*reset)(struct decompressor_state *state); size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size); - size_t (*put)(struct decompressor_state *state, const char *data, size_t size); - size_t (*decompress)(struct decompressor_state *state); + size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state); size_t (*get)(struct decompressor_state *state, char *data, size_t size); void (*destroy)(struct decompressor_state **state); @@ -80,11 +124,17 @@ struct decompressor_state { // Thread-local storage // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. +typedef enum { + SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown + SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression +} SENDER_FLAGS; + struct sender_state { RRDHOST *host; - pid_t task_id; - unsigned int overflow:1; - int timeout, default_port; + pid_t tid; // the thread id of the sender, from gettid() + SENDER_FLAGS flags; + int timeout; + int default_port; usec_t reconnect_delay; char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c size_t begin; @@ -92,22 +142,62 @@ struct sender_state { size_t sent_bytes; size_t sent_bytes_on_this_connection; size_t send_attempts; - time_t last_sent_t; + time_t last_traffic_seen_t; size_t not_connected_loops; // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here. netdata_mutex_t mutex; struct circular_buffer *buffer; - BUFFER *build; - char read_buffer[512]; + char read_buffer[PLUGINSD_LINE_MAX + 1]; int read_len; - int32_t version; + STREAM_CAPABILITIES capabilities; + + int rrdpush_sender_pipe[2]; // collector to sender thread signaling + int rrdpush_sender_socket; + #ifdef ENABLE_COMPRESSION - unsigned int rrdpush_compression; struct compressor_state *compressor; #endif +#ifdef ENABLE_HTTPS + struct netdata_ssl ssl; // structure used to encrypt the connection +#endif + + struct { + DICTIONARY *requests; // de-duplication of replication requests, per chart + + struct { + size_t pending_requests; // the currently outstanding replication requests + size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart) + } atomic; + + struct { + bool reached_max; // used to avoid resetting the replication thread too frequently + } unsafe; // protected by sender mutex + + } replication; + + struct { + size_t buffer_used_percentage; // the current utilization of the sending buffer + usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC + } atomic; }; +#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED); +#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED) + +#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED); +#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED) + +#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED) + +#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED) + struct receiver_state { RRDHOST *host; netdata_thread_t thread; @@ -127,9 +217,9 @@ struct receiver_state { char *program_version; struct rrdhost_system_info *system_info; int update_every; - uint32_t stream_version; + STREAM_CAPABILITIES capabilities; time_t last_msg_t; - char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields + char read_buffer[PLUGINSD_LINE_MAX + 1]; int read_len; unsigned int shutdown:1; // Tell the thread to exit unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!) @@ -140,14 +230,18 @@ struct receiver_state { unsigned int rrdpush_compression; struct decompressor_state *decompressor; #endif + + time_t replication_first_time_t; }; struct rrdpush_destinations { - char destination[CONNECTED_TO_SIZE + 1]; - int disabled_no_proper_reply; - int disabled_because_of_localhost; - time_t disabled_already_streaming; - int disabled_because_of_denied_access; + STRING *destination; + + const char *last_error; + time_t postpone_reconnection_until; + STREAM_HANDSHAKE last_handshake; + + struct rrdpush_destinations *prev; struct rrdpush_destinations *next; }; @@ -158,27 +252,35 @@ extern unsigned int default_compression_enabled; extern char *default_rrdpush_destination; extern char *default_rrdpush_api_key; extern char *default_rrdpush_send_charts_matching; +extern bool default_rrdpush_enable_replication; +extern time_t default_rrdpush_seconds_to_replicate; +extern time_t default_rrdpush_replication_step; extern unsigned int remote_clock_resync_iterations; -extern void sender_init(RRDHOST *parent); -extern struct rrdpush_destinations *destinations_init(const char *destinations); -void sender_start(struct sender_state *s); -void sender_commit(struct sender_state *s); -extern int rrdpush_init(); -extern int configured_as_parent(); -extern void rrdset_done_push(RRDSET *st); -extern void rrdset_push_chart_definition_now(RRDSET *st); -extern void *rrdpush_sender_thread(void *ptr); -extern void rrdpush_send_labels(RRDHOST *host); -extern void rrdpush_claimed_id(RRDHOST *host); - -extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url); -extern void rrdpush_sender_thread_stop(RRDHOST *host); - -extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv); -extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); -extern int connect_to_one_of_destinations( - struct rrdpush_destinations *destinations, +void rrdpush_destinations_init(RRDHOST *host); +void rrdpush_destinations_free(RRDHOST *host); + +void sender_init(RRDHOST *host); + +BUFFER *sender_start(struct sender_state *s); +void sender_commit(struct sender_state *s, BUFFER *wb); +void sender_cancel(struct sender_state *s); +int rrdpush_init(); +bool rrdpush_receiver_needs_dbengine(); +int configured_as_parent(); +void rrdset_done_push(RRDSET *st); +bool rrdset_push_chart_definition_now(RRDSET *st); +void *rrdpush_sender_thread(void *ptr); +void rrdpush_send_host_labels(RRDHOST *host); +void rrdpush_claimed_id(RRDHOST *host); + +int rrdpush_receiver_thread_spawn(struct web_client *w, char *url); +void rrdpush_sender_thread_stop(RRDHOST *host); + +void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva); +void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); +int connect_to_one_of_destinations( + RRDHOST *host, int default_port, struct timeval *timeout, size_t *reconnects_counter, @@ -186,10 +288,18 @@ extern int connect_to_one_of_destinations( size_t connected_to_size, struct rrdpush_destinations **destination); +void rrdpush_signal_sender_to_wake_up(struct sender_state *s); + #ifdef ENABLE_COMPRESSION struct compressor_state *create_compressor(); struct decompressor_state *create_decompressor(); -size_t is_compressed_data(const char *data, size_t data_size); #endif +void log_receiver_capabilities(struct receiver_state *rpt); +void log_sender_capabilities(struct sender_state *s); +STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version); +int32_t stream_capabilities_to_vn(uint32_t caps); + +#include "replication.h" + #endif //NETDATA_RRDPUSH_H diff --git a/streaming/sender.c b/streaming/sender.c index c4836aeaf..8e637d2bd 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -1,6 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" +#include "parser/parser.h" #define WORKER_SENDER_JOB_CONNECT 0 #define WORKER_SENDER_JOB_PIPE_READ 1 @@ -17,9 +18,15 @@ #define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12 #define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13 #define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14 - -#if WORKER_UTILIZATION_MAX_JOB_TYPES < 15 -#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 15 +#define WORKER_SENDER_JOB_BUFFER_RATIO 15 +#define WORKER_SENDER_JOB_BYTES_RECEIVED 16 +#define WORKER_SENDER_JOB_BYTES_SENT 17 +#define WORKER_SENDER_JOB_REPLAY_REQUEST 18 +#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19 +#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20 + +#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21 +#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21 #endif extern struct config stream_config; @@ -27,10 +34,31 @@ extern int netdata_use_ssl_on_stream; extern char *netdata_ssl_ca_path; extern char *netdata_ssl_ca_file; +static __thread BUFFER *sender_thread_buffer = NULL; +static __thread bool sender_thread_buffer_used = false; + +void sender_thread_buffer_free(void) { + if(sender_thread_buffer) { + buffer_free(sender_thread_buffer); + sender_thread_buffer = NULL; + } +} + // Collector thread starting a transmission -void sender_start(struct sender_state *s) { - netdata_mutex_lock(&s->mutex); - buffer_flush(s->build); +BUFFER *sender_start(struct sender_state *s __maybe_unused) { + if(!sender_thread_buffer) + sender_thread_buffer = buffer_create(1024); + + if(sender_thread_buffer_used) + fatal("STREAMING: thread buffer is used multiple times concurrently."); + + sender_thread_buffer_used = true; + buffer_flush(sender_thread_buffer); + return sender_thread_buffer; +} + +void sender_cancel(struct sender_state *s __maybe_unused) { + sender_thread_buffer_used = false; } static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); @@ -43,137 +71,218 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); */ static inline void deactivate_compression(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION); - error("STREAM_COMPRESSION: Deactivating compression to avoid stream corruption"); - default_compression_enabled = 0; - s->rrdpush_compression = 0; - s->version = STREAM_VERSION_CLABELS; - error("STREAM_COMPRESSION %s [send to %s]: Restarting connection without compression", s->host->hostname, s->connected_to); + error("STREAM_COMPRESSION: Compression returned error, disabling it."); + s->flags &= ~SENDER_FLAG_COMPRESSION; + error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to); rrdpush_sender_thread_close_socket(s->host); } #endif +#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3 + // Collector thread finishing a transmission -void sender_commit(struct sender_state *s) { - char *src = (char *)buffer_tostring(s->host->sender->build); - size_t src_len = s->host->sender->build->len; +void sender_commit(struct sender_state *s, BUFFER *wb) { + + if(unlikely(wb != sender_thread_buffer)) + fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer."); + + if(unlikely(!sender_thread_buffer_used)) + fatal("STREAMING: sender is committing a buffer twice."); + + sender_thread_buffer_used = false; + + char *src = (char *)buffer_tostring(wb); + size_t src_len = buffer_strlen(wb); + + if(unlikely(!src || !src_len)) + return; + + netdata_mutex_lock(&s->mutex); + + if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { + info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", + rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE); + + s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; + } + #ifdef ENABLE_COMPRESSION - if (src && src_len) { - if (s->compressor && s->rrdpush_compression) { - src_len = s->compressor->compress(s->compressor, src, src_len, &src); - if (!src_len) { - deactivate_compression(s); - buffer_flush(s->build); - netdata_mutex_unlock(&s->mutex); - return; + if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) { + while(src_len) { + size_t size_to_compress = src_len; + + if(unlikely(size_to_compress > COMPRESSION_MAX_MSG_SIZE)) { + if (stream_has_capability(s, STREAM_CAP_BINARY)) + size_to_compress = COMPRESSION_MAX_MSG_SIZE; + else { + if (size_to_compress > COMPRESSION_MAX_MSG_SIZE) { + // we need to find the last newline + // so that the decompressor will have a whole line to work with + + const char *t = &src[COMPRESSION_MAX_MSG_SIZE]; + while (--t >= src) + if (unlikely(*t == '\n')) + break; + + if (t <= src) { + size_to_compress = COMPRESSION_MAX_MSG_SIZE; + } else + size_to_compress = t - src + 1; + } + } } + + char *dst; + size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst); + if (!dst_len) { + error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying", + rrdhost_hostname(s->host), s->connected_to); + + s->compressor->reset(s->compressor); + dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst); + if(!dst_len) { + error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression", + rrdhost_hostname(s->host), s->connected_to); + + deactivate_compression(s); + netdata_mutex_unlock(&s->mutex); + return; + } + } + + if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len)) + s->flags |= SENDER_FLAG_OVERFLOW; + + src = src + size_to_compress; + src_len -= size_to_compress; } - if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) - s->overflow = 1; } + else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) + s->flags |= SENDER_FLAG_OVERFLOW; #else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) - s->overflow = 1; + s->flags |= SENDER_FLAG_OVERFLOW; #endif - buffer_flush(s->build); - netdata_mutex_unlock(&s->mutex); -} + replication_recalculate_buffer_used_ratio_unsafe(s); -static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { - __atomic_clear(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST); - - if(host->rrdpush_sender_socket != -1) { - close(host->rrdpush_sender_socket); - host->rrdpush_sender_socket = -1; - } + netdata_mutex_unlock(&s->mutex); + rrdpush_signal_sender_to_wake_up(s); } -static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) { - NETDATA_DOUBLE *value = (NETDATA_DOUBLE *)rv->value; - +static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) { buffer_sprintf( - host->sender->build + wb , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n" - , rv->name - , *value + , rrdvar_name(rva) + , rrdvar2number(rva) ); - debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rv->name, *value); + debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva)); } -void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) { - if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && __atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)) { - sender_start(host->sender); - rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv); - sender_commit(host->sender); +void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) { + if(rrdhost_can_send_definitions_to_parent(host)) { + BUFFER *wb = sender_start(host->sender); + rrdpush_sender_add_host_variable_to_buffer(wb, rva); + sender_commit(host->sender, wb); } } +struct custom_host_variables_callback { + BUFFER *wb; +}; -static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr, void *host_ptr) { - RRDVAR *rv = (RRDVAR *)rrdvar_ptr; - RRDHOST *host = (RRDHOST *)host_ptr; +static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *struct_ptr) { + const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item; + struct custom_host_variables_callback *tmp = struct_ptr; + BUFFER *wb = tmp->wb; - if(unlikely(rv->options & RRDVAR_OPTION_CUSTOM_HOST_VAR && rv->type == RRDVAR_TYPE_CALCULATED)) { - rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv); - - // return 1, so that the traversal will return the number of variables sent + if(unlikely(rrdvar_flags(rv) & RRDVAR_FLAG_CUSTOM_HOST_VAR && rrdvar_type(rv) == RRDVAR_TYPE_CALCULATED)) { + rrdpush_sender_add_host_variable_to_buffer(wb, rv); return 1; } - - // returning a negative number will break the traversal return 0; } static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { - sender_start(host->sender); - int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host); - (void)ret; - sender_commit(host->sender); - - debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); + if(rrdhost_can_send_definitions_to_parent(host)) { + BUFFER *wb = sender_start(host->sender); + struct custom_host_variables_callback tmp = { + .wb = wb + }; + int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp); + (void)ret; + sender_commit(host->sender, wb); + + debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); + } } // resets all the chart, so that their definitions // will be resent to the central netdata static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { - rrdhost_rdlock(host); + error("Clearing stream_collected_metrics flag in charts of host %s", rrdhost_hostname(host)); RRDSET *st; rrdset_foreach_read(st, host) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); st->upstream_resync_time = 0; - rrdset_rdlock(st); - RRDDIM *rd; rrddim_foreach_read(rd, st) rd->exposed = 0; - - rrdset_unlock(st); + rrddim_foreach_done(rd); } + rrdset_foreach_done(st); - rrdhost_unlock(host); + rrdhost_sender_replicating_charts_zero(host); } -static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) { +static void rrdpush_sender_cbuffer_flush(RRDHOST *host) { + rrdpush_sender_set_flush_time(host->sender); + netdata_mutex_lock(&host->sender->mutex); - size_t len = cbuffer_next_unsafe(host->sender->buffer, NULL); - if (len) - error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, len); + // flush the output buffer from any data it may have + cbuffer_flush(host->sender->buffer); + replication_recalculate_buffer_used_ratio_unsafe(host->sender); - cbuffer_remove_unsafe(host->sender->buffer, len); netdata_mutex_unlock(&host->sender->mutex); +} + +static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) { + rrdpush_sender_set_flush_time(host->sender); + + // stop all replication commands inflight + replication_sender_delete_pending_requests(host->sender); + // reset the state of all charts rrdpush_sender_thread_reset_all_charts(host); + + rrdpush_sender_replicating_charts_zero(host->sender); +} + +static void rrdpush_sender_on_connect(RRDHOST *host) { + rrdpush_sender_cbuffer_flush(host); + rrdpush_sender_charts_and_replication_reset(host); rrdpush_sender_thread_send_custom_host_variables(host); } -static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) { - rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); - rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_STOP); +static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { + if(host->sender->rrdpush_sender_socket != -1) { + close(host->sender->rrdpush_sender_socket); + host->sender->rrdpush_sender_socket = -1; + } + + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + + // do not flush the circular buffer here + // this function is called sometimes with the mutex lock, sometimes without the lock + rrdpush_sender_charts_and_replication_reset(host); } void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) @@ -203,52 +312,123 @@ void rrdpush_clean_encoded(stream_encoded_t *se) freez(se->kernel_version); } -static inline long int parse_stream_version_for_errors(char *http) -{ - if (!memcmp(http, START_STREAMING_ERROR_SAME_LOCALHOST, sizeof(START_STREAMING_ERROR_SAME_LOCALHOST))) - return -2; - else if (!memcmp(http, START_STREAMING_ERROR_ALREADY_STREAMING, sizeof(START_STREAMING_ERROR_ALREADY_STREAMING))) - return -3; - else if (!memcmp(http, START_STREAMING_ERROR_NOT_PERMITTED, sizeof(START_STREAMING_ERROR_NOT_PERMITTED))) - return -4; - else - return -1; -} +struct { + const char *response; + size_t length; + int32_t version; + bool dynamic; + const char *error; + int worker_job_id; + time_t postpone_reconnect_seconds; +} stream_responses[] = { + { + .response = START_STREAMING_PROMPT_VN, + .length = sizeof(START_STREAMING_PROMPT_VN) - 1, + .version = STREAM_HANDSHAKE_OK_V3, // and above + .dynamic = true, // dynamic = we will parse the version / capabilities + .error = NULL, + .worker_job_id = 0, + .postpone_reconnect_seconds = 0, + }, + { + .response = START_STREAMING_PROMPT_V2, + .length = sizeof(START_STREAMING_PROMPT_V2) - 1, + .version = STREAM_HANDSHAKE_OK_V2, + .dynamic = false, + .error = NULL, + .worker_job_id = 0, + .postpone_reconnect_seconds = 0, + }, + { + .response = START_STREAMING_PROMPT_V1, + .length = sizeof(START_STREAMING_PROMPT_V1) - 1, + .version = STREAM_HANDSHAKE_OK_V1, + .dynamic = false, + .error = NULL, + .worker_job_id = 0, + .postpone_reconnect_seconds = 0, + }, + { + .response = START_STREAMING_ERROR_SAME_LOCALHOST, + .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1, + .version = STREAM_HANDSHAKE_ERROR_LOCALHOST, + .dynamic = false, + .error = "remote server rejected this stream, the host we are trying to stream is its localhost", + .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, + .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour + }, + { + .response = START_STREAMING_ERROR_ALREADY_STREAMING, + .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1, + .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, + .dynamic = false, + .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it", + .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, + .postpone_reconnect_seconds = 2 * 60, // 2 minutes + }, + { + .response = START_STREAMING_ERROR_NOT_PERMITTED, + .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1, + .version = STREAM_HANDSHAKE_ERROR_DENIED, + .dynamic = false, + .error = "remote server denied access, probably we don't have the right API key?", + .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, + .postpone_reconnect_seconds = 1 * 60, // 1 minute + }, + + // terminator + { + .response = NULL, + .length = 0, + .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, + .dynamic = false, + .error = "remote node response is not understood, is it Netdata?", + .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, + .postpone_reconnect_seconds = 1 * 60, // 1 minute + } +}; -static inline long int parse_stream_version(RRDHOST *host, char *http) -{ - long int stream_version = -1; - int answer = -1; - char *stream_version_start = strchr(http, '='); - if (stream_version_start) { - stream_version_start++; - stream_version = strtol(stream_version_start, NULL, 10); - answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(stream_version_start - http)); - if (!answer) { - rrdpush_set_flags_to_newest_stream(host); +static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender_state *s, char *http, size_t http_length) { + int32_t version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE; + + int i; + for(i = 0; stream_responses[i].response ; i++) { + if(stream_responses[i].dynamic && + http_length > stream_responses[i].length && http_length < (stream_responses[i].length + 30) && + strncmp(http, stream_responses[i].response, stream_responses[i].length) == 0) { + + version = str2i(&http[stream_responses[i].length]); + break; } - } else { - answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2)); - if (!answer) { - stream_version = 1; - rrdpush_set_flags_to_newest_stream(host); - } else { - answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); - if (!answer) { - stream_version = 0; - rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_STOP); - rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); - } - else { - stream_version = parse_stream_version_for_errors(http); - } + else if(http_length == stream_responses[i].length && strcmp(http, stream_responses[i].response) == 0) { + version = stream_responses[i].version; + + break; } } - return stream_version; + const char *error = stream_responses[i].error; + int worker_job_id = stream_responses[i].worker_job_id; + time_t delay = stream_responses[i].postpone_reconnect_seconds; + + if(version >= STREAM_HANDSHAKE_OK_V1) { + host->destination->last_error = NULL; + host->destination->last_handshake = version; + host->destination->postpone_reconnection_until = 0; + s->capabilities = convert_stream_version_to_capabilities(version); + return true; + } + + error("STREAM %s [send to %s]: %s.", rrdhost_hostname(host), s->connected_to, error); + + worker_is_busy(worker_job_id); + rrdpush_sender_thread_close_socket(host); + host->destination->last_error = error; + host->destination->last_handshake = version; + host->destination->postpone_reconnection_until = now_realtime_sec() + delay; + return false; } -static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, - struct sender_state *s) { +static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) { struct timeval tv = { .tv_sec = timeout, @@ -258,11 +438,8 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po // make sure the socket is closed rrdpush_sender_thread_close_socket(host); - debug(D_STREAM, "STREAM: Attempting to connect..."); - info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination); - - host->rrdpush_sender_socket = connect_to_one_of_destinations( - host->destinations + s->rrdpush_sender_socket = connect_to_one_of_destinations( + host , default_port , &tv , &s->reconnects_counter @@ -271,48 +448,50 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po , &host->destination ); - if(unlikely(host->rrdpush_sender_socket == -1)) { - error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_send_destination); - return 0; + if(unlikely(s->rrdpush_sender_socket == -1)) { + error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination); + return false; } - info("STREAM %s [send to %s]: initializing communication...", host->hostname, s->connected_to); + info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); #ifdef ENABLE_HTTPS - if( netdata_client_ctx ){ - host->ssl.flags = NETDATA_SSL_START; - if (!host->ssl.conn){ - host->ssl.conn = SSL_new(netdata_client_ctx); - if(!host->ssl.conn){ + if(netdata_ssl_client_ctx){ + host->sender->ssl.flags = NETDATA_SSL_START; + if (!host->sender->ssl.conn){ + host->sender->ssl.conn = SSL_new(netdata_ssl_client_ctx); + if(!host->sender->ssl.conn){ error("Failed to allocate SSL structure."); - host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; + host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; } } else{ - SSL_clear(host->ssl.conn); + SSL_clear(host->sender->ssl.conn); } - if (host->ssl.conn) + if (host->sender->ssl.conn) { - if (SSL_set_fd(host->ssl.conn, host->rrdpush_sender_socket) != 1) { - error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket); - host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; + if (SSL_set_fd(host->sender->ssl.conn, s->rrdpush_sender_socket) != 1) { + error("Failed to set the socket to the SSL on socket fd %d.", s->rrdpush_sender_socket); + host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; } else{ - host->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE; + host->sender->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE; } } } else { - host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; + host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; } #endif + // reset our capabilities to default + s->capabilities = STREAM_OUR_CAPABILITIES; + #ifdef ENABLE_COMPRESSION -// Negotiate stream VERSION_CLABELS if stream compression is not supported -s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION)); -if(!s->rrdpush_compression) - s->version = STREAM_VERSION_CLABELS; -#endif //ENABLE_COMPRESSION + // If we don't want compression, remove it from our capabilities + if(!(s->flags & SENDER_FLAG_COMPRESSION)) + s->capabilities &= ~STREAM_CAP_COMPRESSION; +#endif // ENABLE_COMPRESSION /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the version negotiation resulted in a high enough version. @@ -337,7 +516,7 @@ if(!s->rrdpush_compression) "&ml_enabled=%d" "&mc_version=%d" "&tags=%s" - "&ver=%d" + "&ver=%u" "&NETDATA_INSTANCE_CLOUD_TYPE=%s" "&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s" "&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s" @@ -370,20 +549,20 @@ if(!s->rrdpush_compression) "User-Agent: %s/%s\r\n" "Accept: */*\r\n\r\n" , host->rrdpush_send_api_key - , host->hostname - , host->registry_hostname + , rrdhost_hostname(host) + , rrdhost_registry_hostname(host) , host->machine_guid , default_rrd_update_every - , host->os - , host->timezone - , host->abbrev_timezone + , rrdhost_os(host) + , rrdhost_timezone(host) + , rrdhost_abbrev_timezone(host) , host->utc_offset , host->system_info->hops + 1 , host->system_info->ml_capable , host->system_info->ml_enabled , host->system_info->mc_version - , (host->tags) ? host->tags : "" - , s->version + , rrdhost_tags(host) + , s->capabilities , (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : "" , (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : "" , (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : "" @@ -412,146 +591,128 @@ if(!s->rrdpush_compression) , (host->system_info->host_ram_total) ? host->system_info->host_ram_total : "" , (host->system_info->host_disk_space) ? host->system_info->host_disk_space : "" , STREAMING_PROTOCOL_VERSION - , host->program_name - , host->program_version + , rrdhost_program_name(host) + , rrdhost_program_version(host) ); http[eol] = 0x00; rrdpush_clean_encoded(&se); #ifdef ENABLE_HTTPS - if (!host->ssl.flags) { + if (!host->sender->ssl.flags) { ERR_clear_error(); - SSL_set_connect_state(host->ssl.conn); - int err = SSL_connect(host->ssl.conn); + SSL_set_connect_state(host->sender->ssl.conn); + int err = SSL_connect(host->sender->ssl.conn); if (err != 1){ - err = SSL_get_error(host->ssl.conn, err); - error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->ssl.conn,err),NULL)); + err = SSL_get_error(host->sender->ssl.conn, err); + error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->sender->ssl.conn,err),NULL)); if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); rrdpush_sender_thread_close_socket(host); - if (host->destination->next) - host->destination->disabled_no_proper_reply = 1; - return 0; - }else { - host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; + host->destination->last_error = "SSL error"; + host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR; + host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; + return false; + } + else { + host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; } } else { if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) { - if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE) { - if ( security_test_certificate(host->ssl.conn)) { + if (netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE) { + if ( security_test_certificate(host->sender->ssl.conn)) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); error("Closing the stream connection, because the server SSL certificate is not valid."); rrdpush_sender_thread_close_socket(host); - if (host->destination->next) - host->destination->disabled_no_proper_reply = 1; - return 0; + host->destination->last_error = "invalid SSL certificate"; + host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE; + host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; + return false; } } } } } - if(send_timeout(&host->ssl,host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) { -#else - if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) { #endif + + ssize_t bytes; + + bytes = send_timeout( +#ifdef ENABLE_HTTPS + &host->sender->ssl, +#endif + s->rrdpush_sender_socket, + http, + strlen(http), + 0, + timeout); + + if(bytes <= 0) { // timeout is 0 worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); - error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", host->hostname, s->connected_to); rrdpush_sender_thread_close_socket(host); - return 0; + error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to); + host->destination->last_error = "timeout while sending request"; + host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT; + host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60; + return false; } - info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, s->connected_to); + info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to); - ssize_t received; + bytes = recv_timeout( #ifdef ENABLE_HTTPS - received = recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout); - if(received == -1) { -#else - received = recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout); - if(received == -1) { + &host->sender->ssl, #endif + s->rrdpush_sender_socket, + http, + HTTP_HEADER_SIZE, + 0, + timeout); + + if(bytes <= 0) { // timeout is 0 worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); - error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, s->connected_to); rrdpush_sender_thread_close_socket(host); - return 0; + error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to); + host->destination->last_error = "timeout while expecting first response"; + host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT; + host->destination->postpone_reconnection_until = now_realtime_sec() + 30; + return false; } - http[received] = '\0'; + http[bytes] = '\0'; debug(D_STREAM, "Response to sender from far end: %s", http); - int32_t version = (int32_t)parse_stream_version(host, http); - if(version == -1) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE); - error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, s->connected_to); - rrdpush_sender_thread_close_socket(host); - //catch other reject reasons and force to check other destinations - if (host->destination->next) - host->destination->disabled_no_proper_reply = 1; - return 0; - } - else if(version == -2) { - error("STREAM %s [send to %s]: remote server is the localhost for [%s].", host->hostname, s->connected_to, host->hostname); - rrdpush_sender_thread_close_socket(host); - host->destination->disabled_because_of_localhost = 1; - return 0; - } - else if(version == -3) { - error("STREAM %s [send to %s]: remote server already receives metrics for [%s].", host->hostname, s->connected_to, host->hostname); - rrdpush_sender_thread_close_socket(host); - host->destination->disabled_already_streaming = now_realtime_sec(); - return 0; - } - else if(version == -4) { - error("STREAM %s [send to %s]: remote server denied access for [%s].", host->hostname, s->connected_to, host->hostname); - rrdpush_sender_thread_close_socket(host); - if (host->destination->next) - host->destination->disabled_because_of_denied_access = 1; - return 0; - } - s->version = version; + if(!rrdpush_sender_validate_response(host, s, http, bytes)) + return false; #ifdef ENABLE_COMPRESSION - s->rrdpush_compression = (s->rrdpush_compression && (s->version >= STREAM_VERSION_COMPRESSION)); - if(s->rrdpush_compression) - { - // parent supports compression - if(s->compressor) + if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) { + if(!s->compressor) + s->compressor = create_compressor(); + else s->compressor->reset(s->compressor); } - else { - //parent does not support compression or has compression disabled - debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname); - infoerr("Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname); - s->version = STREAM_VERSION_CLABELS; - } #endif //ENABLE_COMPRESSION + log_sender_capabilities(s); - info("STREAM %s [send to %s]: established communication with a parent using protocol version %d - ready to send metrics..." - , host->hostname - , s->connected_to - , s->version); - - if(sock_setnonblock(host->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, s->connected_to); + if(sock_setnonblock(s->rrdpush_sender_socket) < 0) + error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); - if(sock_enlarge_out(host->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, s->connected_to); + if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) + error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); - debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_sender_socket); + debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket); - return 1; + return true; } -static void attempt_to_connect(struct sender_state *state) +static bool attempt_to_connect(struct sender_state *state) { state->send_attempts = 0; if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) { - state->last_sent_t = now_monotonic_sec(); - // reset the buffer, to properly send charts and metrics - rrdpush_sender_thread_data_flush(state->host); + rrdpush_sender_on_connect(state->host); // send from the beginning state->begin = 0; @@ -563,372 +724,628 @@ static void attempt_to_connect(struct sender_state *state) state->sent_bytes_on_this_connection = 0; // let the data collection threads know we are ready - __atomic_test_and_set(&state->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST); + rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + + return true; } - else { - // increase the failed connections counter - state->not_connected_loops++; - // reset the number of bytes sent - state->sent_bytes_on_this_connection = 0; + // we couldn't connect - // slow re-connection on repeating errors - sleep_usec(USEC_PER_SEC * state->reconnect_delay); // seconds - } + // increase the failed connections counter + state->not_connected_loops++; + + // reset the number of bytes sent + state->sent_bytes_on_this_connection = 0; + + // slow re-connection on repeating errors + sleep_usec(USEC_PER_SEC * state->reconnect_delay); // seconds + + return false; } // TCP window is open and we have data to transmit. -void attempt_to_send(struct sender_state *s) { - - rrdpush_send_labels(s->host); +static ssize_t attempt_to_send(struct sender_state *s) { + ssize_t ret = 0; #ifdef NETDATA_INTERNAL_CHECKS struct circular_buffer *cb = s->buffer; #endif - netdata_thread_disable_cancelability(); netdata_mutex_lock(&s->mutex); char *chunk; size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk); debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); - ssize_t ret; + #ifdef ENABLE_HTTPS - SSL *conn = s->host->ssl.conn ; - if(conn && !s->host->ssl.flags) { - ret = SSL_write(conn, chunk, outstanding); - } else { - ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT); - } + SSL *conn = s->host->sender->ssl.conn ; + if(conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + ret = netdata_ssl_write(conn, chunk, outstanding); + else + ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT); #else - ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT); + ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT); #endif + if (likely(ret > 0)) { cbuffer_remove_unsafe(s->buffer, ret); s->sent_bytes_on_this_connection += ret; s->sent_bytes += ret; - debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", s->host->hostname, s->connected_to, ret); - s->last_sent_t = now_monotonic_sec(); + debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret); } else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)) - debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", s->host->hostname, s->connected_to); + debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to); else if (ret == -1) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR); debug(D_STREAM, "STREAM: Send failed - closing socket..."); - error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", s->host->hostname, s->connected_to, s->sent_bytes_on_this_connection); + error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection); rrdpush_sender_thread_close_socket(s->host); } - else { + else debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission"); - } + replication_recalculate_buffer_used_ratio_unsafe(s); netdata_mutex_unlock(&s->mutex); - netdata_thread_enable_cancelability(); + + return ret; } -void attempt_read(struct sender_state *s) { -int ret; +static ssize_t attempt_read(struct sender_state *s) { + ssize_t ret = 0; + #ifdef ENABLE_HTTPS - if (s->host->ssl.conn && !s->host->stream_ssl.flags) { - ERR_clear_error(); - int desired = sizeof(s->read_buffer) - s->read_len - 1; - ret = SSL_read(s->host->ssl.conn, s->read_buffer, desired); + if (s->host->sender->ssl.conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { + size_t desired = sizeof(s->read_buffer) - s->read_len - 1; + ret = netdata_ssl_read(s->host->sender->ssl.conn, s->read_buffer, desired); if (ret > 0 ) { - s->read_len += ret; - return; + s->read_len += (int)ret; + return ret; } - int sslerrno = SSL_get_error(s->host->ssl.conn, desired); - if (sslerrno == SSL_ERROR_WANT_READ || sslerrno == SSL_ERROR_WANT_WRITE) - return; worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - u_long err; - char buf[256]; - while ((err = ERR_get_error()) != 0) { - ERR_error_string_n(err, buf, sizeof(buf)); - error("STREAM %s [send to %s] ssl error: %s", s->host->hostname, s->connected_to, buf); - } - error("Restarting connection"); rrdpush_sender_thread_close_socket(s->host); - return; + return ret; } #endif - ret = recv(s->host->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT); - if (ret>0) { + ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT); + if (ret > 0) { s->read_len += ret; - return; + return ret; } - debug(D_STREAM, "Socket was POLLIN, but req %zu bytes gave %d", sizeof(s->read_buffer) - s->read_len - 1, ret); - - if (ret<0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) - return; + if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) + return ret; - if (ret==0) { + if (ret == 0 || errno == ECONNRESET) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED); - error("STREAM %s [send to %s]: connection closed by far end. Restarting connection", s->host->hostname, s->connected_to); + error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to); } else { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR); - error("STREAM %s [send to %s]: error during receive (%d). Restarting connection", s->host->hostname, s->connected_to, ret); + error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret); } rrdpush_sender_thread_close_socket(s->host); + + return ret; +} + +struct inflight_stream_function { + struct sender_state *sender; + STRING *transaction; + usec_t received_ut; +}; + +void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { + struct inflight_stream_function *tmp = data; + + struct sender_state *s = tmp->sender; + + if(rrdhost_can_send_definitions_to_parent(s->host)) { + BUFFER *wb = sender_start(s); + + pluginsd_function_result_begin_to_buffer(wb + , string2str(tmp->transaction) + , code + , functions_content_type_to_format(func_wb->contenttype) + , func_wb->expires); + + buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb)); + pluginsd_function_result_end_to_buffer(wb); + + sender_commit(s, wb); + + internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).", + rrdhost_hostname(s->host), s->connected_to, + string2str(tmp->transaction), + buffer_strlen(func_wb), + now_realtime_usec() - tmp->received_ut); + } + string_freez(tmp->transaction); + buffer_free(func_wb); + freez(tmp); } // This is just a placeholder until the gap filling state machine is inserted void execute_commands(struct sender_state *s) { + worker_is_busy(WORKER_SENDER_JOB_EXECUTE); + char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline; *end = 0; - while( starthost->hostname, s->connected_to, start); - start = newline+1; + while( start < end && (newline = strchr(start, '\n')) ) { + *newline = '\0'; + + log_access("STREAM: %d from '%s' for host '%s': %s", + gettid(), s->connected_to, rrdhost_hostname(s->host), start); + + internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); + + char *words[PLUGINSD_MAX_WORDS] = { NULL }; + size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); + + const char *keyword = get_word(words, num_words, 0); + + if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { + worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); + + if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { + error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + rrdhost_hostname(s->host), s->connected_to, + keyword, + transaction?transaction:"(unset)", + timeout_s?timeout_s:"(unset)", + function?function:"(unset)"); + } + else { + int timeout = str2i(timeout_s); + if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; + + struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function)); + tmp->received_ut = now_realtime_usec(); + tmp->sender = s; + tmp->transaction = string_strdupz(transaction); + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1); + + int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp); + if(code != HTTP_RESP_OK) { + rrd_call_function_error(wb, "Failed to route request to collector", code); + stream_execute_function_callback(wb, code, tmp); + } + } + } + else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { + worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST); + + const char *chart_id = get_word(words, num_words, 1); + const char *start_streaming = get_word(words, num_words, 2); + const char *after = get_word(words, num_words, 3); + const char *before = get_word(words, num_words, 4); + + if (!chart_id || !start_streaming || !after || !before) { + error("STREAM %s [send to %s] %s command is incomplete" + " (chart=%s, start_streaming=%s, after=%s, before=%s)", + rrdhost_hostname(s->host), s->connected_to, + keyword, + chart_id ? chart_id : "(unset)", + start_streaming ? start_streaming : "(unset)", + after ? after : "(unset)", + before ? before : "(unset)"); + } + else { + replication_add_request(s, chart_id, + strtoll(after, NULL, 0), + strtoll(before, NULL, 0), + !strcmp(start_streaming, "true") + ); + } + } + else { + error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)"); + } + + worker_is_busy(WORKER_SENDER_JOB_EXECUTE); + start = newline + 1; } - if (startread_buffer, start, end-start); - s->read_len = end-start; + s->read_len = end - start; + } + else { + s->read_buffer[0] = '\0'; + s->read_len = 0; } } +struct rrdpush_sender_thread_data { + struct sender_state *sender_state; + RRDHOST *host; + char *pipe_buffer; +}; -static void rrdpush_sender_thread_cleanup_callback(void *ptr) { - worker_unregister(); +static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) { + static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER; - RRDHOST *host = (RRDHOST *)ptr; + bool ret = true; - netdata_mutex_lock(&host->sender->mutex); + netdata_mutex_lock(&mutex); - info("STREAM %s [send]: sending thread cleans up...", host->hostname); + int new_pipe_fds[2]; + if(reopen) { + if(pipe(new_pipe_fds) != 0) { + error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host)); + new_pipe_fds[PIPE_READ] = -1; + new_pipe_fds[PIPE_WRITE] = -1; + ret = false; + } + } - rrdpush_sender_thread_close_socket(host); + int old_pipe_fds[2]; + old_pipe_fds[PIPE_READ] = pipe_fds[PIPE_READ]; + old_pipe_fds[PIPE_WRITE] = pipe_fds[PIPE_WRITE]; - // close the pipe - if(host->rrdpush_sender_pipe[PIPE_READ] != -1) { - close(host->rrdpush_sender_pipe[PIPE_READ]); - host->rrdpush_sender_pipe[PIPE_READ] = -1; + if(reopen) { + pipe_fds[PIPE_READ] = new_pipe_fds[PIPE_READ]; + pipe_fds[PIPE_WRITE] = new_pipe_fds[PIPE_WRITE]; + } + else { + pipe_fds[PIPE_READ] = -1; + pipe_fds[PIPE_WRITE] = -1; } - if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1) { - close(host->rrdpush_sender_pipe[PIPE_WRITE]); - host->rrdpush_sender_pipe[PIPE_WRITE] = -1; + if(old_pipe_fds[PIPE_READ] > 2) + close(old_pipe_fds[PIPE_READ]); + + if(old_pipe_fds[PIPE_WRITE] > 2) + close(old_pipe_fds[PIPE_WRITE]); + + netdata_mutex_unlock(&mutex); + return ret; +} + +void rrdpush_signal_sender_to_wake_up(struct sender_state *s) { + if(unlikely(s->tid == gettid())) + return; + + RRDHOST *host = s->host; + + int pipe_fd = s->rrdpush_sender_pipe[PIPE_WRITE]; + + // signal the sender there are more data + if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) { + error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host)); + rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true); } +} + +static void rrdpush_sender_thread_cleanup_callback(void *ptr) { + struct rrdpush_sender_thread_data *data = ptr; + worker_unregister(); + + RRDHOST *host = data->host; - if(!host->rrdpush_sender_join) { - info("STREAM %s [send]: sending thread detaches itself.", host->hostname); + netdata_mutex_lock(&host->sender->mutex); + + info("STREAM %s [send]: sending thread cleans up...", rrdhost_hostname(host)); + + rrdpush_sender_thread_close_socket(host); + rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); + + if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN)) { + info("STREAM %s [send]: sending thread detaches itself.", rrdhost_hostname(host)); netdata_thread_detach(netdata_thread_self()); } - host->rrdpush_sender_spawn = 0; + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); - info("STREAM %s [send]: sending thread now exits.", host->hostname); + info("STREAM %s [send]: sending thread now exits.", rrdhost_hostname(host)); netdata_mutex_unlock(&host->sender->mutex); + + freez(data->pipe_buffer); + freez(data); } -void sender_init(RRDHOST *parent) +void sender_init(RRDHOST *host) { - if (parent->sender) + if (host->sender) return; - parent->sender = callocz(1, sizeof(*parent->sender)); - parent->sender->host = parent; - parent->sender->buffer = cbuffer_new(1024, 1024*1024); - parent->sender->build = buffer_create(1); + host->sender = callocz(1, sizeof(*host->sender)); + host->sender->host = host; + host->sender->buffer = cbuffer_new(1024, 1024 * 1024); + host->sender->capabilities = STREAM_OUR_CAPABILITIES; + + host->sender->rrdpush_sender_pipe[PIPE_READ] = -1; + host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1; + host->sender->rrdpush_sender_socket = -1; + #ifdef ENABLE_COMPRESSION - parent->sender->rrdpush_compression = default_compression_enabled; - if (default_compression_enabled) - parent->sender->compressor = create_compressor(); + if(default_compression_enabled) { + host->sender->flags |= SENDER_FLAG_COMPRESSION; + host->sender->compressor = create_compressor(); + } + else + host->sender->flags &= ~SENDER_FLAG_COMPRESSION; #endif - netdata_mutex_init(&parent->sender->mutex); + + netdata_mutex_init(&host->sender->mutex); + replication_init_sender(host->sender); } void *rrdpush_sender_thread(void *ptr) { + worker_register("STREAMSND"); + worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect"); + worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read"); + worker_register_job_name(WORKER_SENDER_JOB_SOCKET_RECEIVE, "receive"); + worker_register_job_name(WORKER_SENDER_JOB_EXECUTE, "execute"); + worker_register_job_name(WORKER_SENDER_JOB_SOCKET_SEND, "send"); + + // disconnection reasons + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR, "disconnect receive error"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake"); + + worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request"); + worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function"); + + worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE); + struct sender_state *s = ptr; - s->task_id = gettid(); + s->tid = gettid(); - if(!s->host->rrdpush_send_enabled || !s->host->rrdpush_send_destination || + if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination || !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key || !*s->host->rrdpush_send_api_key) { error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", - s->host->hostname, s->task_id); + rrdhost_hostname(s->host), s->tid); return NULL; } #ifdef ENABLE_HTTPS if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ){ security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING); - security_location_for_context(netdata_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path); + ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path); } #endif - info("STREAM %s [send]: thread created (task id %d)", s->host->hostname, s->task_id); + info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), s->tid); + + s->timeout = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600); + + s->default_port = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "default port", 19999); + + s->buffer->max_size = (size_t)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024 * 10); + + s->reconnect_delay = (unsigned int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5); - s->timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60); - s->default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999); - s->buffer->max_size = - (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024 * 10); - s->reconnect_delay = - (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5); remote_clock_resync_iterations = (unsigned int)appconfig_get_number( &stream_config, CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING // initialize rrdpush globals - __atomic_clear(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST); - if(pipe(s->host->rrdpush_sender_pipe) == -1) { - error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname); + rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + + int pipe_buffer_size = 10 * 1024; +#ifdef F_GETPIPE_SZ + pipe_buffer_size = fcntl(s->rrdpush_sender_pipe[PIPE_READ], F_GETPIPE_SZ); +#endif + if(pipe_buffer_size < 10 * 1024) + pipe_buffer_size = 10 * 1024; + + if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) { + error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.", + rrdhost_hostname(s->host)); return NULL; } - s->version = STREAMING_PROTOCOL_CURRENT_VERSION; - enum { - Collector, - Socket - }; - struct pollfd fds[2]; - fds[Collector].fd = s->host->rrdpush_sender_pipe[PIPE_READ]; - fds[Collector].events = POLLIN; + struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data)); + thread_data->pipe_buffer = mallocz(pipe_buffer_size); + thread_data->sender_state = s; + thread_data->host = s->host; - worker_register("STREAMSND"); - worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect"); - worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read"); - worker_register_job_name(WORKER_SENDER_JOB_SOCKET_RECEIVE, "receive"); - worker_register_job_name(WORKER_SENDER_JOB_EXECUTE, "execute"); - worker_register_job_name(WORKER_SENDER_JOB_SOCKET_SEND, "send"); + // reset our cleanup flags + rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN); - // disconnection reasons - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR, "disconnect receive error"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake"); + netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data); - netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, s->host); - for(; s->host->rrdpush_send_enabled && !netdata_exit ;) { + for(; rrdhost_has_rrdpush_sender_enabled(s->host) && !netdata_exit ;) { // check for outstanding cancellation requests netdata_thread_testcancel(); // The connection attempt blocks (after which we use the socket in nonblocking) - if(unlikely(s->host->rrdpush_sender_socket == -1)) { + if(unlikely(s->rrdpush_sender_socket == -1)) { worker_is_busy(WORKER_SENDER_JOB_CONNECT); - s->overflow = 0; + rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + s->flags &= ~SENDER_FLAG_OVERFLOW; s->read_len = 0; s->buffer->read = 0; s->buffer->write = 0; - attempt_to_connect(s); - if (s->version >= VERSION_GAP_FILLING) { - time_t now = now_realtime_sec(); - sender_start(s); - buffer_sprintf(s->build, "TIMESTAMP %"PRId64"", (int64_t)now); - sender_commit(s); - } + + if(unlikely(!attempt_to_connect(s))) + continue; + + s->last_traffic_seen_t = now_monotonic_sec(); rrdpush_claimed_id(s->host); + rrdpush_send_host_labels(s->host); + + rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); + continue; } // If the TCP window never opened then something is wrong, restart connection - if(unlikely(now_monotonic_sec() - s->last_sent_t > s->timeout)) { + if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout && + !rrdpush_sender_pending_replication_requests(s) && + !rrdpush_sender_replicating_charts(s) + )) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); - error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", s->host->hostname, s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts); + error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts); rrdpush_sender_thread_close_socket(s->host); continue; } - worker_is_idle(); - - // Wait until buffer opens in the socket or a rrdset_done_push wakes us - fds[Collector].revents = 0; - fds[Socket].revents = 0; - fds[Socket].fd = s->host->rrdpush_sender_socket; - netdata_mutex_lock(&s->mutex); - char *chunk; - size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, &chunk); - chunk = NULL; // Do not cache pointer outside of region - could be invalidated + size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, NULL); + size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer); netdata_mutex_unlock(&s->mutex); - if(outstanding) { + + worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->host->sender->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->host->sender->buffer->max_size); + + if(outstanding) s->send_attempts++; - fds[Socket].events = POLLIN | POLLOUT; - } - else { - fds[Socket].events = POLLIN; + + if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) { + if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) { + error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.", + rrdhost_hostname(s->host)); + rrdpush_sender_thread_close_socket(s->host); + break; + } } - int retval = poll(fds, 2, 1000); + worker_is_idle(); + + // Wait until buffer opens in the socket or a rrdset_done_push wakes us + enum { + Collector = 0, + Socket = 1, + }; + struct pollfd fds[2] = { + [Collector] = { + .fd = s->rrdpush_sender_pipe[PIPE_READ], + .events = POLLIN, + .revents = 0, + }, + [Socket] = { + .fd = s->rrdpush_sender_socket, + .events = POLLIN | (outstanding ? POLLOUT : 0 ), + .revents = 0, + } + }; + int poll_rc = poll(fds, 2, 1000); + debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...", fds[Collector].revents, fds[Socket].revents, outstanding); if(unlikely(netdata_exit)) break; + internal_error(fds[Collector].fd != s->rrdpush_sender_pipe[PIPE_READ], + "STREAM %s [send to %s]: pipe changed after poll().", rrdhost_hostname(s->host), s->connected_to); + + internal_error(fds[Socket].fd != s->rrdpush_sender_socket, + "STREAM %s [send to %s]: socket changed after poll().", rrdhost_hostname(s->host), s->connected_to); + // Spurious wake-ups without error - loop again - if (retval == 0 || ((retval == -1) && (errno == EAGAIN || errno == EINTR))) { + if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) { debug(D_STREAM, "Spurious wakeup"); continue; } // Only errors from poll() are internal, but try restarting the connection - if(unlikely(retval == -1)) { + if(unlikely(poll_rc == -1)) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR); - error("STREAM %s [send to %s]: failed to poll(). Closing socket.", s->host->hostname, s->connected_to); + error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to); + rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true); rrdpush_sender_thread_close_socket(s->host); continue; } + // If we have data and have seen the TCP window open then try to close it by a transmission. + if(likely(outstanding && (fds[Socket].revents & POLLOUT))) { + worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND); + ssize_t bytes = attempt_to_send(s); + if(bytes > 0) { + s->last_traffic_seen_t = now_monotonic_sec(); + worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, (NETDATA_DOUBLE)bytes); + } + } + // If the collector woke us up then empty the pipe to remove the signal - if (fds[Collector].revents & POLLIN || fds[Collector].revents & POLLPRI) { + if (fds[Collector].revents & (POLLIN|POLLPRI)) { worker_is_busy(WORKER_SENDER_JOB_PIPE_READ); debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding); - char buffer[1000 + 1]; - if (read(s->host->rrdpush_sender_pipe[PIPE_READ], buffer, 1000) == -1) - error("STREAM %s [send to %s]: cannot read from internal pipe.", s->host->hostname, s->connected_to); + if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1) + error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to); } // Read as much as possible to fill the buffer, split into full lines for execution. if (fds[Socket].revents & POLLIN) { worker_is_busy(WORKER_SENDER_JOB_SOCKET_RECEIVE); - attempt_read(s); + ssize_t bytes = attempt_read(s); + if(bytes > 0) { + s->last_traffic_seen_t = now_monotonic_sec(); + worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, (NETDATA_DOUBLE)bytes); + } } - worker_is_busy(WORKER_SENDER_JOB_EXECUTE); - execute_commands(s); + if(unlikely(s->read_len)) + execute_commands(s); - // If we have data and have seen the TCP window open then try to close it by a transmission. - if (outstanding && fds[Socket].revents & POLLOUT) { - worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND); - attempt_to_send(s); + if(unlikely(fds[Collector].revents & (POLLERR|POLLHUP|POLLNVAL))) { + char *error = NULL; + + if (unlikely(fds[Collector].revents & POLLERR)) + error = "pipe reports errors (POLLERR)"; + else if (unlikely(fds[Collector].revents & POLLHUP)) + error = "pipe closed (POLLHUP)"; + else if (unlikely(fds[Collector].revents & POLLNVAL)) + error = "pipe is invalid (POLLNVAL)"; + + if(error) { + rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true); + error("STREAM %s [send to %s]: restarting internal pipe: %s.", + rrdhost_hostname(s->host), s->connected_to, error); + } } - // TODO-GAPS - why do we only check this on the socket, not the pipe? - if (outstanding) { + if(unlikely(fds[Socket].revents & (POLLERR|POLLHUP|POLLNVAL))) { char *error = NULL; + if (unlikely(fds[Socket].revents & POLLERR)) error = "socket reports errors (POLLERR)"; else if (unlikely(fds[Socket].revents & POLLHUP)) error = "connection closed by remote end (POLLHUP)"; else if (unlikely(fds[Socket].revents & POLLNVAL)) error = "connection is invalid (POLLNVAL)"; + if(unlikely(error)) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR); - error("STREAM %s [send to %s]: restart stream because %s - %zu bytes transmitted.", s->host->hostname, - s->connected_to, error, s->sent_bytes_on_this_connection); + error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.", + rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection); rrdpush_sender_thread_close_socket(s->host); } } // protection from overflow - if (s->overflow) { + if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW); errno = 0; - error("STREAM %s [send to %s]: buffer full (%zu-bytes) after %zu bytes. Restarting connection", - s->host->hostname, s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection); + error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection", + rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection); rrdpush_sender_thread_close_socket(s->host); } + + worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication.requests)); } netdata_thread_cleanup_pop(1); diff --git a/streaming/stream.conf b/streaming/stream.conf index 33172bbcb..7c9ccc9b8 100644 --- a/streaming/stream.conf +++ b/streaming/stream.conf @@ -33,36 +33,31 @@ destination = # Skip Certificate verification? - # # The netdata child is configurated to avoid invalid SSL/TLS certificate, # so certificates that are self-signed or expired will stop the streaming. # Case the server certificate is not valid, you can enable the use of # 'bad' certificates setting the next option as 'yes'. - # #ssl skip certificate verification = yes # Certificate Authority Path + # OpenSSL has a default directory where the known certificates are stored. + # In case it is necessary, it is possible to change this rule using the variable + # "CApath", e.g. CApath = /etc/ssl/certs/ # - # OpenSSL has a default directory where the known certificates are stored, - # case it is necessary it is possible to change this rule using the variable - # "CApath" - # - #CApath = /etc/ssl/certs/ + #CApath = # Certificate Authority file + # When the Netdata parent has a certificate that is not recognized as valid, + # we can add it to the list of known certificates in "CApath" and give it to + # Netdata as an argument, e.g. CAfile = /etc/ssl/certs/cert.pem # - # When the Netdata parent has certificate, that is not recognized as valid, - # we can add this certificate in the list of known certificates in CApath - # and give for Netdata as argument. - # - #CAfile = /etc/ssl/certs/cert.pem + #CAfile = # The API_KEY to use (as the sender) api key = # Stream Compression - # - # The netdata child is configurated to enable stream compression by default. + # The default is enabled # You can control stream compression in this agent with options: yes | no #enable compression = yes @@ -91,6 +86,7 @@ reconnect delay seconds = 5 # Sync the clock of the charts for that many iterations, when starting. + # It is ignored when replication is enabled initial clock resync iterations = 60 # ----------------------------------------------------------------------------- @@ -115,6 +111,11 @@ [API_KEY] # Default settings for this API key + # This GUID is to be used as an API key from remote agents connecting + # to this machine. Failure to match such a key, denies access. + # YOU MUST SET THIS FIELD ON ALL API KEYS. + type = api + # You can disable the API key, by setting this to: no # The default (for unknown API keys) is: no enabled = no @@ -127,9 +128,8 @@ # The default history in entries, for all hosts using this API key. # You can also set it per host below. - # If you don't set it here, the history size of the central netdata - # will be used. - default history = 3600 + # For the default db mode (dbengine), this is ignored. + #default history = 3600 # The default memory mode to be used for all hosts using this API key. # You can also set it per host below. @@ -140,7 +140,7 @@ # ram keep it in RAM, don't touch the disk # none no database at all (use this on headless proxies) # dbengine like a traditional database - default memory mode = ram + #default memory mode = dbengine # Shall we enable health monitoring for the hosts using this API key? # 3 possible values: @@ -150,7 +150,7 @@ # ensure that the netdata process on the child is gracefully stopped, to prevent invalid last_collected alarms # You can also set it per host, below. # The default is taken from [health].enabled of netdata.conf - health enabled by default = auto + #health enabled by default = auto # postpone alarms for a short period after the sender is connected default postpone alarms on connect seconds = 60 @@ -163,11 +163,19 @@ #default proxy send charts matching = * # Stream Compression - # - # The stream with the child can be configurated to enable stream compression. + # By default it is enabled. # You can control stream compression in this parent agent stream with options: yes | no #enable compression = yes + # Replication + # Enable replication for all hosts using this api key. Default: enabled + #enable replication = yes + + # How many seconds to replicate from each child. Default: a day + #seconds to replicate = 86400 + + # The duration we want to replicate per each step. + #replication_step = 600 # ----------------------------------------------------------------------------- # 3. PER SENDING HOST SETTINGS, ON PARENT NETDATA @@ -184,6 +192,11 @@ # you can give settings for each sending host here. [MACHINE_GUID] + # This GUID is to be used as a MACHINE GUID from remote agents connecting + # to this machine, not an API key. + # YOU MUST SET THIS FIELD ON ALL MACHINE GUIDs. + type = machine + # enable this host: yes | no # When disabled, the parent will not receive metrics for this host. # THIS IS NOT A SECURITY MECHANISM - AN ATTACKER CAN SET ANY OTHER GUID. @@ -197,14 +210,15 @@ # and at stream.conf [API_KEY].allow from allow from = * - # The number of entries in the database - history = 3600 + # The number of entries in the database. + # This is ignored for db mode dbengine. + #history = 3600 # The memory mode of the database: save | map | ram | none | dbengine - memory mode = save + #memory mode = dbengine # Health / alarms control: yes | no | auto - health enabled = yes + #health enabled = auto # postpone alarms when the sender connects postpone alarms on connect seconds = 60 @@ -217,8 +231,16 @@ #proxy send charts matching = * # Stream Compression - # - # The stream with the child can be configurated to enable stream compression. + # By default, enabled. # You can control stream compression in this parent agent stream with options: yes | no #enable compression = yes - \ No newline at end of file + + # Replication + # Enable replication for all hosts using this api key. + #enable replication = yes + + # How many seconds to replicate from each child. + #seconds to replicate = 86400 + + # The duration we want to replicate per each step. + #replication_step = 600 -- cgit v1.2.3